properly handle quick volume server restart

This commit is contained in:
Chris Lu 2012-12-22 16:26:02 -08:00
parent 264678c9b1
commit 018f0b73be
4 changed files with 25 additions and 7 deletions

View file

@ -102,6 +102,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
} }
func dirJoinHandler(w http.ResponseWriter, r *http.Request) { func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
init := r.FormValue("init")=="true"
ip := r.FormValue("ip") ip := r.FormValue("ip")
if ip == "" { if ip == "" {
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
@ -113,7 +114,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
volumes := new([]storage.VolumeInfo) volumes := new([]storage.VolumeInfo)
json.Unmarshal([]byte(r.FormValue("volumes")), volumes) json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
debug(s, "volumes", r.FormValue("volumes")) debug(s, "volumes", r.FormValue("volumes"))
topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
m := make(map[string]interface{}) m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024 m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024
writeJson(w, r, m) writeJson(w, r, m)

View file

@ -19,9 +19,10 @@ type Store struct {
PublicUrl string PublicUrl string
MaxVolumeCount int MaxVolumeCount int
//read from the master
masterNode string masterNode string
volumeSizeLimit uint64 connected bool
volumeSizeLimit uint64 //read from the master
} }
func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
@ -108,7 +109,7 @@ func (s *Store) loadExistingVolumes() {
if s.volumes[vid] == nil { if s.volumes[vid] == nil {
v := NewVolume(s.dir, vid, CopyNil) v := NewVolume(s.dir, vid, CopyNil)
s.volumes[vid] = v s.volumes[vid] = v
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version,"size =", v.Size()) log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
} }
} }
} }
@ -141,6 +142,9 @@ func (s *Store) Join() error {
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)
values := make(url.Values) values := make(url.Values)
if !s.connected {
values.Add("init", "true")
}
values.Add("port", strconv.Itoa(s.Port)) values.Add("port", strconv.Itoa(s.Port))
values.Add("ip", s.Ip) values.Add("ip", s.Ip)
values.Add("publicUrl", s.PublicUrl) values.Add("publicUrl", s.PublicUrl)
@ -155,6 +159,7 @@ func (s *Store) Join() error {
return err return err
} }
s.volumeSizeLimit = ret.VolumeSizeLimit s.volumeSizeLimit = ret.VolumeSizeLimit
s.connected = true
return nil return nil
} }
func (s *Store) Close() { func (s *Store) Close() {

View file

@ -18,6 +18,15 @@ func NewRack(id string) *Rack {
return r return r
} }
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
return dn
}
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() { for _, c := range r.Children() {
dn := c.(*DataNode) dn := c.(*DataNode)

View file

@ -120,11 +120,15 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
} }
func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
dcName, rackName := t.configuration.Locate(ip) dcName, rackName := t.configuration.Locate(ip)
dc := t.GetOrCreateDataCenter(dcName) dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName) rack := dc.GetOrCreateRack(rackName)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) dn := rack.FindDataNode(ip, port)
if init && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
for _, v := range volumeInfos { for _, v := range volumeInfos {
dn.AddOrUpdateVolume(v) dn.AddOrUpdateVolume(v)
t.RegisterVolumeLayout(&v, dn) t.RegisterVolumeLayout(&v, dn)
@ -142,4 +146,3 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
t.LinkChildNode(dc) t.LinkChildNode(dc)
return dc return dc
} }