mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Clean raft configurations if "peers" option is set.
This commit is contained in:
parent
179d36ba0e
commit
7ce628bf09
|
@ -9,6 +9,8 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -46,6 +48,13 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
|
||||||
transporter := raft.NewHTTPTransporter("/cluster", 0)
|
transporter := raft.NewHTTPTransporter("/cluster", 0)
|
||||||
transporter.Transport.MaxIdleConnsPerHost = 1024
|
transporter.Transport.MaxIdleConnsPerHost = 1024
|
||||||
|
|
||||||
|
// Clear old cluster configurations if peers are set
|
||||||
|
if len(s.peers) > 0 {
|
||||||
|
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
||||||
|
os.RemoveAll(path.Join(s.dataDir, "log"))
|
||||||
|
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
|
||||||
|
}
|
||||||
|
|
||||||
s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
|
s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
|
@ -53,35 +62,30 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
|
||||||
}
|
}
|
||||||
transporter.Install(s.raftServer, s)
|
transporter.Install(s.raftServer, s)
|
||||||
s.raftServer.SetHeartbeatInterval(1 * time.Second)
|
s.raftServer.SetHeartbeatInterval(1 * time.Second)
|
||||||
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond)
|
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond)
|
||||||
s.raftServer.Start()
|
s.raftServer.Start()
|
||||||
|
|
||||||
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
|
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
|
||||||
s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
|
s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
|
||||||
|
|
||||||
// Join to leader if specified.
|
|
||||||
if len(s.peers) > 0 {
|
if len(s.peers) > 0 {
|
||||||
if !s.raftServer.IsLogEmpty() {
|
// Join to leader if specified.
|
||||||
glog.V(0).Infoln("Starting cluster with existing logs.")
|
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
|
||||||
} else {
|
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
||||||
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
|
firstJoinError := s.Join(s.peers)
|
||||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
if firstJoinError != nil {
|
||||||
firstJoinError := s.Join(s.peers)
|
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
|
||||||
if firstJoinError != nil {
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
||||||
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
|
Name: s.raftServer.Name(),
|
||||||
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
ConnectionString: "http://" + s.httpAddr,
|
||||||
Name: s.raftServer.Name(),
|
})
|
||||||
ConnectionString: "http://" + s.httpAddr,
|
if err != nil {
|
||||||
})
|
glog.V(0).Infoln(err)
|
||||||
if err != nil {
|
return nil
|
||||||
glog.V(0).Infoln(err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the server by joining itself.
|
|
||||||
} else if s.raftServer.IsLogEmpty() {
|
} else if s.raftServer.IsLogEmpty() {
|
||||||
|
// Initialize the server by joining itself.
|
||||||
glog.V(0).Infoln("Initializing new cluster")
|
glog.V(0).Infoln("Initializing new cluster")
|
||||||
|
|
||||||
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
||||||
|
@ -95,7 +99,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infoln("Recovered from log")
|
glog.V(0).Infoln("Old conf,log,snapshot should have been removed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
return s
|
return s
|
||||||
|
|
Loading…
Reference in a new issue