raft update peers via OnPeerUpdate

This commit is contained in:
Konstantin Lebedev 2022-04-06 21:17:04 +05:00
parent 357aa818fe
commit 14a2cc83bf
3 changed files with 68 additions and 5 deletions

View file

@ -81,6 +81,15 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
},
}
case MasterType:
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: true,
},
},
}
}
return nil
}
@ -120,6 +129,15 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr
}
}
case MasterType:
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: false,
},
},
}
}
return nil
}

View file

@ -1,6 +1,7 @@
package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@ -17,6 +18,7 @@ import (
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -30,8 +32,9 @@ import (
)
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
RaftServerRemovalTime = 72 * time.Hour
)
type MasterOption struct {
@ -61,6 +64,7 @@ type MasterServer struct {
vgCh chan *topology.VolumeGrowRequest
boundedLeaderChan chan int
onPeerUpdatDoneCn chan string
// notifying clients
clientChansLock sync.RWMutex
@ -112,6 +116,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
ms.onPeerUpdatDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@ -323,3 +329,39 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
glog.V(0).Infof("OnPeerUpdate: %+v", update)
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}
peerAddress := pb.ServerAddress(update.Address)
if update.IsAdd {
if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
glog.V(0).Infof("adding new raft server: %s", peerAddress.String())
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerAddress.String()),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
ms.onPeerUpdatDoneCn <- string(peerAddress)
} else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
go func(peerName string) {
for {
select {
case peerDone := <-ms.onPeerUpdatDoneCn:
if peerName == peerDone {
return
}
case <-time.After(RaftServerRemovalTime):
glog.V(0).Infof("removing old raft server: %s", peerName)
if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
}); err != nil {
glog.Warningf("failed removing old raft server: %v", err)
}
return
}
}
}(string(peerAddress))
}
}

View file

@ -21,8 +21,9 @@ import (
)
const (
ldbFile = "logs.dat"
sdbFile = "stable.dat"
ldbFile = "logs.dat"
sdbFile = "stable.dat"
updatePeersTimeout = 15 * time.Minute
)
func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
@ -84,7 +85,9 @@ func (s *RaftServer) UpdatePeers() {
s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
}
}
break
return
case <-time.After(updatePeersTimeout):
return
}
}
}