mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
shell: stop long running jobs if lock is lost
This commit is contained in:
parent
601ba5fb68
commit
676e27c589
|
@ -18,6 +18,10 @@ import (
|
||||||
|
|
||||||
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
|
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
copiedShardIds := []uint32{uint32(shardId)}
|
copiedShardIds := []uint32{uint32(shardId)}
|
||||||
|
|
||||||
if applyBalancing {
|
if applyBalancing {
|
||||||
|
|
|
@ -89,6 +89,11 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||||
}
|
}
|
||||||
|
|
||||||
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
|
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
// find volume location
|
// find volume location
|
||||||
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
|
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
|
||||||
|
|
||||||
|
|
|
@ -93,6 +93,10 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||||
}
|
}
|
||||||
|
|
||||||
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
|
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
// find volume location
|
// find volume location
|
||||||
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
|
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
|
||||||
if !found && len(locations) > 0 {
|
if !found && len(locations) > 0 {
|
||||||
|
|
|
@ -131,6 +131,10 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
|
||||||
|
|
||||||
func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
|
func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
|
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
|
||||||
|
|
||||||
// collect shard files to rebuilder local disk
|
// collect shard files to rebuilder local disk
|
||||||
|
|
|
@ -306,6 +306,10 @@ func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]
|
||||||
|
|
||||||
func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) {
|
func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) {
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return false, fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
if candidateVolume.ReplicaPlacement > 0 {
|
if candidateVolume.ReplicaPlacement > 0 {
|
||||||
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(candidateVolume.ReplicaPlacement))
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(candidateVolume.ReplicaPlacement))
|
||||||
if !isGoodMove(replicaPlacement, volumeReplicas[candidateVolume.Id], fullNode, emptyNode) {
|
if !isGoodMove(replicaPlacement, volumeReplicas[candidateVolume.Id], fullNode, emptyNode) {
|
||||||
|
|
|
@ -104,6 +104,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
if len(overReplicatedVolumeIds) > 0 {
|
if len(overReplicatedVolumeIds) > 0 {
|
||||||
if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
|
if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -222,6 +222,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
|
||||||
|
|
||||||
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
|
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
|
||||||
|
|
||||||
|
if !commandEnv.isLocked() {
|
||||||
|
return fmt.Errorf("lock is lost")
|
||||||
|
}
|
||||||
|
|
||||||
// mark all replicas as read only
|
// mark all replicas as read only
|
||||||
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
|
||||||
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
|
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
|
||||||
|
|
Loading…
Reference in a new issue