diff --git a/weed/command/compact.go b/weed/command/compact.go index 5c3038c78..0dd4efe0e 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -33,14 +33,16 @@ func runCompact(cmd *Command, args []string) bool { return false } + preallocate := *compactVolumePreallocate * (1 << 20) + vid := storage.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, *compactVolumePreallocate*(1<<20)) + storage.NeedleMapInMemory, nil, nil, preallocate) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } if *compactMethod == 0 { - if err = v.Compact(); err != nil { + if err = v.Compact(preallocate); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 360c6bb46..8c73b6019 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -79,7 +79,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - ms.Topo.StartRefreshWritableVolumes(garbageThreshold) + ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) return ms } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index b15125576..8077a0eeb 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -42,7 +42,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque gcThreshold = ms.garbageThreshold } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold) + ms.Topo.Vacuum(gcThreshold, ms.preallocate) ms.dirStatusHandler(w, r) } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index cace8d181..b0620de0b 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -52,6 +52,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/vacuum/cleanup", vs.guard.WhiteList(vs.vacuumVolumeCleanupHandler)) adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) diff --git a/weed/server/volume_server_handlers_vacuum.go b/weed/server/volume_server_handlers_vacuum.go index ef348d35c..02b8c5547 100644 --- a/weed/server/volume_server_handlers_vacuum.go +++ b/weed/server/volume_server_handlers_vacuum.go @@ -3,7 +3,9 @@ package weed_server import ( "net/http" + "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "strconv" ) func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { @@ -16,7 +18,15 @@ func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http. glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) } func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.CompactVolume(r.FormValue("volume")) + var preallocate int64 + var err error + if r.FormValue("preallocate") != "" { + preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) + if err != nil { + glog.V(0).Infoln("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err) + } + } + err = vs.store.CompactVolume(r.FormValue("volume"), preallocate) if err == nil { writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) } else { @@ -33,3 +43,12 @@ func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http } glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err) } +func (vs *VolumeServer) vacuumVolumeCleanupHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.CommitCleanupVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) + } else { + writeJsonError(w, r, http.StatusInternalServerError, err) + } + glog.V(2).Infoln("cleanup compact volume =", r.FormValue("volume"), ", error =", err) +} diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 03825c159..3ada1f49c 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -22,13 +22,13 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString } return fmt.Errorf("volume id %d is not found during check compact", vid), false } -func (s *Store) CompactVolume(volumeIdString string) error { +func (s *Store) CompactVolume(volumeIdString string, preallocate int64) error { vid, err := NewVolumeId(volumeIdString) if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) } if v := s.findVolume(vid); v != nil { - return v.Compact() + return v.Compact(preallocate) } return fmt.Errorf("volume id %d is not found during compact", vid) } @@ -42,3 +42,13 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error { } return fmt.Errorf("volume id %d is not found during commit compact", vid) } +func (s *Store) CommitCleanupVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + if v := s.findVolume(vid); v != nil { + return v.cleanupCompact() + } + return fmt.Errorf("volume id %d is not found during cleaning up", vid) +} diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index d7cae8803..b934fc59d 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -55,7 +55,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) { return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) } if lastCompactRevision != compactRevision && lastCompactRevision != 0 { - if err = v.Compact(); err != nil { + if err = v.Compact(0); err != nil { return fmt.Errorf("Compact Volume before synchronizing %v", err) } if err = v.commitCompact(); err != nil { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 22c117c41..a9fe6c03d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -13,7 +13,7 @@ func (v *Volume) garbageLevel() float64 { return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) } -func (v *Volume) Compact() error { +func (v *Volume) Compact(preallocate int64) error { glog.V(3).Infof("Compacting ...") //no need to lock for copy on write //v.accessLock.Lock() @@ -24,7 +24,7 @@ func (v *Volume) Compact() error { v.lastCompactIndexOffset = v.nm.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", 0) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate) } func (v *Volume) Compact2() error { @@ -72,6 +72,20 @@ func (v *Volume) commitCompact() error { return nil } +func (v *Volume) cleanupCompact() error { + glog.V(0).Infof("Cleaning up vacuuming...") + + e1 := os.Remove(v.FileName() + ".cpd") + e2 := os.Remove(v.FileName() + ".cpx") + if e1 != nil { + return e1 + } + if e2 != nil { + return e2 + } + return nil +} + func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { if _, err = file.Seek(0, 0); err != nil { return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 44ab33114..639df6e5b 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -8,7 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" ) -func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string, preallocate int64) { go func() { for { if t.IsLeader() { @@ -22,7 +22,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { c := time.Tick(15 * time.Minute) for _ = range c { if t.IsLeader() { - t.Vacuum(garbageThreshold) + t.Vacuum(garbageThreshold, preallocate) } } }(garbageThreshold) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index dd20e8cb2..12c0ef8b9 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -6,6 +6,7 @@ import ( "net/url" "time" + "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -37,13 +38,13 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist } return isCheckSuccess } -func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { go func(index int, url string, vid storage.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - if e := vacuumVolume_Compact(url, vid); e != nil { + if e := vacuumVolume_Compact(url, vid, preallocate); e != nil { glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e) ch <- false } else { @@ -80,7 +81,18 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } return isCommitSuccess } -func (t *Topology) Vacuum(garbageThreshold string) int { +func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) { + for _, dn := range locationlist.list { + glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) + if e := vacuumVolume_Cleanup(dn.Url(), vid); e != nil { + glog.V(0).Infoln("Error when cleaning up", vid, "on", dn.Url(), e) + } else { + glog.V(0).Infoln("Complete cleaning up", vid, "on", dn.Url()) + } + } +} + +func (t *Topology) Vacuum(garbageThreshold string, preallocate int64) int { glog.V(0).Infof("Start vacuum on demand with threshold:%s", garbageThreshold) for _, col := range t.collectionMap.Items() { c := col.(*Collection) @@ -99,7 +111,7 @@ func (t *Topology) Vacuum(garbageThreshold string) int { glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) { batchVacuumVolumeCommit(volumeLayout, vid, locationlist) } } @@ -133,9 +145,10 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho } return nil, ret.Result } -func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { +func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId, preallocate int64) error { values := make(url.Values) values.Add("volume", vid.String()) + values.Add("preallocate", fmt.Sprintf("%d", preallocate)) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values) if err != nil { return err @@ -165,3 +178,19 @@ func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { } return nil } +func vacuumVolume_Cleanup(urlLocation string, vid storage.VolumeId) error { + values := make(url.Values) + values.Add("volume", vid.String()) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/cleanup", values) + if err != nil { + return err + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +}