From 008aee0dc1932f75c86e52893044d9cd953ef405 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 11 Apr 2014 16:23:58 -0700 Subject: [PATCH] Add retrying logic to wait for other peers during cluster bootstrapping. --- go/topology/topology.go | 13 +++++++---- go/weed/weed_server/master_server.go | 8 +++++-- go/weed/weed_server/raft_server.go | 26 +++++++++++++++++---- go/weed/weed_server/raft_server_handlers.go | 12 ++++++---- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/go/topology/topology.go b/go/topology/topology.go index d5af60cd8..6c5bde304 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -52,21 +52,26 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL } func (t *Topology) IsLeader() bool { - return t.RaftServer == nil || t.Leader() == t.RaftServer.Name() + if leader, e := t.Leader(); e == nil { + return leader == t.RaftServer.Name() + } + return false } -func (t *Topology) Leader() string { +func (t *Topology) Leader() (string, error) { l := "" if t.RaftServer != nil { l = t.RaftServer.Leader() + } else { + return "", errors.New("Raft Server not ready yet!") } if l == "" { // We are a single node cluster, we are the leader - return t.RaftServer.Name() + return t.RaftServer.Name(), errors.New("Raft Server not initialized!") } - return l + return l, nil } func (t *Topology) loadConfiguration(configurationFile string) error { diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 63cd18546..286b90aca 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -77,12 +77,16 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } } } diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index 6b03ea7fb..13889df96 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -10,6 +10,7 @@ import ( "github.com/goraft/raft" "github.com/gorilla/mux" "io/ioutil" + "math/rand" "net/http" "net/url" "strings" @@ -59,13 +60,28 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin // Join to leader if specified. if len(s.peers) > 0 { - glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) - if !s.raftServer.IsLogEmpty() { - glog.V(0).Infoln("Cannot join with an existing log") + glog.V(0).Infoln("Starting cluster with existing logs.") } else { - if err := s.Join(s.peers); err != nil { - return nil + glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + firstJoinError := s.Join(s.peers) + if firstJoinError != nil { + glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") + _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + }) + if err != nil { + glog.V(0).Infoln(err) + return nil + } + } + var err error + for err != nil { + glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...") + time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond) + err = s.Join(s.peers) } glog.V(0).Infoln("Joined cluster") } diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go index 38943cc8d..66430fae7 100644 --- a/go/weed/weed_server/raft_server_handlers.go +++ b/go/weed/weed_server/raft_server_handlers.go @@ -40,10 +40,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter } func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { - if s.topo.Leader() != "" { + if leader, e := s.topo.Leader(); e == nil { //http.StatusMovedPermanently does not cause http POST following redirection - glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.topo.Leader()+req.URL.Path) - http.Redirect(w, req, "http://"+s.topo.Leader()+req.URL.Path, http.StatusMovedPermanently) + glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+leader+req.URL.Path) + http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusMovedPermanently) } else { glog.V(0).Infoln("Error: Leader Unknown") http.Error(w, "Leader unknown", http.StatusInternalServerError) @@ -53,7 +53,11 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["IsLeader"] = s.topo.IsLeader() - m["Leader"] = s.topo.Leader() + if leader, e := s.topo.Leader(); e == nil { + m["Leader"] = leader + } else { + m["Leader"] = "" + } m["Peers"] = s.Peers() writeJsonQuiet(w, r, m) }