2014-05-20 02:24:35 +00:00
package storage
import (
2014-05-20 03:54:39 +00:00
"fmt"
2014-05-20 02:24:35 +00:00
"os"
2014-09-20 19:38:59 +00:00
"time"
2014-10-26 18:34:55 +00:00
2016-06-03 01:09:14 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2018-07-22 00:39:10 +00:00
. "github.com/chrislusf/seaweedfs/weed/storage/types"
2016-09-29 05:57:23 +00:00
"github.com/chrislusf/seaweedfs/weed/util"
2014-05-20 02:24:35 +00:00
)
func ( v * Volume ) garbageLevel ( ) float64 {
2018-06-25 08:20:15 +00:00
if v . ContentSize ( ) == 0 {
return 0
}
2014-05-20 02:24:35 +00:00
return float64 ( v . nm . DeletedSize ( ) ) / float64 ( v . ContentSize ( ) )
}
2019-05-04 00:22:39 +00:00
func ( v * Volume ) Compact ( preallocate int64 , compactionBytePerSecond int64 ) error {
2018-10-19 03:47:30 +00:00
glog . V ( 3 ) . Infof ( "Compacting volume %d ..." , v . Id )
2014-09-20 19:38:59 +00:00
//no need to lock for copy on write
//v.accessLock.Lock()
//defer v.accessLock.Unlock()
//glog.V(3).Infof("Got Compaction lock...")
2014-05-20 02:24:35 +00:00
filePath := v . FileName ( )
2016-10-07 08:22:24 +00:00
v . lastCompactIndexOffset = v . nm . IndexFileSize ( )
2019-04-19 19:29:49 +00:00
v . lastCompactRevision = v . SuperBlock . CompactionRevision
2016-10-07 08:22:24 +00:00
glog . V ( 3 ) . Infof ( "creating copies for volume %d ,last offset %d..." , v . Id , v . lastCompactIndexOffset )
2019-05-04 00:22:39 +00:00
return v . copyDataAndGenerateIndexFile ( filePath + ".cpd" , filePath + ".cpx" , preallocate , compactionBytePerSecond )
2014-05-20 02:24:35 +00:00
}
2016-09-23 03:31:17 +00:00
func ( v * Volume ) Compact2 ( ) error {
2018-10-19 03:47:30 +00:00
glog . V ( 3 ) . Infof ( "Compact2 volume %d ..." , v . Id )
2016-09-23 03:31:17 +00:00
filePath := v . FileName ( )
glog . V ( 3 ) . Infof ( "creating copies for volume %d ..." , v . Id )
return v . copyDataBasedOnIndexFile ( filePath + ".cpd" , filePath + ".cpx" )
}
2019-03-25 16:16:12 +00:00
func ( v * Volume ) CommitCompact ( ) error {
2018-10-19 03:47:30 +00:00
glog . V ( 0 ) . Infof ( "Committing volume %d vacuuming..." , v . Id )
2015-05-23 17:16:01 +00:00
v . dataFileAccessLock . Lock ( )
defer v . dataFileAccessLock . Unlock ( )
2018-10-19 03:47:30 +00:00
glog . V ( 3 ) . Infof ( "Got volume %d committing lock..." , v . Id )
2019-01-06 03:52:17 +00:00
v . compactingWg . Add ( 1 )
defer v . compactingWg . Done ( )
2016-05-09 17:04:21 +00:00
v . nm . Close ( )
2018-11-04 07:28:24 +00:00
if err := v . dataFile . Close ( ) ; err != nil {
glog . V ( 0 ) . Infof ( "fail to close volume %d" , v . Id )
}
v . dataFile = nil
2016-09-29 05:57:23 +00:00
2014-05-20 02:24:35 +00:00
var e error
2016-09-29 05:57:23 +00:00
if e = v . makeupDiff ( v . FileName ( ) + ".cpd" , v . FileName ( ) + ".cpx" , v . FileName ( ) + ".dat" , v . FileName ( ) + ".idx" ) ; e != nil {
2019-03-25 16:16:12 +00:00
glog . V ( 0 ) . Infof ( "makeupDiff in CommitCompact volume %d failed %v" , v . Id , e )
2016-09-29 05:57:23 +00:00
e = os . Remove ( v . FileName ( ) + ".cpd" )
if e != nil {
return e
}
e = os . Remove ( v . FileName ( ) + ".cpx" )
if e != nil {
return e
}
} else {
var e error
if e = os . Rename ( v . FileName ( ) + ".cpd" , v . FileName ( ) + ".dat" ) ; e != nil {
2018-11-04 07:28:24 +00:00
return fmt . Errorf ( "rename %s: %v" , v . FileName ( ) + ".cpd" , e )
2016-09-29 05:57:23 +00:00
}
if e = os . Rename ( v . FileName ( ) + ".cpx" , v . FileName ( ) + ".idx" ) ; e != nil {
2018-11-04 07:28:24 +00:00
return fmt . Errorf ( "rename %s: %v" , v . FileName ( ) + ".cpx" , e )
2016-09-29 05:57:23 +00:00
}
2014-05-20 02:24:35 +00:00
}
2016-09-29 05:57:23 +00:00
2014-05-27 00:34:54 +00:00
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
2018-09-27 01:45:51 +00:00
os . RemoveAll ( v . FileName ( ) + ".ldb" )
os . RemoveAll ( v . FileName ( ) + ".bdb" )
2018-10-19 03:47:30 +00:00
glog . V ( 3 ) . Infof ( "Loading volume %d commit file..." , v . Id )
2017-01-08 19:01:46 +00:00
if e = v . load ( true , false , v . needleMapKind , 0 ) ; e != nil {
2014-05-20 02:24:35 +00:00
return e
}
return nil
}
2014-05-20 03:54:39 +00:00
2017-08-30 06:59:53 +00:00
func ( v * Volume ) cleanupCompact ( ) error {
2018-10-19 03:47:30 +00:00
glog . V ( 0 ) . Infof ( "Cleaning up volume %d vacuuming..." , v . Id )
2017-08-30 06:59:53 +00:00
e1 := os . Remove ( v . FileName ( ) + ".cpd" )
e2 := os . Remove ( v . FileName ( ) + ".cpx" )
if e1 != nil {
return e1
}
if e2 != nil {
return e2
}
return nil
}
2016-10-07 08:22:24 +00:00
func fetchCompactRevisionFromDatFile ( file * os . File ) ( compactRevision uint16 , err error ) {
2018-06-24 18:37:08 +00:00
superBlock , err := ReadSuperBlock ( file )
2016-10-07 08:22:24 +00:00
if err != nil {
return 0 , err
}
2019-04-19 19:29:49 +00:00
return superBlock . CompactionRevision , nil
2016-10-07 08:22:24 +00:00
}
2016-09-29 05:57:23 +00:00
func ( v * Volume ) makeupDiff ( newDatFileName , newIdxFileName , oldDatFileName , oldIdxFileName string ) ( err error ) {
var indexSize int64
oldIdxFile , err := os . Open ( oldIdxFileName )
defer oldIdxFile . Close ( )
oldDatFile , err := os . Open ( oldDatFileName )
defer oldDatFile . Close ( )
if indexSize , err = verifyIndexFileIntegrity ( oldIdxFile ) ; err != nil {
return fmt . Errorf ( "verifyIndexFileIntegrity %s failed: %v" , oldIdxFileName , err )
}
2016-10-07 08:22:24 +00:00
if indexSize == 0 || uint64 ( indexSize ) <= v . lastCompactIndexOffset {
2016-09-29 05:57:23 +00:00
return nil
}
2016-10-07 08:22:24 +00:00
oldDatCompactRevision , err := fetchCompactRevisionFromDatFile ( oldDatFile )
if err != nil {
2018-06-24 01:24:59 +00:00
return fmt . Errorf ( "fetchCompactRevisionFromDatFile src %s failed: %v" , oldDatFile . Name ( ) , err )
2016-10-07 08:22:24 +00:00
}
if oldDatCompactRevision != v . lastCompactRevision {
return fmt . Errorf ( "current old dat file's compact revision %d is not the expected one %d" , oldDatCompactRevision , v . lastCompactRevision )
}
type keyField struct {
2018-07-08 09:28:04 +00:00
offset Offset
2016-10-07 08:22:24 +00:00
size uint32
}
2018-07-08 09:28:04 +00:00
incrementedHasUpdatedIndexEntry := make ( map [ NeedleId ] keyField )
2016-10-07 08:22:24 +00:00
2019-04-19 07:39:34 +00:00
for idxOffset := indexSize - NeedleMapEntrySize ; uint64 ( idxOffset ) >= v . lastCompactIndexOffset ; idxOffset -= NeedleMapEntrySize {
2016-09-29 05:57:23 +00:00
var IdxEntry [ ] byte
2019-01-17 01:17:19 +00:00
if IdxEntry , err = readIndexEntryAtOffset ( oldIdxFile , idxOffset ) ; err != nil {
return fmt . Errorf ( "readIndexEntry %s at offset %d failed: %v" , oldIdxFileName , idxOffset , err )
2016-09-29 05:57:23 +00:00
}
2018-07-08 09:28:04 +00:00
key , offset , size := IdxFileEntry ( IdxEntry )
2018-07-07 07:48:58 +00:00
glog . V ( 4 ) . Infof ( "key %d offset %d size %d" , key , offset , size )
2016-10-07 08:22:24 +00:00
if _ , found := incrementedHasUpdatedIndexEntry [ key ] ; ! found {
incrementedHasUpdatedIndexEntry [ key ] = keyField {
2016-09-29 05:57:23 +00:00
offset : offset ,
size : size ,
}
}
}
2018-06-24 01:24:59 +00:00
// no updates during commit step
if len ( incrementedHasUpdatedIndexEntry ) == 0 {
return nil
}
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
// deal with updates during commit step
var (
dst , idx * os . File
)
if dst , err = os . OpenFile ( newDatFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open dat file %s failed: %v" , newDatFileName , err )
}
defer dst . Close ( )
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
if idx , err = os . OpenFile ( newIdxFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open idx file %s failed: %v" , newIdxFileName , err )
}
defer idx . Close ( )
var newDatCompactRevision uint16
newDatCompactRevision , err = fetchCompactRevisionFromDatFile ( dst )
if err != nil {
return fmt . Errorf ( "fetchCompactRevisionFromDatFile dst %s failed: %v" , dst . Name ( ) , err )
}
if oldDatCompactRevision + 1 != newDatCompactRevision {
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
2016-10-07 08:22:24 +00:00
2019-01-16 09:48:59 +00:00
idxEntryBytes := make ( [ ] byte , NeedleIdSize + OffsetSize + SizeSize )
for key , increIdxEntry := range incrementedHasUpdatedIndexEntry {
NeedleIdToBytes ( idxEntryBytes [ 0 : NeedleIdSize ] , key )
OffsetToBytes ( idxEntryBytes [ NeedleIdSize : NeedleIdSize + OffsetSize ] , increIdxEntry . offset )
util . Uint32toBytes ( idxEntryBytes [ NeedleIdSize + OffsetSize : NeedleIdSize + OffsetSize + SizeSize ] , increIdxEntry . size )
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to seek the end of file: %v" , err )
return
}
//ensure file writing starting from aligned positions
if offset % NeedlePaddingSize != 0 {
offset = offset + ( NeedlePaddingSize - offset % NeedlePaddingSize )
if offset , err = v . dataFile . Seek ( offset , 0 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to align in datafile %s: %v" , v . dataFile . Name ( ) , err )
2016-09-29 05:57:23 +00:00
return
}
2018-06-24 01:24:59 +00:00
}
2016-10-07 08:22:24 +00:00
2018-06-24 01:24:59 +00:00
//updated needle
2019-04-09 02:40:56 +00:00
if ! increIdxEntry . offset . IsZero ( ) && increIdxEntry . size != 0 && increIdxEntry . size != TombstoneFileSize {
2018-06-24 01:24:59 +00:00
//even the needle cache in memory is hit, the need_bytes is correct
2019-04-09 02:40:56 +00:00
glog . V ( 4 ) . Infof ( "file %d offset %d size %d" , key , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size )
2019-01-16 09:48:59 +00:00
var needleBytes [ ] byte
2019-04-19 04:43:36 +00:00
needleBytes , err = needle . ReadNeedleBlob ( oldDatFile , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size , v . Version ( ) )
2018-06-24 01:24:59 +00:00
if err != nil {
2019-04-09 02:40:56 +00:00
return fmt . Errorf ( "ReadNeedleBlob %s key %d offset %d size %d failed: %v" , oldDatFile . Name ( ) , key , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size , err )
2016-10-07 08:22:24 +00:00
}
2019-01-16 09:48:59 +00:00
dst . Write ( needleBytes )
util . Uint32toBytes ( idxEntryBytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
2018-06-24 01:24:59 +00:00
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
2019-04-19 04:43:36 +00:00
fakeDelNeedle := new ( needle . Needle )
2018-06-24 01:24:59 +00:00
fakeDelNeedle . Id = key
fakeDelNeedle . Cookie = 0x12345678
2018-07-24 09:44:33 +00:00
fakeDelNeedle . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2018-12-31 23:08:32 +00:00
_ , _ , _ , err = fakeDelNeedle . Append ( dst , v . Version ( ) )
2018-06-24 01:24:59 +00:00
if err != nil {
return fmt . Errorf ( "append deleted %d failed: %v" , key , err )
2016-10-07 08:22:24 +00:00
}
2019-01-16 09:48:59 +00:00
util . Uint32toBytes ( idxEntryBytes [ 8 : 12 ] , uint32 ( 0 ) )
2018-06-24 01:24:59 +00:00
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
2016-09-29 05:57:23 +00:00
}
2019-01-16 09:48:59 +00:00
_ , err = idx . Write ( idxEntryBytes )
2016-09-29 05:57:23 +00:00
}
2016-09-27 05:30:44 +00:00
return nil
2016-09-27 05:26:39 +00:00
}
2019-01-16 09:48:59 +00:00
type VolumeFileScanner4Vacuum struct {
2019-05-04 00:22:39 +00:00
version needle . Version
v * Volume
dst * os . File
nm * NeedleMap
newOffset int64
now uint64
compactionBytePerSecond int64
lastSizeCounter int64
lastSizeCheckTime time . Time
2019-01-16 09:48:59 +00:00
}
func ( scanner * VolumeFileScanner4Vacuum ) VisitSuperBlock ( superBlock SuperBlock ) error {
scanner . version = superBlock . Version ( )
2019-04-19 19:29:49 +00:00
superBlock . CompactionRevision ++
2019-01-16 09:48:59 +00:00
_ , err := scanner . dst . Write ( superBlock . Bytes ( ) )
scanner . newOffset = int64 ( superBlock . BlockSize ( ) )
return err
}
// ReadNeedleBody always reports true. VisitNeedle re-appends whole needles to
// the compacted file, so presumably the scanner must load each needle's body
// (confirm against ScanVolumeFile).
func (*VolumeFileScanner4Vacuum) ReadNeedleBody() bool {
	const wantBody = true
	return wantBody
}
2019-04-19 04:43:36 +00:00
func ( scanner * VolumeFileScanner4Vacuum ) VisitNeedle ( n * needle . Needle , offset int64 ) error {
2019-01-16 09:48:59 +00:00
if n . HasTtl ( ) && scanner . now >= n . LastModified + uint64 ( scanner . v . Ttl . Minutes ( ) * 60 ) {
return nil
}
nv , ok := scanner . v . nm . Get ( n . Id )
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
2019-04-09 02:40:56 +00:00
if ok && nv . Offset . ToAcutalOffset ( ) == offset && nv . Size > 0 && nv . Size != TombstoneFileSize {
if err := scanner . nm . Put ( n . Id , ToOffset ( scanner . newOffset ) , n . Size ) ; err != nil {
2019-01-16 09:48:59 +00:00
return fmt . Errorf ( "cannot put needle: %s" , err )
}
if _ , _ , _ , err := n . Append ( scanner . dst , scanner . v . Version ( ) ) ; err != nil {
return fmt . Errorf ( "cannot append needle: %s" , err )
}
2019-05-04 00:22:39 +00:00
delta := n . DiskSize ( scanner . version )
scanner . newOffset += delta
scanner . maybeSlowdown ( delta )
2019-01-16 09:48:59 +00:00
glog . V ( 4 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , scanner . newOffset , "data_size" , n . Size )
}
return nil
}
2019-05-04 00:22:39 +00:00
// maybeSlowdown throttles the compaction copy to about compactionBytePerSecond.
// delta is the number of bytes just written.
//
// Written bytes accumulate until more than 100ms have passed since the last
// check. At that point, if more bytes were written than the rate allows for
// the time actually elapsed, it sleeps just long enough to bring the average
// back to the limit. Throttling is disabled when compactionBytePerSecond <= 0.
func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) {
	if scanner.compactionBytePerSecond <= 0 {
		return
	}
	scanner.lastSizeCounter += delta
	elapsed := time.Since(scanner.lastSizeCheckTime)
	if elapsed <= 100*time.Millisecond {
		return
	}
	// Scale the budget by the real window length instead of assuming exactly
	// 100ms. A long window (e.g. slow reads between writes) must not be
	// charged against a fixed compactionBytePerSecond/10 budget, or the copy
	// is over-throttled.
	rate := float64(scanner.compactionBytePerSecond)
	allowedBytes := rate * elapsed.Seconds()
	if overLimitBytes := float64(scanner.lastSizeCounter) - allowedBytes; overLimitBytes > 0 {
		time.Sleep(time.Duration(overLimitBytes / rate * float64(time.Second)))
	}
	// Restart the window after sleeping so the pause is not counted as copy
	// time in the next budget.
	scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now()
}
2019-01-16 09:48:59 +00:00
2019-05-04 00:22:39 +00:00
func ( v * Volume ) copyDataAndGenerateIndexFile ( dstName , idxName string , preallocate int64 , compactionBytePerSecond int64 ) ( err error ) {
2014-05-20 03:54:39 +00:00
var (
dst , idx * os . File
)
2017-01-08 19:01:46 +00:00
if dst , err = createVolumeFile ( dstName , preallocate ) ; err != nil {
2014-05-20 03:54:39 +00:00
return
}
defer dst . Close ( )
if idx , err = os . OpenFile ( idxName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
2019-01-16 09:48:59 +00:00
scanner := & VolumeFileScanner4Vacuum {
2019-05-04 00:22:39 +00:00
v : v ,
now : uint64 ( time . Now ( ) . Unix ( ) ) ,
nm : NewBtreeNeedleMap ( idx ) ,
dst : dst ,
compactionBytePerSecond : compactionBytePerSecond ,
lastSizeCheckTime : time . Now ( ) ,
2019-01-16 09:48:59 +00:00
}
err = ScanVolumeFile ( v . dir , v . Collection , v . Id , v . needleMapKind , scanner )
2014-05-20 03:54:39 +00:00
return
}
2016-09-23 03:31:17 +00:00
func ( v * Volume ) copyDataBasedOnIndexFile ( dstName , idxName string ) ( err error ) {
var (
dst , idx , oldIndexFile * os . File
)
if dst , err = os . OpenFile ( dstName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
defer dst . Close ( )
if idx , err = os . OpenFile ( idxName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
if oldIndexFile , err = os . OpenFile ( v . FileName ( ) + ".idx" , os . O_RDONLY , 0644 ) ; err != nil {
return
}
defer oldIndexFile . Close ( )
2017-05-27 05:51:25 +00:00
nm := NewBtreeNeedleMap ( idx )
2016-09-23 03:31:17 +00:00
now := uint64 ( time . Now ( ) . Unix ( ) )
2019-04-19 19:29:49 +00:00
v . SuperBlock . CompactionRevision ++
2016-09-23 03:31:17 +00:00
dst . Write ( v . SuperBlock . Bytes ( ) )
2019-01-16 09:48:59 +00:00
newOffset := int64 ( v . SuperBlock . BlockSize ( ) )
2016-09-23 03:31:17 +00:00
2018-07-08 09:28:04 +00:00
WalkIndexFile ( oldIndexFile , func ( key NeedleId , offset Offset , size uint32 ) error {
2019-04-09 02:40:56 +00:00
if offset . IsZero ( ) || size == TombstoneFileSize {
2016-09-23 03:31:17 +00:00
return nil
}
nv , ok := v . nm . Get ( key )
if ! ok {
return nil
}
2019-04-19 04:43:36 +00:00
n := new ( needle . Needle )
2019-04-09 02:40:56 +00:00
err := n . ReadData ( v . dataFile , offset . ToAcutalOffset ( ) , size , v . Version ( ) )
2019-03-19 12:34:43 +00:00
if err != nil {
return nil
}
2016-09-23 03:31:17 +00:00
if n . HasTtl ( ) && now >= n . LastModified + uint64 ( v . Ttl . Minutes ( ) * 60 ) {
return nil
}
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
if nv . Offset == offset && nv . Size > 0 {
2019-04-09 02:40:56 +00:00
if err = nm . Put ( n . Id , ToOffset ( newOffset ) , n . Size ) ; err != nil {
2016-09-23 03:31:17 +00:00
return fmt . Errorf ( "cannot put needle: %s" , err )
}
2018-12-31 23:08:32 +00:00
if _ , _ , _ , err = n . Append ( dst , v . Version ( ) ) ; err != nil {
2016-09-23 03:31:17 +00:00
return fmt . Errorf ( "cannot append needle: %s" , err )
}
2019-01-16 09:48:59 +00:00
newOffset += n . DiskSize ( v . Version ( ) )
glog . V ( 3 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , newOffset , "data_size" , n . Size )
2016-09-23 03:31:17 +00:00
}
return nil
} )
return
}