From 14a2cc83bf35c2c30bb2306cca128ca8a1eb8286 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 6 Apr 2022 21:17:04 +0500 Subject: [PATCH] raft update peers via OnPeerUpdate --- weed/cluster/cluster.go | 18 ++++++++++++++ weed/server/master_server.go | 46 +++++++++++++++++++++++++++++++++-- weed/server/raft_hashicorp.go | 9 ++++--- 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index e8752f4d9..3cff13724 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -81,6 +81,15 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress }, } case MasterType: + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: true, + }, + }, + } } return nil } @@ -120,6 +129,15 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr } } case MasterType: + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, + }, + }, + } } return nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 37e7f245c..2c626b511 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -17,6 +18,7 @@ import ( "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -30,8 +32,9 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Hour ) type MasterOption struct { @@ -61,6 +64,7 @@ type MasterServer struct { vgCh chan *topology.VolumeGrowRequest boundedLeaderChan chan int + onPeerUpdatDoneCn chan string // notifying clients clientChansLock sync.RWMutex @@ -112,6 +116,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -323,3 +329,39 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + glog.V(0).Infof("OnPeerUpdate: %+v", update) + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + peerAddress := pb.ServerAddress(update.Address) + if update.IsAdd { + if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { + glog.V(0).Infof("adding new raft server: %s", peerAddress.String()) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerAddress.String()), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + ms.onPeerUpdatDoneCn <- string(peerAddress) + } else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { + go func(peerName string) { + for { + select { + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + case <-time.After(RaftServerRemovalTime): + glog.V(0).Infof("removing old raft server: %s", peerName) + if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + }); err != nil { + glog.Warningf("failed removing old raft server: %v", err) + } + return + } + } + }(string(peerAddress)) + } +} diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index 3ce3ebcda..885ffdcc7 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -21,8 +21,9 @@ import ( ) const ( - ldbFile = "logs.dat" - sdbFile = "stable.dat" + ldbFile = "logs.dat" + sdbFile = "stable.dat" + updatePeersTimeout = 15 * time.Minute ) func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int { @@ -84,7 +85,9 @@ func (s *RaftServer) UpdatePeers() { s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) } } - break + return + case <-time.After(updatePeersTimeout): + return } } }