diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index ee7eaf886..767db3aea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,8 +1,11 @@ package weed_server import ( + "math/rand" "net/http" "strconv" + "sync" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" @@ -11,17 +14,21 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" ) type FilerServer struct { port string master string + mnLock sync.RWMutex collection string defaultReplication string redirectOnRead bool disableDirListing bool secret security.Secret filer filer.Filer + masterNodes *storage.MasterNodes } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, @@ -59,9 +66,80 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st r.HandleFunc("/", fs.filerHandler) + go func() { + connected := true + + fs.masterNodes = storage.NewMasterNodes(fs.master) + glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode()) + + //force initialize with all available master nodes + fs.masterNodes.FindMaster() + + for { + glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode()) + master, err := fs.detectHealthyMaster(fs.getMasterNode()) + if err == nil { + if !connected { + connected = true + if fs.getMasterNode() != master { + fs.setMasterNode(master) + } + glog.V(0).Infoln("Filer Server Connected with master at", master) + } + } else { + glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err) + if connected { + connected = false + } + } + if connected { + time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond) + } else { + time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond) + } + } + }() + return fs, nil } func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { return security.GenJwt(fs.secret, fileId) } + +func (fs *FilerServer) getMasterNode() string { + fs.mnLock.RLock() + defer fs.mnLock.RUnlock() + return fs.master +} + +func (fs *FilerServer) setMasterNode(masterNode string) { + fs.mnLock.Lock() + defer fs.mnLock.Unlock() + fs.master = masterNode +} + +func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) { + statUrl := "http://" + masterNode + "/stats" + glog.V(4).Infof("Connecting to %s ...", statUrl) + _, e = util.Get(statUrl) + if e != nil { + fs.masterNodes.Reset() + for i := 0; i <= 3; i++ { + master, e = fs.masterNodes.FindMaster() + if e != nil { + continue + } else { + statUrl := "http://" + master + "/stats" + glog.V(4).Infof("Connecting to %s ...", statUrl) + _, e = util.Get(statUrl) + if e == nil { + break + } + } + } + } else { + master = masterNode + } + return +} diff --git a/weed/storage/store.go b/weed/storage/store.go index d44d6a863..485ed437f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -32,14 +32,14 @@ func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} return } -func (mn *MasterNodes) reset() { +func (mn *MasterNodes) Reset() { glog.V(4).Infof("Resetting master nodes: %v", mn) if len(mn.nodes) > 1 && mn.lastNode >= 0 { glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) mn.lastNode = -mn.lastNode - 1 } } -func (mn *MasterNodes) findMaster() (string, error) { +func (mn *MasterNodes) FindMaster() (string, error) { if len(mn.nodes) == 0 { return "", errors.New("No master node found!") } @@ -210,7 +210,7 @@ func (s *Store) SetBootstrapMaster(bootstrapMaster string) { s.masterNodes = NewMasterNodes(bootstrapMaster) } func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { - masterNode, e = s.masterNodes.findMaster() + masterNode, e = s.masterNodes.FindMaster() if e != nil { return } @@ -270,17 +270,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S jsonBlob, err := util.PostBytes(joinUrl, data) if err != nil { - s.masterNodes.reset() + s.masterNodes.Reset() return "", "", err } var ret operation.JoinResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) - s.masterNodes.reset() + s.masterNodes.Reset() return masterNode, "", err } if ret.Error != "" { - s.masterNodes.reset() + s.masterNodes.Reset() return masterNode, "", errors.New(ret.Error) } s.volumeSizeLimit = ret.VolumeSizeLimit