From fc51ffce2b2e07145e6c7144781a40491febd6cb Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Thu, 30 Sep 2021 20:24:24 +0500 Subject: [PATCH 1/3] https://github.com/chrislusf/seaweedfs/issues/1846 --- weed/shell/command_volume_fix_replication.go | 84 ++++++++++++-------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 76a582b31..9ac082e81 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -56,6 +56,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") + volumespPerStep := volFixReplicationCommand.Int("volumes_per_step", 0, "how many volumes to fix in one cycle") + if err = volFixReplicationCommand.Parse(args); err != nil { return nil } @@ -66,44 +68,54 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, takeAction := !*skipChange - // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv) - if err != nil { - return err - } + underReplicatedVolumeIdsCount := 1 + for underReplicatedVolumeIdsCount > 0 { + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } - // find all volumes that needs replication - // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) + // find all volumes that needs replication + // collect all data nodes + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) - if len(allLocations) == 0 { - return fmt.Errorf("no data nodes at all") - } + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } - // find all under replicated volumes - var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 - for vid, replicas := range volumeReplicas { - replica := replicas[0] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(replicas) { - underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) - } else if replicaPlacement.GetCopyCount() < len(replicas) { - overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) - fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + // find all under replicated volumes + var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[0] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + } + } + + if len(overReplicatedVolumeIds) > 0 { + if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil { + return err + } + } + + underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) + if underReplicatedVolumeIdsCount > 0 { + // find the most under populated data nodes + if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumespPerStep); err != nil { + return err + } + } + + if *skipChange { + break } } - - if len(overReplicatedVolumeIds) > 0 { - return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) - } - - if len(underReplicatedVolumeIds) == 0 { - return nil - } - - // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) - + return nil } func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { @@ -156,8 +168,10 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { - +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumespPerStep int) (err error) { + if len(underReplicatedVolumeIds) > volumespPerStep && volumespPerStep > 0 { + underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumespPerStep] + } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { From 2cecde89c31db80fc72713aa2cef1e0d4a26fa70 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Fri, 1 Oct 2021 00:17:54 +0500 Subject: [PATCH 2/3] rename opt volumesPerStep --- weed/shell/command_volume_fix_replication.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 9ac082e81..21b3ead6b 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -56,7 +56,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") - volumespPerStep := volFixReplicationCommand.Int("volumes_per_step", 0, "how many volumes to fix in one cycle") + volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") if err = volFixReplicationCommand.Parse(args); err != nil { return nil @@ -106,7 +106,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if underReplicatedVolumeIdsCount > 0 { // find the most under populated data nodes - if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumespPerStep); err != nil { + if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep); err != nil { return err } } @@ -168,9 +168,9 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumespPerStep int) (err error) { - if len(underReplicatedVolumeIds) > volumespPerStep && volumespPerStep > 0 { - underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumespPerStep] +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error) { + if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { + underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { From 5e64b22b45dc272254cd9d5eeb1e51814035d7fd Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Fri, 1 Oct 2021 18:51:22 +0500 Subject: [PATCH 3/3] check that the topology has been updated --- weed/shell/command_ec_decode.go | 12 ++++++ weed/shell/command_volume_fix_replication.go | 45 ++++++++++++++++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 18cea0504..3483156cb 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -208,6 +208,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr } +func LookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (err error, volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation) { + var resp *master_pb.LookupVolumeResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) + return err + }) + if err != nil { + return err, nil + } + return nil, resp.VolumeIdLocations +} + func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 21b3ead6b..f03cd550e 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -10,6 +10,8 @@ import ( "io" "path/filepath" "sort" + "strconv" + "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -70,6 +72,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { + fixedVolumeReplicas := map[string]int{} + // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { @@ -106,7 +110,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if underReplicatedVolumeIdsCount > 0 { // find the most under populated data nodes - if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep); err != nil { + err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) + if err != nil { return err } } @@ -114,6 +119,36 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, if *skipChange { break } + + // check that the topology has been updated + if len(fixedVolumeReplicas) > 0 { + fixedVolumes := make([]string, 0, len(fixedVolumeReplicas)) + for k, _ := range fixedVolumeReplicas { + fixedVolumes = append(fixedVolumes, k) + } + err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes) + if err != nil { + return err + } + for _, volumeIdLocation := range volumeIdLocations { + volumeId := volumeIdLocation.VolumeOrFileId + volumeIdLocationCount := len(volumeIdLocation.Locations) + i := 0 + for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { + fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) + time.Sleep(time.Duration(i+1) * time.Second * 7) + err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId}) + if err != nil { + return err + } + volumeIdLocationCount = len(volumeLocIds[0].Locations) + if *retryCount > i { + return fmt.Errorf("replicas volume %s mismatch in topology", volumeId) + } + i += 1 + } + } + } } return nil } @@ -168,18 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error) { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) { + fixedVolumes = map[string]int{} if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + if takeAction { + fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid]) + } break } } } - return + return nil, fixedVolumes } func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {