mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #2661 from garenchan/ck-dev1
This commit is contained in:
commit
d3ee621fce
|
@ -44,6 +44,8 @@ type MasterOptions struct {
|
||||||
metricsIntervalSec *int
|
metricsIntervalSec *int
|
||||||
raftResumeState *bool
|
raftResumeState *bool
|
||||||
metricsHttpPort *int
|
metricsHttpPort *int
|
||||||
|
heartbeatInterval *time.Duration
|
||||||
|
electionTimeout *time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -65,6 +67,8 @@ func init() {
|
||||||
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
|
m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
|
||||||
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
|
m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
|
||||||
|
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdMaster = &Command{
|
var cmdMaster = &Command{
|
||||||
|
@ -132,8 +136,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
|
||||||
glog.Fatalf("Master startup error: %v", e)
|
glog.Fatalf("Master startup error: %v", e)
|
||||||
}
|
}
|
||||||
// start raftServer
|
// start raftServer
|
||||||
raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
raftServerOption := &weed_server.RaftServerOption{
|
||||||
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState)
|
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
||||||
|
Peers: peers,
|
||||||
|
ServerAddr: myMasterAddress,
|
||||||
|
DataDir: util.ResolvePath(*masterOption.metaFolder),
|
||||||
|
Topo: ms.Topo,
|
||||||
|
RaftResumeState: *masterOption.raftResumeState,
|
||||||
|
HeartbeatInterval: *masterOption.heartbeatInterval,
|
||||||
|
ElectionTimeout: *masterOption.electionTimeout,
|
||||||
|
}
|
||||||
|
raftServer, err := weed_server.NewRaftServer(raftServerOption)
|
||||||
if raftServer == nil {
|
if raftServer == nil {
|
||||||
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
|
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,8 @@ func init() {
|
||||||
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
||||||
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
|
masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
|
||||||
|
masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
|
||||||
|
|
||||||
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
||||||
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
||||||
|
|
|
@ -19,6 +19,17 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/topology"
|
"github.com/chrislusf/seaweedfs/weed/topology"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type RaftServerOption struct {
|
||||||
|
GrpcDialOption grpc.DialOption
|
||||||
|
Peers []pb.ServerAddress
|
||||||
|
ServerAddr pb.ServerAddress
|
||||||
|
DataDir string
|
||||||
|
Topo *topology.Topology
|
||||||
|
RaftResumeState bool
|
||||||
|
HeartbeatInterval time.Duration
|
||||||
|
ElectionTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
type RaftServer struct {
|
type RaftServer struct {
|
||||||
peers []pb.ServerAddress // initial peers to join with
|
peers []pb.ServerAddress // initial peers to join with
|
||||||
raftServer raft.Server
|
raftServer raft.Server
|
||||||
|
@ -52,12 +63,12 @@ func (s StateMachine) Recovery(data []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
|
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
|
||||||
s := &RaftServer{
|
s := &RaftServer{
|
||||||
peers: peers,
|
peers: option.Peers,
|
||||||
serverAddr: serverAddr,
|
serverAddr: option.ServerAddr,
|
||||||
dataDir: dataDir,
|
dataDir: option.DataDir,
|
||||||
topo: topo,
|
topo: option.Topo,
|
||||||
}
|
}
|
||||||
|
|
||||||
if glog.V(4) {
|
if glog.V(4) {
|
||||||
|
@ -67,10 +78,10 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
|
||||||
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
|
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
transporter := raft.NewGrpcTransporter(grpcDialOption)
|
transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
|
||||||
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
|
glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
|
||||||
|
|
||||||
if !raftResumeState {
|
if !option.RaftResumeState {
|
||||||
// always clear previous metadata
|
// always clear previous metadata
|
||||||
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
||||||
os.RemoveAll(path.Join(s.dataDir, "log"))
|
os.RemoveAll(path.Join(s.dataDir, "log"))
|
||||||
|
@ -80,14 +91,15 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
stateMachine := StateMachine{topo: topo}
|
stateMachine := StateMachine{topo: option.Topo}
|
||||||
s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "")
|
s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
|
heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
|
||||||
s.raftServer.SetElectionTimeout(10 * time.Second)
|
s.raftServer.SetHeartbeatInterval(heartbeatInterval)
|
||||||
|
s.raftServer.SetElectionTimeout(option.ElectionTimeout)
|
||||||
if err := s.raftServer.LoadSnapshot(); err != nil {
|
if err := s.raftServer.LoadSnapshot(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -123,7 +135,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
|
||||||
|
|
||||||
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
||||||
|
|
||||||
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
|
if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
|
||||||
// Initialize the server by joining itself.
|
// Initialize the server by joining itself.
|
||||||
// s.DoJoinCommand()
|
// s.DoJoinCommand()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue