diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index efd1498e0..46833db5c 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -5,13 +5,11 @@ import ( "errors" "log" "net/http" - "pkg/directory" "pkg/replication" "pkg/storage" "pkg/topology" "strconv" "strings" - "time" ) func init() { @@ -37,7 +35,6 @@ var ( confFile = cmdMaster.Flag.String("conf", "/etc/weed.conf", "xml configuration file") ) -var mapper *directory.Mapper var topo *topology.Topology var vg *replication.VolumeGrowth @@ -48,30 +45,25 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid = vid[0:commaSep] } volumeId, _ := storage.NewVolumeId(vid) - machines, e := mapper.Get(volumeId) - if e == nil { + machines := topo.Lookup(volumeId) + if machines == nil { ret := []map[string]string{} - for _, machine := range machines { - ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) + for _, dn := range *machines { + ret = append(ret, map[string]string{"url": dn.Ip + strconv.Itoa(dn.Port), "publicUrl": dn.PublicUrl}) } writeJson(w, r, map[string]interface{}{"locations": ret}) } else { log.Println("Invalid volume id", volumeId) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. " + e.Error()}) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) } } func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c := r.FormValue("count") - fid, count, machine, err := mapper.PickForWrite(c) - if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": machine.Url, "publicUrl": machine.PublicUrl, "count": count}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } -} -func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { c, _ := strconv.Atoi(r.FormValue("count")) - rt, err := storage.NewReplicationType(r.FormValue("replication")) + repType := r.FormValue("replication") + if repType == ""{ + repType = "00" + } + rt, err := storage.NewReplicationType(repType) if err != nil { writeJson(w, r, map[string]string{"error": err.Error()}) return @@ -101,15 +93,11 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { if *IsDebug { log.Println(s, "volumes", r.FormValue("volumes")) } - mapper.Add(directory.NewMachine(s, publicUrl, *volumes, time.Now().Unix())) //new ways topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) } -func dirOldStatusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, mapper) -} -func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) { +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, topo.ToMap()) } func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { @@ -135,16 +123,12 @@ func runMaster(cmd *Command, args []string) bool { topo = topology.NewTopology("topo", *confFile, *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) vg = replication.NewDefaultVolumeGrowth() log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/assign2", dirAssign2Handler) http.HandleFunc("/dir/lookup", dirLookupHandler) http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirOldStatusHandler) - http.HandleFunc("/dir/status2", dirNewStatusHandler) //temporary + http.HandleFunc("/dir/status", dirStatusHandler) http.HandleFunc("/vol/grow", volumeGrowHandler) - mapper.StartRefreshWritableVolumes() topo.StartRefreshWritableVolumes() log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index ed04c3e73..e5b83da10 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -105,12 +105,12 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { log.Println("read bytes", count, "error", e) } if e != nil || count <= 0 { - w.WriteHeader(404) + w.WriteHeader(http.StatusNotFound) return } if n.Cookie != cookie { log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(404) + w.WriteHeader(http.StatusNotFound) return } if ext != "" { @@ -162,6 +162,7 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { log.Println("Failed to lookup for", volumeId, lookupErr.Error()) } } + w.WriteHeader(http.StatusCreated) } m := make(map[string]uint32) m["size"] = ret diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go deleted file mode 100644 index 99aa90a5f..000000000 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ /dev/null @@ -1,130 +0,0 @@ -package directory - -import ( - "errors" - "log" - "math/rand" - "pkg/sequence" - "pkg/storage" - "pkg/util" - "time" -) - -type Machine struct { - C1Volumes []storage.VolumeInfo - Url string //[:port] - PublicUrl string - LastSeen int64 // unix time in seconds -} - -type Mapper struct { - Machines map[string]*Machine - vid2machines map[storage.VolumeId][]*Machine - Writers []storage.VolumeId // transient array of Writers volume id - pulse int64 - - volumeSizeLimit uint64 - - sequence sequence.Sequencer -} - -func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo, lastSeen int64) *Machine { - return &Machine{Url: server, PublicUrl: publicUrl, C1Volumes: volumes, LastSeen: lastSeen} -} - -func NewMapper(dirname string, filename string, volumeSizeLimit uint64, pulse int) (m *Mapper) { - m = &Mapper{} - m.vid2machines = make(map[storage.VolumeId][]*Machine) - m.volumeSizeLimit = volumeSizeLimit - m.Writers = *new([]storage.VolumeId) - m.Machines = make(map[string]*Machine) - m.pulse = int64(pulse) - - m.sequence = sequence.NewSequencer(dirname, filename) - - return -} -func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) { - len_writers := len(m.Writers) - if len_writers <= 0 { - log.Println("No more writable volumes!") - return "", 0, nil, errors.New("No more writable volumes!") - } - vid := m.Writers[rand.Intn(len_writers)] - machines := m.vid2machines[vid] - if machines != nil && len(machines)>0 { - fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1)) - if count == 0 { - return "", 0, nil, errors.New("Strange count:" + c) - } - //always use the first server to write - return NewFileId(vid, fileId, rand.Uint32()).String(), count, machines[0], nil - } - return "", 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") -} -func (m *Mapper) Get(vid storage.VolumeId) ([]*Machine, error) { - machines := m.vid2machines[vid] - if machines == nil { - return nil, errors.New("invalid volume id " + vid.String()) - } - return machines, nil -} -func (m *Mapper) Add(machine *Machine) { - m.Machines[machine.Url] = machine - //add to vid2machine map, and Writers array - for _, v := range machine.C1Volumes { - list := m.vid2machines[v.Id] - found := false - for index, entry := range list { - if machine.Url == entry.Url { - list[index] = machine - found = true - } - } - if !found { - m.vid2machines[v.Id] = append(m.vid2machines[v.Id], machine) - } - } - m.refreshWritableVolumes() -} -func (m *Mapper) remove(machine *Machine) { - delete(m.Machines, machine.Url) - for _, v := range machine.C1Volumes { - list := m.vid2machines[v.Id] - foundIndex := -1 - for index, entry := range list { - if machine.Url == entry.Url { - foundIndex = index - } - } - m.vid2machines[v.Id] = append(m.vid2machines[v.Id][:foundIndex], m.vid2machines[v.Id][foundIndex+1:]...) - } -} - -func (m *Mapper) StartRefreshWritableVolumes() { - go func() { - for { - m.refreshWritableVolumes() - time.Sleep(time.Duration(float32(m.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() -} - -func (m *Mapper) refreshWritableVolumes() { - freshThreshHold := time.Now().Unix() - 3*m.pulse //3 times of sleep interval - //setting Writers, copy-on-write because of possible updating, this needs some future work! - var writers []storage.VolumeId - for _, machine_entry := range m.Machines { - if machine_entry.LastSeen > freshThreshHold { - for _, v := range machine_entry.C1Volumes { - if uint64(v.Size) < m.volumeSizeLimit { - writers = append(writers, v.Id) - } - } - } else { - log.Println("Warning! DataNode", machine_entry.Url, "last seen is", time.Now().Unix()-machine_entry.LastSeen, "seconds ago!") - m.remove(machine_entry) - } - } - m.Writers = writers -} diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 7849475c9..9cef51249 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -57,6 +57,15 @@ func (t *Topology) loadConfiguration(configurationFile string)error{ return e } +func (t *Topology) Lookup(vid storage.VolumeId) (*[]*DataNode) { + for _, vl := range t.replicaType2VolumeLayout { + if list := vl.Lookup(vid); list!=nil { + return list + } + } + return nil +} + func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { if t.FreeSpace() <= 0 { return false, nil, nil diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 5159e8e75..2dee803d4 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -38,6 +38,10 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } +func (vl *VolumeLayout) Lookup(vid storage.VolumeId) (*[]*DataNode) { + return &vl.vid2location[vid].list +} + func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 {