rollback over onPeerupdate implementation of automatic clean-up of failed servers in favor of synchronous ping

This commit is contained in:
Konstantin Lebedev 2022-08-01 12:51:41 +05:00
parent c88ea31f62
commit a98f6d66a3
2 changed files with 13 additions and 99 deletions

View file

@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
if err != nil { if err != nil {
errLvl := glog.Level(0) errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") { if strings.Contains(err.Error(), "duplicated local subscription detected") {
errLvl = glog.Level(1) errLvl = glog.Level(4)
} }
glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
} }

View file

@ -1,7 +1,6 @@
package weed_server package weed_server
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/stats"
"net/http" "net/http"
@ -32,10 +31,8 @@ import (
) )
const ( const (
SequencerType = "master.sequencer.type" SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
RaftServerRemovalTime = 72 * time.Minute
ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime"
) )
type MasterOption struct { type MasterOption struct {
@ -66,9 +63,6 @@ type MasterServer struct {
boundedLeaderChan chan int boundedLeaderChan chan int
onPeerUpdateDoneCns map[string]*chan string
onPeerUpdateLock sync.RWMutex
// notifying clients // notifying clients
clientChansLock sync.RWMutex clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse clientChans map[string]chan *master_pb.KeepConnectedResponse
@ -121,7 +115,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
ms.boundedLeaderChan = make(chan int, 16) ms.boundedLeaderChan = make(chan int, 16)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
ms.onPeerUpdateDoneCns = make(map[string]*chan string)
seq := ms.createSequencer(option) seq := ms.createSequencer(option)
if nil == seq { if nil == seq {
@ -352,97 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address) peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress) peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
if update.IsAdd { if update.IsAdd && isLeader {
if isLeader { raftServerFound := false
raftServerFound := false for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { if string(server.ID) == peerName {
if string(server.ID) == peerName { raftServerFound = true
raftServerFound = true
}
}
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
} }
} }
ms.onPeerUpdateLock.RLock() if !raftServerFound {
isGtZero := len(ms.onPeerUpdateDoneCns) > 0 glog.V(0).Infof("adding new raft server: %s", peerName)
ms.onPeerUpdateLock.RUnlock() ms.Topo.HashicorpRaft.AddVoter(
if isGtZero { hashicorpRaft.ServerID(peerName),
var chanPtrs []*chan string hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
ms.onPeerUpdateLock.RLock()
for _, cn := range ms.onPeerUpdateDoneCns {
chanPtrs = append(chanPtrs, cn)
}
ms.onPeerUpdateLock.RUnlock()
for _, onPeerUpdateDoneCn := range chanPtrs {
*onPeerUpdateDoneCn <- peerName
}
} }
} else if isLeader {
if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok {
*onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg
return
}
onPeerUpdateDoneCn := make(chan string)
ms.onPeerUpdateLock.Lock()
ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn
ms.onPeerUpdateLock.Unlock()
go func(peerName string) {
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
raftServerPingTicker := time.NewTicker(5 * time.Minute)
defer func() {
ms.onPeerUpdateLock.Lock()
delete(ms.onPeerUpdateDoneCns, peerName)
ms.onPeerUpdateLock.Unlock()
close(onPeerUpdateDoneCn)
}()
for {
select {
case <-raftServerPingTicker.C:
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: peerName,
TargetType: cluster.MasterType,
})
return err
})
if err != nil {
glog.Warningf("raft server %s ping failed %+v", peerName, err)
} else {
glog.V(0).Infof("raft server %s remove canceled on ping success", peerName)
return
}
case <-raftServerRemovalTimeAfter:
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
Force: false,
})
return err
})
if err != nil {
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
return
}
glog.V(0).Infof("old raft server %s removed", peerName)
return
case peerDone := <-onPeerUpdateDoneCn:
if peerName == peerDone {
glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName)
return
}
if peerDone == ResetRaftServerRemovalTimeMsg {
raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime)
glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete",
RaftServerRemovalTime, peerName)
}
}
}
}(peerName)
glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName)
} }
} }