diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 2adaadeaf..1468984cd 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -37,6 +37,7 @@ var ( defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") mReadTimeout = cmdMaster.Flag.Int("readTimeout", 1, "connection read timeout in seconds") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") ) var topo *topology.Topology @@ -123,24 +124,24 @@ func dirStatusHandler(w http.ResponseWriter, r *http.Request) { } func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) - } else { - count, err = vg.GrowByCountAndType(count, rt, topo) - } - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) - } + count := 0 + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if topo.FreeSpace() < count*rt.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = vg.GrowByCountAndType(count, rt, topo) + } + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]interface{}{"count": count}) + } } func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { @@ -153,24 +154,24 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { } else { count, err = vg.GrowByCountAndType(count, rt, topo) } - }else{ - err = errors.New("parameter count is not found") + } else { + err = errors.New("parameter count is not found") } } if err != nil { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication "+err.Error()}) + writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) } else { - w.WriteHeader(http.StatusNotAcceptable) + w.WriteHeader(http.StatusNotAcceptable) writeJson(w, r, map[string]interface{}{"count": count}) } } func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = topo.ToVolumeMap() + writeJson(w, r, m) } func runMaster(cmd *Command, args []string) bool { @@ -186,9 +187,9 @@ func runMaster(cmd *Command, args []string) bool { http.HandleFunc("/dir/join", dirJoinHandler) http.HandleFunc("/dir/status", dirStatusHandler) http.HandleFunc("/vol/grow", volumeGrowHandler) - http.HandleFunc("/vol/status", volumeStatusHandler) + http.HandleFunc("/vol/status", volumeStatusHandler) - topo.StartRefreshWritableVolumes() + topo.StartRefreshWritableVolumes(*garbageThreshold) log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) srv := &http.Server{ diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index c8ad9b2e7..e87bf982f 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -10,12 +10,12 @@ import ( "time" ) -func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { go func(index int, url string, vid storage.VolumeId) { //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url()) - if e, ret := vacuumVolume_Check(url, vid); e != nil { + if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e) ch <- false } else { @@ -78,11 +78,11 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } return isCommitSuccess } -func (t *Topology) Vacuum() int { +func (t *Topology) Vacuum(garbageThreshold string) int { for _, vl := range t.replicaType2VolumeLayout { if vl != nil { for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist) { + if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { if batchVacuumVolumeCompact(vl, vid, locationlist) { batchVacuumVolumeCommit(vl, vid, locationlist) } @@ -98,10 +98,10 @@ type VacuumVolumeResult struct { Error string } -func vacuumVolume_Check(urlLocation string, vid storage.VolumeId) (error, bool) { +func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", "0.3") + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { return err, false diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index 322d6c27a..9d7999f93 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -7,7 +7,7 @@ import ( "time" ) -func (t *Topology) StartRefreshWritableVolumes() { +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { go func() { for { freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval @@ -15,12 +15,12 @@ func (t *Topology) StartRefreshWritableVolumes() { time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) } }() - go func() { + go func(garbageThreshold string) { c := time.Tick(15 * time.Minute) for _ = range c { - t.Vacuum() + t.Vacuum(garbageThreshold) } - }() + }(garbageThreshold) go func() { for { select {