diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index d8141cf71..bbe17071e 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -236,15 +236,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } type VolumeFileScanner4Vacuum struct { - version needle.Version - v *Volume - dst *os.File - nm *NeedleMap - newOffset int64 - now uint64 - compactionBytePerSecond int64 - lastSizeCounter int64 - lastSizeCheckTime time.Time + version needle.Version + v *Volume + dst *os.File + nm *NeedleMap + newOffset int64 + now uint64 + writeThrottler *util.WriteThrottler } func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error { @@ -274,28 +272,11 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in } delta := n.DiskSize(scanner.version) scanner.newOffset += delta - scanner.maybeSlowdown(delta) + scanner.writeThrottler.MaybeSlowdown(delta) glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size) } return nil } -func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) { - if scanner.compactionBytePerSecond > 0 { - scanner.lastSizeCounter += delta - now := time.Now() - elapsedDuration := now.Sub(scanner.lastSizeCheckTime) - if elapsedDuration > 100*time.Millisecond { - overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10 - if overLimitBytes > 0 { - overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond) - sleepTime := time.Duration(overRatio*1000) * time.Millisecond - // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio) - time.Sleep(sleepTime) - } - scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now() - } - } -} func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { var ( @@ -312,12 +293,11 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca defer idx.Close() scanner := &VolumeFileScanner4Vacuum{ - v: v, - now: uint64(time.Now().Unix()), - nm: NewBtreeNeedleMap(idx), - dst: dst, - compactionBytePerSecond: compactionBytePerSecond, - lastSizeCheckTime: time.Now(), + v: v, + now: uint64(time.Now().Unix()), + nm: NewBtreeNeedleMap(idx), + dst: dst, + writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) return diff --git a/weed/util/throttler.go b/weed/util/throttler.go new file mode 100644 index 000000000..873161e37 --- /dev/null +++ b/weed/util/throttler.go @@ -0,0 +1,34 @@ +package util + +import "time" + +type WriteThrottler struct { + compactionBytePerSecond int64 + lastSizeCounter int64 + lastSizeCheckTime time.Time +} + +func NewWriteThrottler(bytesPerSecond int64) *WriteThrottler { + return &WriteThrottler{ + compactionBytePerSecond: bytesPerSecond, + lastSizeCheckTime: time.Now(), + } +} + +func (wt *WriteThrottler) MaybeSlowdown(delta int64) { + if wt.compactionBytePerSecond > 0 { + wt.lastSizeCounter += delta + now := time.Now() + elapsedDuration := now.Sub(wt.lastSizeCheckTime) + if elapsedDuration > 100*time.Millisecond { + overLimitBytes := wt.lastSizeCounter - wt.compactionBytePerSecond/10 + if overLimitBytes > 0 { + overRatio := float64(overLimitBytes) / float64(wt.compactionBytePerSecond) + sleepTime := time.Duration(overRatio*1000) * time.Millisecond + // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", wt.lastSizeCounter, wt.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio) + time.Sleep(sleepTime) + } + wt.lastSizeCounter, wt.lastSizeCheckTime = 0, time.Now() + } + } +}