mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
preallocate disk space during compaction also, add cleanup for failed compaction
This commit is contained in:
parent
f7c22f0159
commit
58344980e4
|
@ -33,14 +33,16 @@ func runCompact(cmd *Command, args []string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
preallocate := *compactVolumePreallocate * (1 << 20)
|
||||
|
||||
vid := storage.VolumeId(*compactVolumeId)
|
||||
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
|
||||
storage.NeedleMapInMemory, nil, nil, *compactVolumePreallocate*(1<<20))
|
||||
storage.NeedleMapInMemory, nil, nil, preallocate)
|
||||
if err != nil {
|
||||
glog.Fatalf("Load Volume [ERROR] %s\n", err)
|
||||
}
|
||||
if *compactMethod == 0 {
|
||||
if err = v.Compact(); err != nil {
|
||||
if err = v.Compact(preallocate); err != nil {
|
||||
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -79,7 +79,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
|||
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
|
||||
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
|
||||
|
||||
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
|
||||
ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
|
||||
|
||||
return ms
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
|
|||
gcThreshold = ms.garbageThreshold
|
||||
}
|
||||
glog.Infoln("garbageThreshold =", gcThreshold)
|
||||
ms.Topo.Vacuum(gcThreshold)
|
||||
ms.Topo.Vacuum(gcThreshold, ms.preallocate)
|
||||
ms.dirStatusHandler(w, r)
|
||||
}
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
|||
adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
|
||||
adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
|
||||
adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
|
||||
adminMux.HandleFunc("/admin/vacuum/cleanup", vs.guard.WhiteList(vs.vacuumVolumeCleanupHandler))
|
||||
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
|
||||
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
|
||||
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
|
||||
|
|
|
@ -3,7 +3,9 @@ package weed_server
|
|||
import (
|
||||
"net/http"
|
||||
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -16,7 +18,15 @@ func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.
|
|||
glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
|
||||
}
|
||||
func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
|
||||
err := vs.store.CompactVolume(r.FormValue("volume"))
|
||||
var preallocate int64
|
||||
var err error
|
||||
if r.FormValue("preallocate") != "" {
|
||||
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
|
||||
if err != nil {
|
||||
glog.V(0).Infoln("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err)
|
||||
}
|
||||
}
|
||||
err = vs.store.CompactVolume(r.FormValue("volume"), preallocate)
|
||||
if err == nil {
|
||||
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
|
||||
} else {
|
||||
|
@ -33,3 +43,12 @@ func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http
|
|||
}
|
||||
glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err)
|
||||
}
|
||||
func (vs *VolumeServer) vacuumVolumeCleanupHandler(w http.ResponseWriter, r *http.Request) {
|
||||
err := vs.store.CommitCleanupVolume(r.FormValue("volume"))
|
||||
if err == nil {
|
||||
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
|
||||
} else {
|
||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||
}
|
||||
glog.V(2).Infoln("cleanup compact volume =", r.FormValue("volume"), ", error =", err)
|
||||
}
|
||||
|
|
|
@ -22,13 +22,13 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
|
|||
}
|
||||
return fmt.Errorf("volume id %d is not found during check compact", vid), false
|
||||
}
|
||||
func (s *Store) CompactVolume(volumeIdString string) error {
|
||||
func (s *Store) CompactVolume(volumeIdString string, preallocate int64) error {
|
||||
vid, err := NewVolumeId(volumeIdString)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
|
||||
}
|
||||
if v := s.findVolume(vid); v != nil {
|
||||
return v.Compact()
|
||||
return v.Compact(preallocate)
|
||||
}
|
||||
return fmt.Errorf("volume id %d is not found during compact", vid)
|
||||
}
|
||||
|
@ -42,3 +42,13 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
|
|||
}
|
||||
return fmt.Errorf("volume id %d is not found during commit compact", vid)
|
||||
}
|
||||
func (s *Store) CommitCleanupVolume(volumeIdString string) error {
|
||||
vid, err := NewVolumeId(volumeIdString)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
|
||||
}
|
||||
if v := s.findVolume(vid); v != nil {
|
||||
return v.cleanupCompact()
|
||||
}
|
||||
return fmt.Errorf("volume id %d is not found during cleaning up", vid)
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
|
|||
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
|
||||
}
|
||||
if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
|
||||
if err = v.Compact(); err != nil {
|
||||
if err = v.Compact(0); err != nil {
|
||||
return fmt.Errorf("Compact Volume before synchronizing %v", err)
|
||||
}
|
||||
if err = v.commitCompact(); err != nil {
|
||||
|
|
|
@ -13,7 +13,7 @@ func (v *Volume) garbageLevel() float64 {
|
|||
return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
|
||||
}
|
||||
|
||||
func (v *Volume) Compact() error {
|
||||
func (v *Volume) Compact(preallocate int64) error {
|
||||
glog.V(3).Infof("Compacting ...")
|
||||
//no need to lock for copy on write
|
||||
//v.accessLock.Lock()
|
||||
|
@ -24,7 +24,7 @@ func (v *Volume) Compact() error {
|
|||
v.lastCompactIndexOffset = v.nm.IndexFileSize()
|
||||
v.lastCompactRevision = v.SuperBlock.CompactRevision
|
||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", 0)
|
||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
|
||||
}
|
||||
|
||||
func (v *Volume) Compact2() error {
|
||||
|
@ -72,6 +72,20 @@ func (v *Volume) commitCompact() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *Volume) cleanupCompact() error {
|
||||
glog.V(0).Infof("Cleaning up vacuuming...")
|
||||
|
||||
e1 := os.Remove(v.FileName() + ".cpd")
|
||||
e2 := os.Remove(v.FileName() + ".cpx")
|
||||
if e1 != nil {
|
||||
return e1
|
||||
}
|
||||
if e2 != nil {
|
||||
return e2
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
|
||||
if _, err = file.Seek(0, 0); err != nil {
|
||||
return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
)
|
||||
|
||||
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
|
||||
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string, preallocate int64) {
|
||||
go func() {
|
||||
for {
|
||||
if t.IsLeader() {
|
||||
|
@ -22,7 +22,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
|
|||
c := time.Tick(15 * time.Minute)
|
||||
for _ = range c {
|
||||
if t.IsLeader() {
|
||||
t.Vacuum(garbageThreshold)
|
||||
t.Vacuum(garbageThreshold, preallocate)
|
||||
}
|
||||
}
|
||||
}(garbageThreshold)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
|
@ -37,13 +38,13 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
|
|||
}
|
||||
return isCheckSuccess
|
||||
}
|
||||
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
|
||||
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
|
||||
vl.removeFromWritable(vid)
|
||||
ch := make(chan bool, locationlist.Length())
|
||||
for index, dn := range locationlist.list {
|
||||
go func(index int, url string, vid storage.VolumeId) {
|
||||
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
|
||||
if e := vacuumVolume_Compact(url, vid); e != nil {
|
||||
if e := vacuumVolume_Compact(url, vid, preallocate); e != nil {
|
||||
glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
|
||||
ch <- false
|
||||
} else {
|
||||
|
@ -80,7 +81,18 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
|
|||
}
|
||||
return isCommitSuccess
|
||||
}
|
||||
func (t *Topology) Vacuum(garbageThreshold string) int {
|
||||
func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
|
||||
for _, dn := range locationlist.list {
|
||||
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
|
||||
if e := vacuumVolume_Cleanup(dn.Url(), vid); e != nil {
|
||||
glog.V(0).Infoln("Error when cleaning up", vid, "on", dn.Url(), e)
|
||||
} else {
|
||||
glog.V(0).Infoln("Complete cleaning up", vid, "on", dn.Url())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Topology) Vacuum(garbageThreshold string, preallocate int64) int {
|
||||
glog.V(0).Infof("Start vacuum on demand with threshold:%s", garbageThreshold)
|
||||
for _, col := range t.collectionMap.Items() {
|
||||
c := col.(*Collection)
|
||||
|
@ -99,7 +111,7 @@ func (t *Topology) Vacuum(garbageThreshold string) int {
|
|||
|
||||
glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
|
||||
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
|
||||
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
|
||||
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
|
||||
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
|
||||
}
|
||||
}
|
||||
|
@ -133,9 +145,10 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho
|
|||
}
|
||||
return nil, ret.Result
|
||||
}
|
||||
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
|
||||
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId, preallocate int64) error {
|
||||
values := make(url.Values)
|
||||
values.Add("volume", vid.String())
|
||||
values.Add("preallocate", fmt.Sprintf("%d", preallocate))
|
||||
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -165,3 +178,19 @@ func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
func vacuumVolume_Cleanup(urlLocation string, vid storage.VolumeId) error {
|
||||
values := make(url.Values)
|
||||
values.Add("volume", vid.String())
|
||||
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/cleanup", values)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var ret VacuumVolumeResult
|
||||
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
|
||||
return err
|
||||
}
|
||||
if ret.Error != "" {
|
||||
return errors.New(ret.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue