Volume filter by collection pattern added.

This commit is contained in:
user 2021-11-11 18:01:47 +09:00
parent dbb8003ce3
commit 563a74a9eb

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"io" "io"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@ -39,6 +40,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id") volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value") replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
collectionPattern := configureReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
if err = configureReplicationCommand.Parse(args); err != nil { if err = configureReplicationCommand.Parse(args); err != nil {
return nil return nil
} }
@ -63,7 +65,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
} }
vid := needle.VolumeId(*volumeIdInt) vid := needle.VolumeId(*volumeIdInt)
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid)) volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
// find all data nodes with volumes that needs replication change // find all data nodes with volumes that needs replication change
var allLocations []location var allLocations []location
@ -107,8 +109,17 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
return nil return nil
} }
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32) func(message *master_pb.VolumeInformationMessage) bool { func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {
replicaPlacementInt32 := uint32(replicaPlacement.Byte()) replicaPlacementInt32 := uint32(replicaPlacement.Byte())
if collectionPattern != "" {
return func(v *master_pb.VolumeInformationMessage) bool {
matched, err := filepath.Match(collectionPattern, v.Collection)
if err != nil {
return false
}
return matched
}
}
return func(v *master_pb.VolumeInformationMessage) bool { return func(v *master_pb.VolumeInformationMessage) bool {
return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32 return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32
} }