mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #2438 from ekhvalov/volume_replication_collection
Volume configure replication by collection pattern
This commit is contained in:
commit
cef2718ede
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
|
@ -34,11 +35,12 @@ func (c *commandVolumeConfigureReplication) Help() string {
|
|||
`
|
||||
}
|
||||
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
|
||||
|
||||
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
|
||||
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
|
||||
collectionPattern := configureReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
||||
if err = configureReplicationCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -55,7 +57,6 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
|||
if err != nil {
|
||||
return fmt.Errorf("replication format: %v", err)
|
||||
}
|
||||
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv)
|
||||
|
@ -64,6 +65,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
|||
}
|
||||
|
||||
vid := needle.VolumeId(*volumeIdInt)
|
||||
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
|
||||
|
||||
// find all data nodes with volumes that needs replication change
|
||||
var allLocations []location
|
||||
|
@ -71,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
|||
loc := newLocation(dc, string(rack), dn)
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 {
|
||||
if volumeFilter(v) {
|
||||
allLocations = append(allLocations, loc)
|
||||
continue
|
||||
}
|
||||
|
@ -106,3 +108,19 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {
|
||||
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
|
||||
if volumeId > 0 {
|
||||
return func(v *master_pb.VolumeInformationMessage) bool {
|
||||
return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32
|
||||
}
|
||||
}
|
||||
return func(v *master_pb.VolumeInformationMessage) bool {
|
||||
matched, err := filepath.Match(collectionPattern, v.Collection)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return matched
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue