From 7a6c559ab4a6b696bb574454b297ebefabec29ed Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 21 Jul 2022 22:01:05 -0700 Subject: [PATCH] fix Change replication via volume.configure.replication by collection fix https://github.com/chrislusf/seaweedfs/issues/3346 --- .../command_volume_configure_replication.go | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 610986489..b07d58083 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -68,45 +68,39 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern) // find all data nodes with volumes that needs replication change - var allLocations []location eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - loc := newLocation(dc, string(rack), dn) + var targetVolumeIds []uint32 for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if volumeFilter(v) { - allLocations = append(allLocations, loc) - continue + targetVolumeIds = append(targetVolumeIds, v.Id) } } } - }) - - if len(allLocations) == 0 { - return fmt.Errorf("no volume needs change") - } - - for _, dst := range allLocations { - err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ - VolumeId: uint32(vid), - Replication: replicaPlacement.String(), - }) - if configureErr != nil { - return configureErr - } - if resp.Error != "" { - return errors.New(resp.Error) + if len(targetVolumeIds) == 0 { + return + } + err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + for _, targetVolumeId := range targetVolumeIds { + resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ + VolumeId: targetVolumeId, + Replication: replicaPlacement.String(), + }) + if configureErr != nil { + return configureErr + } + if resp.Error != "" { + return errors.New(resp.Error) + } } return nil }) - if err != nil { - return err + return } + }) - } - - return nil + return err } func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {