seaweedfs/weed/shell/command_volume_configure_replication.go
2021-12-10 13:24:38 -08:00

127 lines
3.6 KiB
Go

package shell
import (
"context"
"errors"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func init() {
Commands = append(Commands, &commandVolumeConfigureReplication{})
}
type commandVolumeConfigureReplication struct {
}
func (c *commandVolumeConfigureReplication) Name() string {
return "volume.configure.replication"
}
func (c *commandVolumeConfigureReplication) Help() string {
return `change volume replication value
This command changes a volume replication value. It should be followed by "volume.fix.replication".
`
}
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
collectionPattern := configureReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
if err = configureReplicationCommand.Parse(args); err != nil {
return nil
}
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
if *replicationString == "" {
return fmt.Errorf("empty replication value")
}
replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString)
if err != nil {
return fmt.Errorf("replication format: %v", err)
}
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
vid := needle.VolumeId(*volumeIdInt)
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
// find all data nodes with volumes that needs replication change
var allLocations []location
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if volumeFilter(v) {
allLocations = append(allLocations, loc)
continue
}
}
}
})
if len(allLocations) == 0 {
return fmt.Errorf("no volume needs change")
}
for _, dst := range allLocations {
err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
})
if configureErr != nil {
return configureErr
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
})
if err != nil {
return err
}
}
return nil
}
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
if volumeId > 0 {
return func(v *master_pb.VolumeInformationMessage) bool {
return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32
}
}
return func(v *master_pb.VolumeInformationMessage) bool {
matched, err := filepath.Match(collectionPattern, v.Collection)
if err != nil {
return false
}
return matched
}
}