From de4fcc0e2c3578cae784ba4bbcccd8b7d16ffc80 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:27:02 +0500 Subject: [PATCH] sync update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 45 +++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 7d50b7f81..d1c474a76 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,7 +24,6 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string - otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -98,28 +97,6 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - if applyChange { - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo - _, c.otherNodes = c.nodesOtherThan( - collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) - fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) - } - case <-stopchan: - return - } - } - }() - defer close(stopchan) - } - if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } @@ -134,18 +111,34 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - var thisNodes []*Node - thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) + thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } // move away normal volumes + ticker := time.NewTicker(topologyInfoUpdateInterval) for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { + if applyChange { + select { + case <-ticker.C: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + _, otherNodesNew := c.nodesOtherThan( + collectVolumeServersByDc(topologyInfo, ""), volumeServer) + if len(otherNodesNew) > 0 { + otherNodes = otherNodesNew + c.topologyInfo = topologyInfo + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) + } + } + } + } volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) }