refactoring

This commit is contained in:
Chris Lu 2020-09-07 12:35:02 -07:00
parent 1a09bc43d1
commit d80538a187

View file

@ -64,35 +64,35 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that need replication // find all volumes that need replication
// collect all data nodes // collect all data nodes
replicatedVolumeLocations := make(map[uint32][]location) volumeReplicas := make(map[uint32][]*VolumeReplica)
replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
var allLocations []location var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn) loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos { for _, v := range dn.VolumeInfos {
if v.ReplicaPlacement > 0 { if v.ReplicaPlacement > 0 {
replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
replicatedVolumeInfo[v.Id] = v location: &loc,
info: v,
})
} }
} }
allLocations = append(allLocations, loc) allLocations = append(allLocations, loc)
}) })
// find all under replicated volumes // find all under replicated volumes
underReplicatedVolumeLocations := make(map[uint32][]location) var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
overReplicatedVolumeLocations := make(map[uint32][]location) for vid, replicas := range volumeReplicas {
for vid, locations := range replicatedVolumeLocations { replica := replicas[rand.Intn(len(replicas))]
volumeInfo := replicatedVolumeInfo[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(replicas) {
if replicaPlacement.GetCopyCount() > len(locations) { underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
underReplicatedVolumeLocations[vid] = locations } else if replicaPlacement.GetCopyCount() < len(replicas) {
} else if replicaPlacement.GetCopyCount() < len(locations) { overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
overReplicatedVolumeLocations[vid] = locations fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations)
} }
} }
if len(underReplicatedVolumeLocations) == 0 { if len(underReplicatedVolumeIds) == 0 {
return fmt.Errorf("no under replicated volumes") return fmt.Errorf("no under replicated volumes")
} }
@ -103,23 +103,22 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find the most under populated data nodes // find the most under populated data nodes
keepDataNodesSorted(allLocations) keepDataNodesSorted(allLocations)
return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations) return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
} }
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error { func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
for vid, locations := range underReplicatedVolumeLocations { for _, vid := range underReplicatedVolumeIds {
volumeInfo := replicatedVolumeInfo[vid] replicas := volumeReplicas[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) replica := replicas[rand.Intn(len(replicas))]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false foundNewLocation := false
for _, dst := range allLocations { for _, dst := range allLocations {
// check whether data nodes satisfy the constraints // check whether data nodes satisfy the constraints
if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// ask the volume server to replicate the volume // ask the volume server to replicate the volume
sourceNodes := underReplicatedVolumeLocations[vid]
sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
foundNewLocation = true foundNewLocation = true
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction { if !takeAction {
break break
@ -127,11 +126,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: volumeInfo.Id, VolumeId: replica.info.Id,
SourceDataNode: sourceNode.dataNode.Id, SourceDataNode: replica.location.dataNode.Id,
}) })
if replicateErr != nil { if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
} }
return nil return nil
}) })
@ -147,7 +146,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
} }
} }
if !foundNewLocation { if !foundNewLocation {
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
} }
} }
@ -190,11 +189,11 @@ func keepDataNodesSorted(dataNodes []location) {
return false return false
} }
*/ */
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
existingDataNodes := make(map[string]int) existingDataNodes := make(map[string]int)
for _, loc := range existingLocations { for _, replica := range replicas {
existingDataNodes[loc.String()] += 1 existingDataNodes[replica.location.String()] += 1
} }
sameDataNodeCount := existingDataNodes[possibleLocation.String()] sameDataNodeCount := existingDataNodes[possibleLocation.String()]
// avoid duplicated volume on the same data node // avoid duplicated volume on the same data node
@ -203,8 +202,8 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
} }
existingDataCenters := make(map[string]int) existingDataCenters := make(map[string]int)
for _, loc := range existingLocations { for _, replica := range replicas {
existingDataCenters[loc.DataCenter()] += 1 existingDataCenters[replica.location.DataCenter()] += 1
} }
primaryDataCenters, _ := findTopKeys(existingDataCenters) primaryDataCenters, _ := findTopKeys(existingDataCenters)
@ -227,11 +226,11 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
// now this is one of the primary dcs // now this is one of the primary dcs
existingRacks := make(map[string]int) existingRacks := make(map[string]int)
for _, loc := range existingLocations { for _, replica := range replicas {
if loc.DataCenter() != possibleLocation.DataCenter() { if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue continue
} }
existingRacks[loc.Rack()] += 1 existingRacks[replica.location.Rack()] += 1
} }
primaryRacks, _ := findTopKeys(existingRacks) primaryRacks, _ := findTopKeys(existingRacks)
sameRackCount := existingRacks[possibleLocation.Rack()] sameRackCount := existingRacks[possibleLocation.Rack()]
@ -288,6 +287,11 @@ func isAmong(key string, keys []string) bool {
return false return false
} }
// VolumeReplica describes one copy of a volume: the volume information
// message reported for it together with the location it was reported from.
type VolumeReplica struct {
// location is where this copy of the volume was found.
location *location
// info is the volume information message reported for this copy.
info *master_pb.VolumeInformationMessage
}
type location struct { type location struct {
dc string dc string
rack string rack string