From d80538a18744d20943fc42f315a14cb76b76961a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Sep 2020 12:35:02 -0700 Subject: [PATCH] refactoring --- weed/shell/command_volume_fix_replication.go | 76 ++++++++++---------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 8f9700292..4a1d2e056 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -64,35 +64,35 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all volumes that needs replication // collect all data nodes - replicatedVolumeLocations := make(map[uint32][]location) - replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) + volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { if v.ReplicaPlacement > 0 { - replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) - replicatedVolumeInfo[v.Id] = v + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) } } allLocations = append(allLocations, loc) }) // find all under replicated volumes - underReplicatedVolumeLocations := make(map[uint32][]location) - overReplicatedVolumeLocations := make(map[uint32][]location) - for vid, locations := range replicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(locations) { - underReplicatedVolumeLocations[vid] = locations - } else if replicaPlacement.GetCopyCount() < len(locations) { - overReplicatedVolumeLocations[vid] = locations - fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations) + var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[rand.Intn(len(replicas))] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) } } - if len(underReplicatedVolumeLocations) == 0 { + if len(underReplicatedVolumeIds) == 0 { return fmt.Errorf("no under replicated volumes") } @@ -103,23 +103,22 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find the most under populated data nodes keepDataNodesSorted(allLocations) - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error { - for vid, locations := range underReplicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range underReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replica := replicas[rand.Intn(len(replicas))] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false for _, dst := range allLocations { // check whether data nodes satisfy the constraints - if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // ask the volume server to replicate the volume - sourceNodes := underReplicatedVolumeLocations[vid] - sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) if !takeAction { break @@ -127,11 +126,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: volumeInfo.Id, - SourceDataNode: sourceNode.dataNode.Id, + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, }) if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } return nil }) @@ -147,7 +146,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm } } if !foundNewLocation { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } } @@ -190,11 +189,11 @@ func keepDataNodesSorted(dataNodes []location) { return false } */ -func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { +func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { existingDataNodes := make(map[string]int) - for _, loc := range existingLocations { - existingDataNodes[loc.String()] += 1 + for _, replica := range replicas { + existingDataNodes[replica.location.String()] += 1 } sameDataNodeCount := existingDataNodes[possibleLocation.String()] // avoid duplicated volume on the same data node @@ -203,8 +202,8 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi } existingDataCenters := make(map[string]int) - for _, loc := range existingLocations { - existingDataCenters[loc.DataCenter()] += 1 + for _, replica := range replicas { + existingDataCenters[replica.location.DataCenter()] += 1 } primaryDataCenters, _ := findTopKeys(existingDataCenters) @@ -227,11 +226,11 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi // now this is one of the primary dcs existingRacks := make(map[string]int) - for _, loc := range existingLocations { - if loc.DataCenter() != possibleLocation.DataCenter() { + for _, replica := range replicas { + if replica.location.DataCenter() != possibleLocation.DataCenter() { continue } - existingRacks[loc.Rack()] += 1 + existingRacks[replica.location.Rack()] += 1 } primaryRacks, _ := findTopKeys(existingRacks) sameRackCount := existingRacks[possibleLocation.Rack()] @@ -288,6 +287,11 @@ func isAmong(key string, keys []string) bool { return false } +type VolumeReplica struct { + location *location + info *master_pb.VolumeInformationMessage +} + type location struct { dc string rack string