diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 98fb6aab1..03a3f74d1 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,7 +11,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -33,9 +32,10 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute + ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" ) type MasterOption struct { @@ -66,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateGoroutineCount int32 + onPeerUpdateDoneCns map[string]*chan string + onPeerUpdateLock sync.RWMutex // notifying clients clientChansLock sync.RWMutex @@ -119,9 +119,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) - ms.onPeerUpdateDoneCn = make(chan string) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate + ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -367,16 +367,31 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 { - ms.onPeerUpdateDoneCn <- peerName + ms.onPeerUpdateLock.RLock() + if len(ms.onPeerUpdateDoneCns) > 0 { + for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns { + *onPeerUpdateDoneCn <- peerName + } } + ms.onPeerUpdateLock.RUnlock() } else if isLeader { + if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { + *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg + return + } + onPeerUpdateDoneCn := make(chan string) + ms.onPeerUpdateLock.Lock() + ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn + ms.onPeerUpdateLock.Unlock() + go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1) + ms.onPeerUpdateLock.Lock() + delete(ms.onPeerUpdateDoneCns, peerName) + ms.onPeerUpdateLock.Unlock() + close(onPeerUpdateDoneCn) }() for { select { @@ -408,11 +423,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } glog.V(0).Infof("old raft server %s removed", peerName) return - case peerDone := <-ms.onPeerUpdateDoneCn: + case peerDone := <-onPeerUpdateDoneCn: if peerName == peerDone { glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) return } + if peerDone == ResetRaftServerRemovalTimeMsg { + raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) + glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", + RaftServerRemovalTime, peerName) + } } } }(peerName)