diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 6537cea13..b65433e81 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -57,6 +57,15 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { } debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) } +func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { + err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) + if err == nil { + writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) + } else { + writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + } + debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) +} func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { err := store.CompactVolume(r.FormValue("volume")) if err == nil { @@ -67,9 +76,9 @@ func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { debug("compacted volume =", r.FormValue("volume"), ", error =", err) } func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - count, err := store.CommitCompactVolume(r.FormValue("volume")) + err := store.CommitCompactVolume(r.FormValue("volume")) if err == nil { - writeJson(w, r, map[string]interface{}{"error": "", "size": count}) + writeJson(w, r, map[string]interface{}{"error": ""}) } else { writeJson(w, r, map[string]string{"error": err.Error()}) } @@ -302,6 +311,7 @@ func runVolume(cmd *Command, args []string) bool { http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler) + http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 4e83a76c8..7365022ea 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -29,17 +29,25 @@ func NewCompactSection(start Key) CompactSection { start: start, } } -func (cs *CompactSection) Set(key Key, offset uint32, size uint32) { + +//return old entry size +func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { + ret := uint32(0) if key > cs.end { cs.end = key } if i := cs.binarySearchValues(key); i >= 0 { + ret = cs.values[i].Size + //println("key", key, "old size", ret) cs.values[i].Offset, cs.values[i].Size = offset, size } else { - needOverflow := cs.counter >= batch - needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key + needOverflow := cs.counter >= batch + needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key if needOverflow { //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValue := cs.overflow[key]; oldValue != nil { + ret = oldValue.Size + } cs.overflow[key] = &NeedleValue{Key: key, Offset: offset, Size: size} } else { p := &cs.values[cs.counter] @@ -48,12 +56,23 @@ func (cs *CompactSection) Set(key Key, offset uint32, size uint32) { cs.counter++ } } + return ret } -func (cs *CompactSection) Delete(key Key) { + +//return old entry size +func (cs *CompactSection) Delete(key Key) uint32 { + ret := uint32(0) if i := cs.binarySearchValues(key); i >= 0 { - cs.values[i].Size = 0 + if cs.values[i].Size > 0 { + ret = cs.values[i].Size + cs.values[i].Size = 0 + } } - delete(cs.overflow, key) + if v := cs.overflow[key]; v != nil { + delete(cs.overflow, key) + ret = v.Size + } + return ret } func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { if v, ok := cs.overflow[key]; ok { @@ -94,21 +113,21 @@ func NewCompactMap() CompactMap { return CompactMap{} } -func (cm *CompactMap) Set(key Key, offset uint32, size uint32) { +func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { x := cm.binarySearchCompactSection(key) if x < 0 { //println(x, "creating", len(cm.list), "section1, starting", key) cm.list = append(cm.list, NewCompactSection(key)) x = len(cm.list) - 1 } - cm.list[x].Set(key, offset, size) + return cm.list[x].Set(key, offset, size) } -func (cm *CompactMap) Delete(key Key) { +func (cm *CompactMap) Delete(key Key) uint32 { x := cm.binarySearchCompactSection(key) if x < 0 { - return + return uint32(0) } - cm.list[x].Delete(key) + return cm.list[x].Delete(key) } func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { x := cm.binarySearchCompactSection(key) @@ -123,7 +142,7 @@ func (cm *CompactMap) binarySearchCompactSection(key Key) int { return -5 } if cm.list[h].start <= key { - if cm.list[h].counter < batch || key <= cm.list[h].end{ + if cm.list[h].counter < batch || key <= cm.list[h].end { return h } else { return -4 @@ -150,9 +169,9 @@ func (cm *CompactMap) Peek() { println("[", v.Key, v.Offset, v.Size, "]") } } - for k, v := range cm.list[0].overflow { - if k < 100 { - println("o[", v.Key, v.Offset, v.Size, "]") - } - } + for k, v := range cm.list[0].overflow { + if k < 100 { + println("o[", v.Key, v.Offset, v.Size, "]") + } + } } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index c5dca524d..ac6998337 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -11,9 +11,11 @@ type NeedleMap struct { m CompactMap //transient - bytes []byte - deletionCounter int - fileCounter int + bytes []byte + + deletionCounter int + fileCounter int + deletionByteCounter uint32 } func NewNeedleMap(file *os.File) *NeedleMap { @@ -43,13 +45,18 @@ func LoadNeedleMap(file *os.File) *NeedleMap { offset := util.BytesToUint32(bytes[i+8 : i+12]) size := util.BytesToUint32(bytes[i+12 : i+16]) if offset > 0 { - nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size) + oldSize := nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) nm.fileCounter++ + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + oldSize + } } else { nm.m.Delete(Key(key)) - //log.Println("removing key", key) + //log.Println("removing key", key) nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + size } } @@ -59,11 +66,15 @@ func LoadNeedleMap(file *os.File) *NeedleMap { } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { - nm.m.Set(Key(key), offset, size) + oldSize := nm.m.Set(Key(key), offset, size) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[12:16], size) nm.fileCounter++ + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + oldSize + } return nm.indexFile.Write(nm.bytes) } func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { @@ -71,7 +82,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { return } func (nm *NeedleMap) Delete(key uint64) { - nm.m.Delete(Key(key)) + nm.deletionByteCounter = nm.deletionByteCounter + nm.m.Delete(Key(key)) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index fb49e9059..03d4357e4 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -69,6 +69,17 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { return nil } +func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false + } + garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) + if e != nil { + return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false + } + return nil, garbageThreshold < s.volumes[vid].garbageLevel() +} func (s *Store) CompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -76,10 +87,10 @@ func (s *Store) CompactVolume(volumeIdString string) error { } return s.volumes[vid].compact() } -func (s *Store) CommitCompactVolume(volumeIdString string) (int,error) { +func (s *Store) CommitCompactVolume(volumeIdString string) (error) { vid, err := NewVolumeId(volumeIdString) if err != nil { - return 0, errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") } return s.volumes[vid].commitCompact() } @@ -104,7 +115,7 @@ func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter + s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter stats = append(stats, s) } return stats @@ -113,7 +124,7 @@ func (s *Store) Join(mserver string) error { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter + s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 4ada4e734..8c8a4ea88 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -55,13 +55,18 @@ func (v *Volume) load() error { return nil } func (v *Volume) Size() int64 { + v.accessLock.Lock() + defer v.accessLock.Unlock() stat, e := v.dataFile.Stat() if e == nil { return stat.Size() } + fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) return -1 } func (v *Volume) Close() { + v.accessLock.Lock() + defer v.accessLock.Unlock() v.nm.Close() v.dataFile.Close() } @@ -115,6 +120,7 @@ func (v *Volume) delete(n *Needle) uint32 { } return 0 } + func (v *Volume) read(n *Needle) (int, error) { v.accessLock.Lock() defer v.accessLock.Unlock() @@ -126,6 +132,10 @@ func (v *Volume) read(n *Needle) (int, error) { return -1, errors.New("Not Found") } +func (v *Volume) garbageLevel() float64 { + return float64(v.nm.deletionByteCounter)/float64(v.Size()) +} + func (v *Volume) compact() error { v.accessLock.Lock() defer v.accessLock.Unlock() @@ -133,21 +143,21 @@ func (v *Volume) compact() error { filePath := path.Join(v.dir, v.Id.String()) return v.copyDataAndGenerateIndexFile(filePath+".dat", filePath+".cpd", filePath+".cpx") } -func (v *Volume) commitCompact() (int, error) { +func (v *Volume) commitCompact() (error) { v.accessLock.Lock() defer v.accessLock.Unlock() v.dataFile.Close() var e error if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { - return 0, e + return e } if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { - return 0, e + return e } if e = v.load(); e != nil { - return 0, e + return e } - return 0, nil + return nil } func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) (err error) { @@ -183,7 +193,6 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) nv, ok := v.nm.Get(n.Id) //log.Println("file size is", n.Size, "rest", rest) if !ok || nv.Offset*8 != old_offset { - log.Println("expected offset should be", nv.Offset*8, "skipping", (rest - 16), "key", n.Id, "volume offset", old_offset, "data_size", n.Size, "rest", rest) src.Seek(int64(rest), 1) } else { if nv.Size > 0 { diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index 05b9e6205..dfedb0af3 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -9,4 +9,5 @@ type VolumeInfo struct { RepType ReplicationType FileCount int DeleteCount int + DeletedByteCount uint32 } diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index c3184b3da..81bad7da5 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -164,6 +164,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi } for _, v := range dn.volumes { if uint64(v.Size) >= volumeSizeLimit { + //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) n.GetTopology().chanFullVolumes <- &v } } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index e6232cec6..c8ad9b2e7 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -3,45 +3,90 @@ package topology import ( "encoding/json" "errors" - "fmt" + "fmt" "net/url" "pkg/storage" "pkg/util" "time" ) +func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url()) + if e, ret := vacuumVolume_Check(url, vid); e != nil { + //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e) + ch <- false + } else { + //fmt.Println(index, "Checked vacuuming", vid, "on", url) + ch <- ret + } + }(index, dn.Url(), vid) + } + isCheckSuccess := true + for _ = range locationlist.list { + select { + case canVacuum := <-ch: + isCheckSuccess = isCheckSuccess && canVacuum + case <-time.After(30 * time.Minute): + isCheckSuccess = false + break + } + } + return isCheckSuccess +} +func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + vl.removeFromWritable(vid) + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + fmt.Println(index, "Start vacuuming", vid, "on", dn.Url()) + if e := vacuumVolume_Compact(url, vid); e != nil { + fmt.Println(index, "Error when vacuuming", vid, "on", url, e) + ch <- false + } else { + fmt.Println(index, "Complete vacuuming", vid, "on", url) + ch <- true + } + }(index, dn.Url(), vid) + } + isVacuumSuccess := true + for _ = range locationlist.list { + select { + case _ = <-ch: + case <-time.After(30 * time.Minute): + isVacuumSuccess = false + break + } + } + return isVacuumSuccess +} +func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + isCommitSuccess := true + for _, dn := range locationlist.list { + fmt.Println("Start Commiting vacuum", vid, "on", dn.Url()) + if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { + fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e) + isCommitSuccess = false + } else { + fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url()) + } + } + if isCommitSuccess { + vl.setVolumeWritable(vid) + } + return isCommitSuccess +} func (t *Topology) Vacuum() int { - total_counter := 0 for _, vl := range t.replicaType2VolumeLayout { if vl != nil { for vid, locationlist := range vl.vid2location { - each_volume_counter := 0 - vl.removeFromWritable(vid) - ch := make(chan int, locationlist.Length()) - for _, dn := range locationlist.list { - go func(url string, vid storage.VolumeId) { - vacuumVolume_Compact(url, vid) - }(dn.Url(), vid) - } - for _ = range locationlist.list { - select { - case count := <-ch: - each_volume_counter += count - case <-time.After(30 * time.Minute): - each_volume_counter = 0 - break + if batchVacuumVolumeCheck(vl, vid, locationlist) { + if batchVacuumVolumeCompact(vl, vid, locationlist) { + batchVacuumVolumeCommit(vl, vid, locationlist) } } - if each_volume_counter > 0 { - for _, dn := range locationlist.list { - if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { - fmt.Println("Error when committing on", dn.Url(), e) - panic(e) - } - } - vl.setVolumeWritable(vid) - total_counter += each_volume_counter - } } } } @@ -49,25 +94,42 @@ func (t *Topology) Vacuum() int { } type VacuumVolumeResult struct { - Bytes int - Error string + Result bool + Error string } -func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) (error, int) { +func vacuumVolume_Check(urlLocation string, vid storage.VolumeId) (error, bool) { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("garbageThreshold", "0.3") + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) + if err != nil { + return err, false + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err, false + } + if ret.Error != "" { + return errors.New(ret.Error), false + } + return nil, ret.Result +} +func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) values.Add("volume", vid.String()) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values) if err != nil { - return err, 0 + return err } var ret VacuumVolumeResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err, 0 + return err } if ret.Error != "" { - return errors.New(ret.Error), 0 + return errors.New(ret.Error) } - return nil, ret.Bytes + return nil } func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index 230ea1071..322d6c27a 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -15,6 +15,12 @@ func (t *Topology) StartRefreshWritableVolumes() { time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) } }() + go func() { + c := time.Tick(15 * time.Minute) + for _ = range c { + t.Vacuum() + } + }() go func() { for { select {