// Source: seaweedfs/weed/shell/command_volume_server_evacuate.go

package shell
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"golang.org/x/exp/slices"
"io"
"os"
)
// init appends a commandVolumeServerEvacuate instance to the package-level
// Commands list.
func init() {
Commands = append(Commands, &commandVolumeServerEvacuate{})
}
type commandVolumeServerEvacuate struct {
2022-07-12 08:47:21 +00:00
topologyInfo *master_pb.TopologyInfo
targetServer string
2022-07-12 06:33:08 +00:00
volumeRack string
}
// Name returns the command's name, "volumeServer.evacuate".
func (c *commandVolumeServerEvacuate) Name() string {
return "volumeServer.evacuate"
}
// Help returns the usage text for the command. The text is returned verbatim
// as a raw string literal.
// NOTE(review): the text only mentions -node and -skipNonMoveable; Do also
// defines -rack, -target, -force and -retry.
func (c *commandVolumeServerEvacuate) Help() string {
return `move out all data on a volume server
volumeServer.evacuate -node <host:port>
This command moves all data away from the volume server.
The volumes on the volume servers will be redistributed.
Usually this is used to prepare to shutdown or upgrade the volume server.
Sometimes a volume can not be moved because there are no
good destination to meet the replication requirement.
E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
You can use "-skipNonMoveable" to move the rest volumes.
`
}
func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
2022-07-12 06:56:58 +00:00
volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
if err = vsEvacuateCommand.Parse(args); err != nil {
return nil
}
2022-05-31 21:48:46 +00:00
infoAboutSimulationMode(writer, *applyChange, "-force")
2022-07-12 08:47:21 +00:00
if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
return
}
2022-07-12 08:47:21 +00:00
if *volumeServer == "" && *volumeRack == "" {
return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
}
if *targetServer != "" {
c.targetServer = *targetServer
}
2022-07-12 06:33:08 +00:00
if *volumeRack != "" {
c.volumeRack = *volumeRack
}
2021-07-21 21:38:12 +00:00
for i := 0; i < *retryCount+1; i++ {
if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
return nil
}
}
return
}
func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
// 1. confirm the volume server is part of the cluster
// 2. collect all other volume servers, sort by empty slots
// 3. move to any other volume server as long as it satisfy the replication requirements
// list all the volumes
2021-02-22 08:28:42 +00:00
// collect topology information
2022-07-12 08:47:21 +00:00
c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
2022-07-12 08:47:21 +00:00
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
2022-07-12 08:47:21 +00:00
if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
return nil
}
2022-07-12 08:47:21 +00:00
func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server
2022-07-12 08:47:21 +00:00
volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
2022-07-18 11:27:02 +00:00
thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
2022-07-12 06:33:08 +00:00
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster", volumeServer)
}
// move away normal volumes
2022-07-12 06:33:08 +00:00
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
2022-07-18 11:27:02 +00:00
if applyChange {
if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
fmt.Fprintf(writer, "update topologyInfo %v", err)
} else {
_, otherNodesNew := c.nodesOtherThan(
collectVolumeServersByDc(topologyInfo, ""), volumeServer)
if len(otherNodesNew) > 0 {
otherNodes = otherNodesNew
c.topologyInfo = topologyInfo
fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
2022-07-18 11:27:02 +00:00
}
}
}
2022-07-12 08:47:21 +00:00
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
2022-07-12 06:33:08 +00:00
for _, vol := range diskInfo.VolumeInfos {
2022-07-18 11:27:02 +00:00
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
2022-07-12 06:33:08 +00:00
if err != nil {
2022-08-23 08:20:45 +00:00
fmt.Fprintf(writer, "move away volume %d from %s: %v\n", vol.Id, volumeServer, err)
2022-07-12 06:33:08 +00:00
}
if !hasMoved {
if skipNonMoveable {
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
} else {
return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
}
2021-02-16 10:47:02 +00:00
}
}
}
}
return nil
}
2022-07-12 08:47:21 +00:00
func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
2022-07-12 08:47:21 +00:00
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
2022-07-12 06:33:08 +00:00
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
}
// move away ec volumes
2022-07-12 06:33:08 +00:00
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
if err != nil {
fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
}
if !hasMoved {
if skipNonMoveable {
fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
} else {
return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
}
2021-02-16 10:47:02 +00:00
}
}
}
}
return nil
}
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
})
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
collectionPrefix := ""
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
}
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
if err != nil {
return
} else {
hasMoved = true
break
}
}
if !hasMoved {
return
}
}
return
}
2022-07-15 08:51:08 +00:00
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
2021-02-16 18:47:32 +00:00
fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
for _, n := range otherNodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
return v.DiskType == vol.DiskType
})
}
slices.SortFunc(otherNodes, func(a, b *Node) bool {
return a.localVolumeRatio(fn) < b.localVolumeRatio(fn)
})
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
if err != nil {
return
}
if hasMoved {
break
}
}
return
}
2022-07-12 06:33:08 +00:00
func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
for _, node := range volumeServers {
2022-07-12 06:33:08 +00:00
if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) {
thisNodes = append(thisNodes, node)
continue
}
if c.volumeRack != "" && c.volumeRack == node.rack {
continue
}
if c.targetServer != "" && c.targetServer != node.info.Id {
continue
}
otherNodes = append(otherNodes, node)
}
return
}
2022-07-12 06:33:08 +00:00
func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
for _, node := range volumeServers {
2022-07-12 06:33:08 +00:00
if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) {
thisNodes = append(thisNodes, node)
continue
}
if c.volumeRack != "" && c.volumeRack == string(node.rack) {
continue
}
if c.targetServer != "" && c.targetServer != node.info.Id {
continue
}
otherNodes = append(otherNodes, node)
}
return
}