Konstantin Lebedev 2021-09-30 20:24:24 +05:00
parent dad88a452f
commit fc51ffce2b

View file

@ -56,6 +56,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
volumespPerStep := volFixReplicationCommand.Int("volumes_per_step", 0, "how many volumes to fix in one cycle")
if err = volFixReplicationCommand.Parse(args); err != nil { if err = volFixReplicationCommand.Parse(args); err != nil {
return nil return nil
} }
@ -66,44 +68,54 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := !*skipChange takeAction := !*skipChange
// collect topology information underReplicatedVolumeIdsCount := 1
topologyInfo, _, err := collectTopologyInfo(commandEnv) for underReplicatedVolumeIdsCount > 0 {
if err != nil { // collect topology information
return err topologyInfo, _, err := collectTopologyInfo(commandEnv)
} if err != nil {
return err
}
// find all volumes that needs replication // find all volumes that needs replication
// collect all data nodes // collect all data nodes
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
if len(allLocations) == 0 { if len(allLocations) == 0 {
return fmt.Errorf("no data nodes at all") return fmt.Errorf("no data nodes at all")
} }
// find all under replicated volumes // find all under replicated volumes
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
for vid, replicas := range volumeReplicas { for vid, replicas := range volumeReplicas {
replica := replicas[0] replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
if replicaPlacement.GetCopyCount() > len(replicas) { if replicaPlacement.GetCopyCount() > len(replicas) {
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
} else if replicaPlacement.GetCopyCount() < len(replicas) { } else if replicaPlacement.GetCopyCount() < len(replicas) {
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
if len(overReplicatedVolumeIds) > 0 {
if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
return err
}
}
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
if underReplicatedVolumeIdsCount > 0 {
// find the most under populated data nodes
if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumespPerStep); err != nil {
return err
}
}
if *skipChange {
break
} }
} }
return nil
if len(overReplicatedVolumeIds) > 0 {
return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
}
if len(underReplicatedVolumeIds) == 0 {
return nil
}
// find the most under populated data nodes
return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
} }
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
@ -156,8 +168,10 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil return nil
} }
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumespPerStep int) (err error) {
if len(underReplicatedVolumeIds) > volumespPerStep && volumespPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumespPerStep]
}
for _, vid := range underReplicatedVolumeIds { for _, vid := range underReplicatedVolumeIds {
for i := 0; i < retryCount+1; i++ { for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {