From 6cfbfb084941e213f38a83a763fd359cc6611108 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:04:12 +0500 Subject: [PATCH 01/20] check for ping before deleting raft server https://github.com/chrislusf/seaweedfs/issues/3083 --- weed/server/master_server.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 0fdc3944f..0e95fa91f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -372,8 +372,26 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) + raftServerPingTicker := time.NewTicker(5 * time.Minute) + defer func() { + ms.onPeerUpdateDoneCnExist = false + }() for { select { + case <-raftServerPingTicker.C: + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: peerName, + TargetType: cluster.MasterType, + }) + return err + }) + if err != nil { + glog.Warningf("raft server %s ping failed %+v", peerName, err) + } else { + glog.V(0).Infof("raft server %s remove canceled on ping success", peerName) + return + } case <-raftServerRemovalTimeAfter: err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ @@ -384,12 +402,13 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF }) if err != nil { glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + return } glog.V(0).Infof("old raft server %s removed", peerName) return case peerDone := <-ms.onPeerUpdateDoneCn: if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled", peerName) + glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) return } } From f6a966b4fc43addad8b2ebe01970f15957b8cc02 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:31:57 +0500 Subject: [PATCH 02/20] add waiting log message --- weed/server/master_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 0e95fa91f..e75c4df54 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -414,6 +414,7 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } } }(peerName) + glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) ms.onPeerUpdateDoneCnExist = true } } From 6c390851e7bb8e25080ebf81ea0e1064ee9018af Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 18:08:12 +0500 Subject: [PATCH 03/20] fix design --- weed/server/master_server.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e75c4df54..7f9bff389 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -65,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateDoneCnExist bool + onPeerUpdateDoneCn chan string + onPeerUpdateGoroutineCount uint32 // notifying clients clientChansLock sync.RWMutex @@ -366,15 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if ms.onPeerUpdateDoneCnExist { + if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 { ms.onPeerUpdateDoneCn <- peerName } } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - ms.onPeerUpdateDoneCnExist = false + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1) }() for { select { @@ -415,6 +417,5 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } }(peerName) glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) - ms.onPeerUpdateDoneCnExist = true } } From b6471ecd754dbcd26c7ccc767be16c8acc6337b9 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:45:13 +0500 Subject: [PATCH 04/20] err msg with duplicated local subscription detected move to log level 1 https://github.com/chrislusf/seaweedfs/issues/3320 --- weed/filer/meta_aggregator.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index c672ce342..5799e247e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "strings" "sync" "time" @@ -99,7 +100,11 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres return } if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) + errLvl := glog.Level(0) + if strings.Contains(err.Error(), "duplicated local subscription detected") { + errLvl = glog.Level(1) + } + glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } if lastTsNs < nextLastTsNs { lastTsNs = nextLastTsNs From ba0e3ce5fa4290be5d619345a22e8bbd61a757ee Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 11 Jul 2022 16:58:15 +0500 Subject: [PATCH 05/20] volume server evacuate to target server --- weed/shell/command_volume_server_evacuate.go | 29 ++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index ffbee0302..f2c24a8b4 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -47,7 +47,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") - c.targetServer = *vsEvacuateCommand.String("target", "", ": of target volume") + targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry") @@ -63,6 +63,9 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, if *volumeServer == "" { return fmt.Errorf("need to specify volume server by -node=:") } + if *targetServer != "" { + c.targetServer = *targetServer + } for i := 0; i < *retryCount+1; i++ { if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil { return nil @@ -103,14 +106,27 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE if thisNode == nil { return fmt.Errorf("%s is not found in this cluster", volumeServer) } + if c.targetServer != "" { + targetServerFound := false + for _, otherNode := range otherNodes { + if otherNode.info.Id == c.targetServer { + otherNodes = []*Node{otherNode} + targetServerFound = true + break + } + } + if !targetServerFound { + return fmt.Errorf("target %s is not found in this cluster", c.targetServer) + } + } // move away normal volumes volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) for _, diskInfo := range thisNode.info.DiskInfos { for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) + fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } if !hasMoved { if skipNonMoveable { @@ -138,7 +154,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, for _, ecShardInfo := range diskInfo.EcShardInfos { hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) } if !hasMoved { if skipNonMoveable { @@ -160,9 +176,6 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv }) for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] - if c.targetServer != "" && c.targetServer != emptyNode.info.Id { - continue - } collectionPrefix := "" if ecShardInfo.Collection != "" { collectionPrefix = ecShardInfo.Collection + "_" @@ -184,7 +197,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv return } -func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { +func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) for _, n := range otherNodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { From 6f764e101454ba85e50fcdfab056fcb5d8c66584 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 11:33:08 +0500 Subject: [PATCH 06/20] volume server evacuate from rack --- weed/shell/command_volume_server_evacuate.go | 103 ++++++++++--------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index f2c24a8b4..37fb29b14 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -19,6 +19,7 @@ func init() { type commandVolumeServerEvacuate struct { targetServer string + volumeRack string } func (c *commandVolumeServerEvacuate) Name() string { @@ -47,6 +48,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") + volumeRack := vsEvacuateCommand.String("rack", "", "rack for then volume servers") targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") @@ -66,6 +68,9 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, if *targetServer != "" { c.targetServer = *targetServer } + if *volumeRack != "" { + c.volumeRack = *volumeRack + } for i := 0; i < *retryCount+1; i++ { if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil { return nil @@ -102,41 +107,31 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(topologyInfo, "") - thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer) - if thisNode == nil { + thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) + if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } - if c.targetServer != "" { - targetServerFound := false - for _, otherNode := range otherNodes { - if otherNode.info.Id == c.targetServer { - otherNodes = []*Node{otherNode} - targetServerFound = true - break - } - } - if !targetServerFound { - return fmt.Errorf("target %s is not found in this cluster", c.targetServer) - } - } // move away normal volumes - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) - for _, diskInfo := range thisNode.info.DiskInfos { - for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) - if err != nil { - fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) - fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) - } else { - return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + for _, thisNode := range thisNodes { + for _, diskInfo := range thisNode.info.DiskInfos { + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + for _, vol := range diskInfo.VolumeInfos { + hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + if err != nil { + fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) + fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) + } else { + return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + } } } } + } return nil } @@ -144,23 +139,25 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") - thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer) - if thisNode == nil { + thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) + if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) } // move away ec volumes - for _, diskInfo := range thisNode.info.DiskInfos { - for _, ecShardInfo := range diskInfo.EcShardInfos { - hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) - if err != nil { - fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) - } else { - return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + for _, thisNode := range thisNodes { + for _, diskInfo := range thisNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + if err != nil { + fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) + } else { + return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + } } } } @@ -220,10 +217,16 @@ func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *Comman return } -func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) { +func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) { for _, node := range volumeServers { - if node.info.Id == thisServer { - thisNode = node + if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) { + thisNodes = append(thisNodes, node) + continue + } + if c.volumeRack != "" && c.volumeRack == node.rack { + continue + } + if c.targetServer != "" && c.targetServer != node.info.Id { continue } otherNodes = append(otherNodes, node) @@ -231,10 +234,16 @@ func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, o return } -func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) { +func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) { for _, node := range volumeServers { - if node.info.Id == thisServer { - thisNode = node + if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) { + thisNodes = append(thisNodes, node) + continue + } + if c.volumeRack != "" && c.volumeRack == string(node.rack) { + continue + } + if c.targetServer != "" && c.targetServer != node.info.Id { continue } otherNodes = append(otherNodes, node) From 867269cdcf6ba79d80886ee769c44742f68092b6 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 11:56:58 +0500 Subject: [PATCH 07/20] help rack --- weed/shell/command_volume_server_evacuate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 37fb29b14..dad8d8626 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -48,7 +48,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") - volumeRack := vsEvacuateCommand.String("rack", "", "rack for then volume servers") + volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers") targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") From b5e5f6f55ad6d8fab838f5735867d1457c5aa420 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 13:47:21 +0500 Subject: [PATCH 08/20] update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 32 +++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index dad8d8626..195cc2699 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -11,6 +11,7 @@ import ( "golang.org/x/exp/slices" "io" "os" + "time" ) func init() { @@ -18,6 +19,7 @@ func init() { } type commandVolumeServerEvacuate struct { + topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string } @@ -58,12 +60,12 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, } infoAboutSimulationMode(writer, *applyChange, "-force") - if err = commandEnv.confirmIsLocked(args); err != nil { + if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange { return } - if *volumeServer == "" { - return fmt.Errorf("need to specify volume server by -node=:") + if *volumeServer == "" && *volumeRack == "" { + return fmt.Errorf("need to specify volume server by -node=: or source rack") } if *targetServer != "" { c.targetServer = *targetServer @@ -88,25 +90,33 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn // list all the volumes // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) + c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0) if err != nil { return err } - if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + go func() { + for { + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { + c.topologyInfo = topologyInfo + } + } + }() + + if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } - if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } return nil } -func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server - volumeServers := collectVolumeServersByDc(topologyInfo, "") + volumeServers := collectVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) @@ -115,7 +125,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE // move away normal volumes for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { @@ -136,9 +146,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE return nil } -func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) From 39eaf426f828ff84559c591b755e9c365516487f Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 14:56:34 +0500 Subject: [PATCH 09/20] fix TestVolumeServerEvacuate --- weed/shell/command_volume_server_evacuate_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate_test.go b/weed/shell/command_volume_server_evacuate_test.go index 2cdb94a60..4563f38ba 100644 --- a/weed/shell/command_volume_server_evacuate_test.go +++ b/weed/shell/command_volume_server_evacuate_test.go @@ -6,12 +6,11 @@ import ( ) func TestVolumeServerEvacuate(t *testing.T) { - topologyInfo := parseOutput(topoData) + c := commandVolumeServerEvacuate{} + c.topologyInfo = parseOutput(topoData) volumeServer := "192.168.1.4:8080" - - c := commandVolumeServerEvacuate{} - if err := c.evacuateNormalVolumes(nil, topologyInfo, volumeServer, true, false, os.Stdout); err != nil { + if err := c.evacuateNormalVolumes(nil, volumeServer, true, false, os.Stdout); err != nil { t.Errorf("evacuate: %v", err) } From 884ffbafeedf380c5d1f81208577eddae0bc6311 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 15 Jul 2022 13:51:08 +0500 Subject: [PATCH 10/20] clouse background update --- weed/shell/command_volume_server_evacuate.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 195cc2699..3b0c8381b 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -14,6 +14,8 @@ import ( "time" ) +const topologyInfoUpdateInterval = 5 * time.Minute + func init() { Commands = append(Commands, &commandVolumeServerEvacuate{}) } @@ -95,13 +97,22 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } + stopchan := make(chan struct{}) go func() { for { - if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { - c.topologyInfo = topologyInfo + select { + default: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { + c.topologyInfo = topologyInfo + } else { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } + case <-stopchan: + return } } }() + defer close(stopchan) if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err @@ -127,7 +138,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE for _, diskInfo := range thisNode.info.DiskInfos { volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } @@ -204,7 +215,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv return } -func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { +func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) for _, n := range otherNodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { From 72dca31cfa64b6a60633578f8ba924b645db74ba Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 01:46:31 +0500 Subject: [PATCH 11/20] fix update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 3b0c8381b..0595ef308 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -103,9 +103,9 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn select { default: if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - c.topologyInfo = topologyInfo - } else { fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + c.topologyInfo = topologyInfo } case <-stopchan: return From fa88dff7ce9ffad1974698aaaca0fad762ffc4b5 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 11:32:28 +0500 Subject: [PATCH 12/20] update otherNodes --- weed/shell/command_volume_server_evacuate.go | 40 +++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 0595ef308..7d50b7f81 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,6 +24,7 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string + otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -97,22 +98,27 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo + if applyChange { + stopchan := make(chan struct{}) + go func() { + for { + select { + default: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + c.topologyInfo = topologyInfo + _, c.otherNodes = c.nodesOtherThan( + collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) + } + case <-stopchan: + return } - case <-stopchan: - return } - } - }() - defer close(stopchan) + }() + defer close(stopchan) + } if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err @@ -128,7 +134,8 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) + var thisNodes []*Node + thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } @@ -138,7 +145,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE for _, diskInfo := range thisNode.info.DiskInfos { volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } @@ -152,7 +159,6 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } } } - } return nil } From de4fcc0e2c3578cae784ba4bbcccd8b7d16ffc80 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:27:02 +0500 Subject: [PATCH 13/20] sync update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 45 +++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 7d50b7f81..d1c474a76 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,7 +24,6 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string - otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -98,28 +97,6 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - if applyChange { - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo - _, c.otherNodes = c.nodesOtherThan( - collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) - fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) - } - case <-stopchan: - return - } - } - }() - defer close(stopchan) - } - if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } @@ -134,18 +111,34 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - var thisNodes []*Node - thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) + thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } // move away normal volumes + ticker := time.NewTicker(topologyInfoUpdateInterval) for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { + if applyChange { + select { + case <-ticker.C: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + _, otherNodesNew := c.nodesOtherThan( + collectVolumeServersByDc(topologyInfo, ""), volumeServer) + if len(otherNodesNew) > 0 { + otherNodes = otherNodesNew + c.topologyInfo = topologyInfo + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) + } + } + } + } volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } From e2d991d8d0529dbd221ea686ca73b9c87234eedb Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:38:19 +0500 Subject: [PATCH 14/20] ticker.Stop --- weed/shell/command_volume_server_evacuate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index d1c474a76..c9df2c79a 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -118,6 +118,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE // move away normal volumes ticker := time.NewTicker(topologyInfoUpdateInterval) + defer ticker.Stop() for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { if applyChange { From c5189c343baa0d7b36349bbca669f8bba12d491b Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:54:23 +0500 Subject: [PATCH 15/20] remove ticker update the topology before each file --- weed/shell/command_volume_server_evacuate.go | 26 +++++++------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index c9df2c79a..f72d73230 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -11,11 +11,8 @@ import ( "golang.org/x/exp/slices" "io" "os" - "time" ) -const topologyInfoUpdateInterval = 5 * time.Minute - func init() { Commands = append(Commands, &commandVolumeServerEvacuate{}) } @@ -117,23 +114,18 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } // move away normal volumes - ticker := time.NewTicker(topologyInfoUpdateInterval) - defer ticker.Stop() for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { if applyChange { - select { - case <-ticker.C: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - _, otherNodesNew := c.nodesOtherThan( - collectVolumeServersByDc(topologyInfo, ""), volumeServer) - if len(otherNodesNew) > 0 { - otherNodes = otherNodesNew - c.topologyInfo = topologyInfo - fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) - } + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + _, otherNodesNew := c.nodesOtherThan( + collectVolumeServersByDc(topologyInfo, ""), volumeServer) + if len(otherNodesNew) > 0 { + otherNodes = otherNodesNew + c.topologyInfo = topologyInfo + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) } } } From 7875470e74022e2b4262e51a349c4e3c15459a33 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 18:40:35 +0500 Subject: [PATCH 16/20] onPeerUpdateGoroutineCount use int32 --- weed/server/master_server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 7f9bff389..98fb6aab1 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -67,7 +67,7 @@ type MasterServer struct { boundedLeaderChan chan int onPeerUpdateDoneCn chan string - onPeerUpdateGoroutineCount uint32 + onPeerUpdateGoroutineCount int32 // notifying clients clientChansLock sync.RWMutex @@ -367,16 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 { + if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 { ms.onPeerUpdateDoneCn <- peerName } } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) - atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1) + atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1) + atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1) }() for { select { From 93ca87b7cbf53e6b17a8a6d3b5dec32af94cd00c Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 21 Jul 2022 15:51:14 +0500 Subject: [PATCH 17/20] use safe onPeerUpdateDoneCns --- weed/server/master_server.go | 44 ++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 98fb6aab1..03a3f74d1 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,7 +11,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -33,9 +32,10 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute + ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" ) type MasterOption struct { @@ -66,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateGoroutineCount int32 + onPeerUpdateDoneCns map[string]*chan string + onPeerUpdateLock sync.RWMutex // notifying clients clientChansLock sync.RWMutex @@ -119,9 +119,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) - ms.onPeerUpdateDoneCn = make(chan string) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate + ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -367,16 +367,31 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 { - ms.onPeerUpdateDoneCn <- peerName + ms.onPeerUpdateLock.RLock() + if len(ms.onPeerUpdateDoneCns) > 0 { + for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns { + *onPeerUpdateDoneCn <- peerName + } } + ms.onPeerUpdateLock.RUnlock() } else if isLeader { + if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { + *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg + return + } + onPeerUpdateDoneCn := make(chan string) + ms.onPeerUpdateLock.Lock() + ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn + ms.onPeerUpdateLock.Unlock() + go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1) + ms.onPeerUpdateLock.Lock() + delete(ms.onPeerUpdateDoneCns, peerName) + ms.onPeerUpdateLock.Unlock() + close(onPeerUpdateDoneCn) }() for { select { @@ -408,11 +423,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } glog.V(0).Infof("old raft server %s removed", peerName) return - case peerDone := <-ms.onPeerUpdateDoneCn: + case peerDone := <-onPeerUpdateDoneCn: if peerName == peerDone { glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) return } + if peerDone == ResetRaftServerRemovalTimeMsg { + raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) + glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", + RaftServerRemovalTime, peerName) + } } } }(peerName) From 3c42814b5885b0ae10631c6a8a66b858c7923380 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 21 Jul 2022 17:15:10 +0500 Subject: [PATCH 18/20] avoid deadlock --- weed/server/master_server.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 03a3f74d1..6c0b6652b 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -368,8 +368,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } } ms.onPeerUpdateLock.RLock() - if len(ms.onPeerUpdateDoneCns) > 0 { - for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns { + isGtZero := len(ms.onPeerUpdateDoneCns) > 0 + ms.onPeerUpdateLock.RUnlock() + if isGtZero { + var chanPtrs []*chan string + ms.onPeerUpdateLock.RLock() + for _, cn := range ms.onPeerUpdateDoneCns { + chanPtrs = append(chanPtrs, cn) + } + ms.onPeerUpdateLock.RUnlock() + for _, onPeerUpdateDoneCn := range chanPtrs { *onPeerUpdateDoneCn <- peerName } } From c88ea31f62315bca22cb0457dca92376b032f8b2 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 26 Jul 2022 12:57:07 +0500 Subject: [PATCH 19/20] fix RUnlock of unlocked RWMutex --- weed/server/master_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 6c0b6652b..d2e98c2a2 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -381,7 +381,6 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF *onPeerUpdateDoneCn <- peerName } } - ms.onPeerUpdateLock.RUnlock() } else if isLeader { if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg From a98f6d66a38b8dd7231191361a4333b46182a407 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 1 Aug 2022 12:51:41 +0500 Subject: [PATCH 20/20] rollback over onPeerupdate implementation of automatic clean-up of failed servers in favor of synchronous ping --- weed/filer/meta_aggregator.go | 2 +- weed/server/master_server.go | 110 ++++------------------------------ 2 files changed, 13 insertions(+), 99 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 5799e247e..c78dcac95 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres if err != nil { errLvl := glog.Level(0) if strings.Contains(err.Error(), "duplicated local subscription detected") { - errLvl = glog.Level(1) + errLvl = glog.Level(4) } glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2e98c2a2..1c623388c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -32,10 +31,8 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute - ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) type MasterOption struct { @@ -66,9 +63,6 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCns map[string]*chan string - onPeerUpdateLock sync.RWMutex - // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -121,7 +115,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.boundedLeaderChan = make(chan int, 16) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate - ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -352,97 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd { - if isLeader { - raftServerFound := false - for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerName { - raftServerFound = true - } - } - if !raftServerFound { - glog.V(0).Infof("adding new raft server: %s", peerName) - ms.Topo.HashicorpRaft.AddVoter( - hashicorpRaft.ServerID(peerName), - hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + if update.IsAdd && isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true } } - ms.onPeerUpdateLock.RLock() - isGtZero := len(ms.onPeerUpdateDoneCns) > 0 - ms.onPeerUpdateLock.RUnlock() - if isGtZero { - var chanPtrs []*chan string - ms.onPeerUpdateLock.RLock() - for _, cn := range ms.onPeerUpdateDoneCns { - chanPtrs = append(chanPtrs, cn) - } - ms.onPeerUpdateLock.RUnlock() - for _, onPeerUpdateDoneCn := range chanPtrs { - *onPeerUpdateDoneCn <- peerName - } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } - } else if isLeader { - if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { - *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg - return - } - onPeerUpdateDoneCn := make(chan string) - ms.onPeerUpdateLock.Lock() - ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn - ms.onPeerUpdateLock.Unlock() - - go func(peerName string) { - raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) - raftServerPingTicker := time.NewTicker(5 * time.Minute) - defer func() { - ms.onPeerUpdateLock.Lock() - delete(ms.onPeerUpdateDoneCns, peerName) - ms.onPeerUpdateLock.Unlock() - close(onPeerUpdateDoneCn) - }() - for { - select { - case <-raftServerPingTicker.C: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.Ping(context.Background(), &master_pb.PingRequest{ - Target: peerName, - TargetType: cluster.MasterType, - }) - return err - }) - if err != nil { - glog.Warningf("raft server %s ping failed %+v", peerName, err) - } else { - glog.V(0).Infof("raft server %s remove canceled on ping success", peerName) - return - } - case <-raftServerRemovalTimeAfter: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ - Id: peerName, - Force: false, - }) - return err - }) - if err != nil { - glog.Warningf("failed to removing old raft server %s: %v", peerName, err) - return - } - glog.V(0).Infof("old raft server %s removed", peerName) - return - case peerDone := <-onPeerUpdateDoneCn: - if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) - return - } - if peerDone == ResetRaftServerRemovalTimeMsg { - raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) - glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", - RaftServerRemovalTime, peerName) - } - } - } - }(peerName) - glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) } }