diff --git a/weed/command/master.go b/weed/command/master.go index 0f598f2da..3e37f827b 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -44,6 +44,8 @@ type MasterOptions struct { metricsIntervalSec *int raftResumeState *bool metricsHttpPort *int + heartbeatInterval *time.Duration + electionTimeout *time.Duration } func init() { @@ -65,6 +67,8 @@ func init() { m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") + m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") + m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") } var cmdMaster = &Command{ @@ -132,8 +136,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("Master startup error: %v", e) } // start raftServer - raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), - peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState) + raftServerOption := &weed_server.RaftServerOption{ + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), + Peers: peers, + ServerAddr: myMasterAddress, + DataDir: util.ResolvePath(*masterOption.metaFolder), + Topo: ms.Topo, + RaftResumeState: *masterOption.raftResumeState, + HeartbeatInterval: *masterOption.heartbeatInterval, + ElectionTimeout: *masterOption.electionTimeout, + } + raftServer, err := weed_server.NewRaftServer(raftServerOption) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) } diff --git a/weed/command/server.go b/weed/command/server.go index 01c59fb85..3a58c4305 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -98,6 +98,8 @@ func init() { masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address") masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server") + masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") + masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 568bfc7b5..91dd185c8 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -19,6 +19,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +type RaftServerOption struct { + GrpcDialOption grpc.DialOption + Peers []pb.ServerAddress + ServerAddr pb.ServerAddress + DataDir string + Topo *topology.Topology + RaftResumeState bool + HeartbeatInterval time.Duration + ElectionTimeout time.Duration +} + type RaftServer struct { peers []pb.ServerAddress // initial peers to join with raftServer raft.Server @@ -52,12 +63,12 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ - peers: peers, - serverAddr: serverAddr, - dataDir: dataDir, - topo: topo, + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, } if glog.V(4) { @@ -67,10 +78,10 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewGrpcTransporter(grpcDialOption) - glog.V(0).Infof("Starting RaftServer with %v", serverAddr) + transporter := raft.NewGrpcTransporter(option.GrpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - if !raftResumeState { + if !option.RaftResumeState { // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) @@ -80,14 +91,15 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser return nil, err } - stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "") + stateMachine := StateMachine{topo: option.Topo} + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err } - s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) - s.raftServer.SetElectionTimeout(10 * time.Second) + heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + s.raftServer.SetHeartbeatInterval(heartbeatInterval) + s.raftServer.SetElectionTimeout(option.ElectionTimeout) if err := s.raftServer.LoadSnapshot(); err != nil { return nil, err } @@ -123,7 +135,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { + if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) { // Initialize the server by joining itself. // s.DoJoinCommand() }