mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
a stable working clustering master node implementation
This commit is contained in:
parent
1d5c44e2df
commit
d6fbd741fd
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/goraft/raft"
|
||||
"github.com/gorilla/mux"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
@ -86,6 +87,10 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin
|
|||
return s
|
||||
}
|
||||
|
||||
func (s *RaftServer) IsLeader() bool {
|
||||
return s.Leader() == s.raftServer.Name()
|
||||
}
|
||||
|
||||
func (s *RaftServer) Leader() string {
|
||||
l := s.raftServer.Leader()
|
||||
|
||||
|
@ -97,7 +102,7 @@ func (s *RaftServer) Leader() string {
|
|||
return l
|
||||
}
|
||||
|
||||
func (s *RaftServer) Members() (members []string) {
|
||||
func (s *RaftServer) Peers() (members []string) {
|
||||
peers := s.raftServer.Peers()
|
||||
|
||||
for _, p := range peers {
|
||||
|
@ -118,12 +123,13 @@ func (s *RaftServer) Join(peers []string) error {
|
|||
json.NewEncoder(&b).Encode(command)
|
||||
|
||||
for _, m := range peers {
|
||||
glog.V(0).Infoln("Attempting to connect to:", m)
|
||||
target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m))
|
||||
glog.V(0).Infoln("Attempting to connect to:", target)
|
||||
|
||||
resp, err := http.Post(fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)), "application/json", &b)
|
||||
glog.V(0).Infoln("Post returned: ", err)
|
||||
err := postFollowingOneRedirect(target, "application/json", &b)
|
||||
|
||||
if err != nil {
|
||||
glog.V(0).Infoln("Post returned error: ", err)
|
||||
if _, ok := err.(*url.Error); ok {
|
||||
// If we receive a network error try the next member
|
||||
continue
|
||||
|
@ -132,9 +138,43 @@ func (s *RaftServer) Join(peers []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("Could not connect to any cluster peers")
|
||||
}
|
||||
|
||||
// a workaround because http POST following redirection misses request body
|
||||
func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error {
|
||||
backupReader := bytes.NewReader(b.Bytes())
|
||||
resp, err := http.Post(target, contentType, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
reply, _ := ioutil.ReadAll(resp.Body)
|
||||
statusCode := resp.StatusCode
|
||||
|
||||
if statusCode == http.StatusMovedPermanently {
|
||||
var urlStr string
|
||||
if urlStr = resp.Header.Get("Location"); urlStr == "" {
|
||||
return errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
|
||||
}
|
||||
|
||||
glog.V(0).Infoln("Post redirected to ", urlStr)
|
||||
resp2, err2 := http.Post(urlStr, contentType, backupReader)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
defer resp2.Body.Close()
|
||||
reply, _ = ioutil.ReadAll(resp2.Body)
|
||||
statusCode = resp2.StatusCode
|
||||
}
|
||||
|
||||
glog.V(0).Infoln("Post returned status: ", statusCode, string(reply))
|
||||
if statusCode != http.StatusOK {
|
||||
return errors.New(string(reply))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"code.google.com/p/weed-fs/go/glog"
|
||||
"encoding/json"
|
||||
"github.com/goraft/raft"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Handles incoming RAFT joins.
|
||||
|
@ -12,7 +14,9 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
|
|||
glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers())
|
||||
command := &raft.DefaultJoinCommand{}
|
||||
|
||||
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
|
||||
commandText, _ := ioutil.ReadAll(req.Body)
|
||||
glog.V(0).Info("Command:", string(commandText))
|
||||
if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil {
|
||||
glog.V(0).Infoln("Error decoding json message:", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -37,7 +41,8 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
|
|||
|
||||
func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) {
|
||||
if s.Leader() != "" {
|
||||
glog.V(0).Infoln("Redirecting to", "http://"+s.Leader()+req.URL.Path)
|
||||
//http.StatusMovedPermanently does not cause http POST following redirection
|
||||
glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.Leader()+req.URL.Path)
|
||||
http.Redirect(w, req, "http://"+s.Leader()+req.URL.Path, http.StatusMovedPermanently)
|
||||
} else {
|
||||
glog.V(0).Infoln("Error: Leader Unknown")
|
||||
|
@ -47,7 +52,8 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request)
|
|||
|
||||
func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
m := make(map[string]interface{})
|
||||
m["IsLeader"] = s.IsLeader()
|
||||
m["Leader"] = s.Leader()
|
||||
m["Members"] = s.Members()
|
||||
m["Peers"] = s.Peers()
|
||||
writeJsonQuiet(w, r, m)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue