clouse background update

This commit is contained in:
Konstantin Lebedev 2022-07-15 13:51:08 +05:00
parent 6622240df7
commit 4d5144e50d

View file

@ -14,6 +14,8 @@ import (
"time"
)
const topologyInfoUpdateInterval = 5 * time.Minute
func init() {
Commands = append(Commands, &commandVolumeServerEvacuate{})
}
@ -95,13 +97,22 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
return err
}
stopchan := make(chan struct{})
go func() {
for {
if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil {
c.topologyInfo = topologyInfo
select {
default:
if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil {
c.topologyInfo = topologyInfo
} else {
fmt.Fprintf(writer, "update topologyInfo %v", err)
}
case <-stopchan:
return
}
}
}()
defer close(stopchan)
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
@ -127,7 +138,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
for _, diskInfo := range thisNode.info.DiskInfos {
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
if err != nil {
fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
}
@ -204,7 +215,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
return
}
func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
for _, n := range otherNodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {