diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 5640e58bb..7e9627b40 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb" "io" + "path/filepath" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -34,11 +35,12 @@ func (c *commandVolumeConfigureReplication) Help() string { ` } -func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) { configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id") replicationString := configureReplicationCommand.String("replication", "", "the intended replication value") + collectionPattern := configureReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") if err = configureReplicationCommand.Parse(args); err != nil { return nil } @@ -55,7 +57,6 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman if err != nil { return fmt.Errorf("replication format: %v", err) } - replicaPlacementInt32 := uint32(replicaPlacement.Byte()) // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv) @@ -64,6 +65,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } vid := needle.VolumeId(*volumeIdInt) + volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern) // find all data nodes with volumes that needs replication change var allLocations []location @@ -71,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman loc := newLocation(dc, string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { - if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { + if volumeFilter(v) { allLocations = append(allLocations, loc) continue } @@ -106,3 +108,19 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman return nil } + +func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool { + replicaPlacementInt32 := uint32(replicaPlacement.Byte()) + if volumeId > 0 { + return func(v *master_pb.VolumeInformationMessage) bool { + return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32 + } + } + return func(v *master_pb.VolumeInformationMessage) bool { + matched, err := filepath.Match(collectionPattern, v.Collection) + if err != nil { + return false + } + return matched + } +}