update otherNodes

This commit is contained in:
Konstantin Lebedev 2022-07-18 11:32:28 +05:00
parent 72dca31cfa
commit fa88dff7ce

View file

@ -24,6 +24,7 @@ type commandVolumeServerEvacuate struct {
topologyInfo *master_pb.TopologyInfo topologyInfo *master_pb.TopologyInfo
targetServer string targetServer string
volumeRack string volumeRack string
otherNodes []*Node
} }
func (c *commandVolumeServerEvacuate) Name() string { func (c *commandVolumeServerEvacuate) Name() string {
@ -97,22 +98,27 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
return err return err
} }
stopchan := make(chan struct{}) if applyChange {
go func() { stopchan := make(chan struct{})
for { go func() {
select { for {
default: select {
if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { default:
fmt.Fprintf(writer, "update topologyInfo %v", err) if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil {
} else { fmt.Fprintf(writer, "update topologyInfo %v", err)
c.topologyInfo = topologyInfo } else {
c.topologyInfo = topologyInfo
_, c.otherNodes = c.nodesOtherThan(
collectVolumeServersByDc(c.topologyInfo, ""), volumeServer)
fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes))
}
case <-stopchan:
return
} }
case <-stopchan:
return
} }
} }()
}() defer close(stopchan)
defer close(stopchan) }
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err return err
@ -128,7 +134,8 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server // find this volume server
volumeServers := collectVolumeServersByDc(c.topologyInfo, "") volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) var thisNodes []*Node
thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer)
if len(thisNodes) == 0 { if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster", volumeServer) return fmt.Errorf("%s is not found in this cluster", volumeServer)
} }
@ -138,7 +145,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
for _, diskInfo := range thisNode.info.DiskInfos { for _, diskInfo := range thisNode.info.DiskInfos {
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
for _, vol := range diskInfo.VolumeInfos { for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange)
if err != nil { if err != nil {
fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
} }
@ -152,7 +159,6 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
} }
} }
} }
} }
return nil return nil
} }