update topologyInfo

This commit is contained in:
Konstantin Lebedev 2022-07-12 13:47:21 +05:00
parent ee95d23a22
commit 8372721a62

View file

@ -11,6 +11,7 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"io" "io"
"os" "os"
"time"
) )
func init() { func init() {
@ -18,6 +19,7 @@ func init() {
} }
type commandVolumeServerEvacuate struct { type commandVolumeServerEvacuate struct {
topologyInfo *master_pb.TopologyInfo
targetServer string targetServer string
volumeRack string volumeRack string
} }
@ -58,12 +60,12 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
} }
infoAboutSimulationMode(writer, *applyChange, "-force") infoAboutSimulationMode(writer, *applyChange, "-force")
if err = commandEnv.confirmIsLocked(args); err != nil { if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
return return
} }
if *volumeServer == "" { if *volumeServer == "" && *volumeRack == "" {
return fmt.Errorf("need to specify volume server by -node=<host>:<port>") return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
} }
if *targetServer != "" { if *targetServer != "" {
c.targetServer = *targetServer c.targetServer = *targetServer
@ -88,25 +90,33 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
// list all the volumes // list all the volumes
// collect topology information // collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
if err != nil { if err != nil {
return err return err
} }
if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { go func() {
for {
if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil {
c.topologyInfo = topologyInfo
}
}
}()
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err return err
} }
if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err return err
} }
return nil return nil
} }
func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server // find this volume server
volumeServers := collectVolumeServersByDc(topologyInfo, "") volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
if len(thisNodes) == 0 { if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster", volumeServer) return fmt.Errorf("%s is not found in this cluster", volumeServer)
@ -115,7 +125,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
// move away normal volumes // move away normal volumes
for _, thisNode := range thisNodes { for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos { for _, diskInfo := range thisNode.info.DiskInfos {
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
for _, vol := range diskInfo.VolumeInfos { for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
if err != nil { if err != nil {
@ -136,9 +146,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
return nil return nil
} }
func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server // find this ec volume server
ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 { if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer) return fmt.Errorf("%s is not found in this cluster\n", volumeServer)