refactoring

This commit is contained in:
Chris Lu 2020-09-10 23:05:00 -07:00
parent 19537c9d21
commit 89a62e8007

View file

@ -42,15 +42,16 @@ func (c *commandVolumeBalance) Help() string {
idealWritableVolumes = totalWritableVolumes / numVolumeServers idealWritableVolumes = totalWritableVolumes / numVolumeServers
for hasMovedOneVolume { for hasMovedOneVolume {
sort all volume servers ordered by the number of local writable volumes sort all volume servers ordered by the number of local writable volumes
pick the volume server A with the lowest number of writable volumes x
pick the volume server B with the highest number of writable volumes y pick the volume server B with the highest number of writable volumes y
for any the volume server A with the number of writable volumes x +1 <= idealWritableVolume {
if y > idealWritableVolumes and x +1 <= idealWritableVolumes { if y > idealWritableVolumes and x +1 <= idealWritableVolumes {
if B has a writable volume id v that A does not have { if B has a writable volume id v that A does not have, and satisfy v replication requirements {
move writable volume v from A to B move writable volume v from A to B
} }
} }
} }
} }
}
func balanceReadOnlyVolumes(){ func balanceReadOnlyVolumes(){
//similar to balanceWritableVolumes //similar to balanceWritableVolumes
} }
@ -185,7 +186,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
}) })
} }
func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
selectedVolumeCount := 0 selectedVolumeCount := 0
for _, dn := range nodes { for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes) selectedVolumeCount += len(dn.selectedVolumes)
@ -193,24 +194,43 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes)) idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))
hasMove := true hasMoved := true
for hasMove { for hasMoved {
hasMove = false hasMoved = false
sort.Slice(nodes, func(i, j int) bool { sort.Slice(nodes, func(i, j int) bool {
// TODO sort by free volume slots??? // TODO sort by free volume slots???
return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes) return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
}) })
emptyNode, fullNode := nodes[0], nodes[len(nodes)-1]
if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes {
// sort the volumes to move fullNode := nodes[len(nodes)-1]
var candidateVolumes []*master_pb.VolumeInformationMessage var candidateVolumes []*master_pb.VolumeInformationMessage
for _, v := range fullNode.selectedVolumes { for _, v := range fullNode.selectedVolumes {
candidateVolumes = append(candidateVolumes, v) candidateVolumes = append(candidateVolumes, v)
} }
sortCandidatesFn(candidateVolumes) sortCandidatesFn(candidateVolumes)
for i := 0; i < len(nodes)-1; i++ {
emptyNode := nodes[i]
if !(len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes) {
// no more volume servers with empty slots
break
}
hasMoved, err = attemptToMoveOneVolume(commandEnv, fullNode, candidateVolumes, emptyNode, applyBalancing)
if err != nil {
return
}
if hasMoved {
// moved one volume
break
}
}
}
return nil
}
func attemptToMoveOneVolume(commandEnv *CommandEnv, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
for _, v := range candidateVolumes { for _, v := range candidateVolumes {
if v.ReplicaPlacement > 0 { if v.ReplicaPlacement > 0 {
if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
@ -220,19 +240,17 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
} }
} }
if _, found := emptyNode.selectedVolumes[v.Id]; !found { if _, found := emptyNode.selectedVolumes[v.Id]; !found {
if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { if err = moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
delete(fullNode.selectedVolumes, v.Id) delete(fullNode.selectedVolumes, v.Id)
emptyNode.selectedVolumes[v.Id] = v emptyNode.selectedVolumes[v.Id] = v
hasMove = true hasMoved = true
break break
} else { } else {
return err return
} }
} }
} }
} return
}
return nil
} }
func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {