This commit is contained in:
Chris Lu 2020-09-07 11:31:33 -07:00
parent 2b643f477d
commit 1a09bc43d1

View file

@ -36,7 +36,7 @@ func (c *commandVolumeFixReplication) Help() string {
Note: Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas * each time this will only add back one replica for one volume id. If there are multiple replicas
are missing, e.g. multiple volume servers are new, you may need to run this multiple times. are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
* do not run this too quick within seconds, since the new volume replica may take a few seconds * do not run this too quickly within seconds, since the new volume replica may take a few seconds
to register itself to the master. to register itself to the master.
` `
@ -80,12 +80,14 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all under replicated volumes // find all under replicated volumes
underReplicatedVolumeLocations := make(map[uint32][]location) underReplicatedVolumeLocations := make(map[uint32][]location)
overReplicatedVolumeLocations := make(map[uint32][]location)
for vid, locations := range replicatedVolumeLocations { for vid, locations := range replicatedVolumeLocations {
volumeInfo := replicatedVolumeInfo[vid] volumeInfo := replicatedVolumeInfo[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
if replicaPlacement.GetCopyCount() > len(locations) { if replicaPlacement.GetCopyCount() > len(locations) {
underReplicatedVolumeLocations[vid] = locations underReplicatedVolumeLocations[vid] = locations
} else if replicaPlacement.GetCopyCount() < len(locations) { } else if replicaPlacement.GetCopyCount() < len(locations) {
overReplicatedVolumeLocations[vid] = locations
fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations) fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations)
} }
} }
@ -101,6 +103,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find the most under populated data nodes // find the most under populated data nodes
keepDataNodesSorted(allLocations) keepDataNodesSorted(allLocations)
return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations)
}
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error {
for vid, locations := range underReplicatedVolumeLocations { for vid, locations := range underReplicatedVolumeLocations {
volumeInfo := replicatedVolumeInfo[vid] volumeInfo := replicatedVolumeInfo[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
@ -144,7 +151,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
} }
} }
return nil return nil
} }