2014-05-20 02:24:35 +00:00
package storage
import (
2014-05-20 03:54:39 +00:00
"fmt"
2014-05-20 02:24:35 +00:00
"os"
2014-09-20 19:38:59 +00:00
"time"
2014-10-26 18:34:55 +00:00
2019-09-12 13:18:21 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
2019-10-29 07:35:16 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/backend"
2019-09-12 13:18:21 +00:00
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
2014-05-20 02:24:35 +00:00
)
func ( v * Volume ) garbageLevel ( ) float64 {
2018-06-25 08:20:15 +00:00
if v . ContentSize ( ) == 0 {
return 0
}
2019-08-14 08:08:01 +00:00
return float64 ( v . DeletedSize ( ) ) / float64 ( v . ContentSize ( ) )
2014-05-20 02:24:35 +00:00
}
2019-09-03 16:00:59 +00:00
func ( v * Volume ) Compact ( preallocate int64 , compactionBytePerSecond int64 ) error {
2019-09-03 14:05:43 +00:00
2019-10-30 06:18:01 +00:00
if v . MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
2019-09-03 14:05:43 +00:00
glog . V ( 3 ) . Infof ( "Compacting volume %d ..." , v . Id )
//no need to lock for copy on write
//v.accessLock.Lock()
//defer v.accessLock.Unlock()
//glog.V(3).Infof("Got Compaction lock...")
v . isCompacting = true
defer func ( ) {
v . isCompacting = false
} ( )
filePath := v . FileName ( )
v . lastCompactIndexOffset = v . IndexFileSize ( )
v . lastCompactRevision = v . SuperBlock . CompactionRevision
glog . V ( 3 ) . Infof ( "creating copies for volume %d ,last offset %d..." , v . Id , v . lastCompactIndexOffset )
2019-09-03 16:00:59 +00:00
return v . copyDataAndGenerateIndexFile ( filePath + ".cpd" , filePath + ".cpx" , preallocate , compactionBytePerSecond )
2019-09-03 14:05:43 +00:00
} else {
return nil
}
2014-05-20 02:24:35 +00:00
}
2016-09-23 03:31:17 +00:00
func ( v * Volume ) Compact2 ( ) error {
2019-08-12 07:53:50 +00:00
2019-10-30 06:18:01 +00:00
if v . MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
2019-09-03 14:05:43 +00:00
glog . V ( 3 ) . Infof ( "Compact2 volume %d ..." , v . Id )
2019-08-12 07:53:50 +00:00
2019-09-03 14:05:43 +00:00
v . isCompacting = true
defer func ( ) {
v . isCompacting = false
} ( )
filePath := v . FileName ( )
glog . V ( 3 ) . Infof ( "creating copies for volume %d ..." , v . Id )
return v . copyDataBasedOnIndexFile ( filePath + ".cpd" , filePath + ".cpx" )
} else {
return nil
}
2016-09-23 03:31:17 +00:00
}
2019-03-25 16:16:12 +00:00
func ( v * Volume ) CommitCompact ( ) error {
2019-10-30 06:18:01 +00:00
if v . MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
2019-09-03 14:05:43 +00:00
glog . V ( 0 ) . Infof ( "Committing volume %d vacuuming..." , v . Id )
v . isCompacting = true
defer func ( ) {
v . isCompacting = false
} ( )
v . dataFileAccessLock . Lock ( )
defer v . dataFileAccessLock . Unlock ( )
glog . V ( 3 ) . Infof ( "Got volume %d committing lock..." , v . Id )
v . nm . Close ( )
2019-10-29 07:35:16 +00:00
if err := v . DataBackend . Close ( ) ; err != nil {
2019-09-03 14:05:43 +00:00
glog . V ( 0 ) . Infof ( "fail to close volume %d" , v . Id )
2016-09-29 05:57:23 +00:00
}
2019-10-29 07:35:16 +00:00
v . DataBackend = nil
2019-09-03 14:05:43 +00:00
stats . VolumeServerVolumeCounter . WithLabelValues ( v . Collection , "volume" ) . Dec ( )
2016-09-29 05:57:23 +00:00
var e error
2019-09-03 14:05:43 +00:00
if e = v . makeupDiff ( v . FileName ( ) + ".cpd" , v . FileName ( ) + ".cpx" , v . FileName ( ) + ".dat" , v . FileName ( ) + ".idx" ) ; e != nil {
glog . V ( 0 ) . Infof ( "makeupDiff in CommitCompact volume %d failed %v" , v . Id , e )
e = os . Remove ( v . FileName ( ) + ".cpd" )
if e != nil {
return e
}
e = os . Remove ( v . FileName ( ) + ".cpx" )
if e != nil {
return e
}
} else {
var e error
if e = os . Rename ( v . FileName ( ) + ".cpd" , v . FileName ( ) + ".dat" ) ; e != nil {
return fmt . Errorf ( "rename %s: %v" , v . FileName ( ) + ".cpd" , e )
}
if e = os . Rename ( v . FileName ( ) + ".cpx" , v . FileName ( ) + ".idx" ) ; e != nil {
return fmt . Errorf ( "rename %s: %v" , v . FileName ( ) + ".cpx" , e )
}
2016-09-29 05:57:23 +00:00
}
2019-09-03 14:05:43 +00:00
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
2018-09-27 01:45:51 +00:00
2019-09-03 14:05:43 +00:00
os . RemoveAll ( v . FileName ( ) + ".ldb" )
os . RemoveAll ( v . FileName ( ) + ".bdb" )
2018-09-27 01:45:51 +00:00
2019-09-03 14:05:43 +00:00
glog . V ( 3 ) . Infof ( "Loading volume %d commit file..." , v . Id )
2019-09-03 16:00:59 +00:00
if e = v . load ( true , false , v . needleMapKind , 0 ) ; e != nil {
2019-09-03 14:05:43 +00:00
return e
}
2014-05-20 02:24:35 +00:00
}
return nil
}
2014-05-20 03:54:39 +00:00
2017-08-30 06:59:53 +00:00
func ( v * Volume ) cleanupCompact ( ) error {
2018-10-19 03:47:30 +00:00
glog . V ( 0 ) . Infof ( "Cleaning up volume %d vacuuming..." , v . Id )
2017-08-30 06:59:53 +00:00
e1 := os . Remove ( v . FileName ( ) + ".cpd" )
e2 := os . Remove ( v . FileName ( ) + ".cpx" )
if e1 != nil {
return e1
}
if e2 != nil {
return e2
}
return nil
}
2019-10-29 07:35:16 +00:00
func fetchCompactRevisionFromDatFile ( datBackend backend . DataStorageBackend ) ( compactRevision uint16 , err error ) {
superBlock , err := ReadSuperBlock ( datBackend )
2016-10-07 08:22:24 +00:00
if err != nil {
return 0 , err
}
2019-04-19 19:29:49 +00:00
return superBlock . CompactionRevision , nil
2016-10-07 08:22:24 +00:00
}
2016-09-29 05:57:23 +00:00
func ( v * Volume ) makeupDiff ( newDatFileName , newIdxFileName , oldDatFileName , oldIdxFileName string ) ( err error ) {
var indexSize int64
oldIdxFile , err := os . Open ( oldIdxFileName )
defer oldIdxFile . Close ( )
oldDatFile , err := os . Open ( oldDatFileName )
2019-10-30 05:37:36 +00:00
oldDatBackend := backend . NewDiskFile ( oldDatFile )
2019-10-29 07:35:16 +00:00
defer oldDatBackend . Close ( )
2016-09-29 05:57:23 +00:00
if indexSize , err = verifyIndexFileIntegrity ( oldIdxFile ) ; err != nil {
return fmt . Errorf ( "verifyIndexFileIntegrity %s failed: %v" , oldIdxFileName , err )
}
2016-10-07 08:22:24 +00:00
if indexSize == 0 || uint64 ( indexSize ) <= v . lastCompactIndexOffset {
2016-09-29 05:57:23 +00:00
return nil
}
2019-10-29 07:35:16 +00:00
oldDatCompactRevision , err := fetchCompactRevisionFromDatFile ( oldDatBackend )
2016-10-07 08:22:24 +00:00
if err != nil {
2018-06-24 01:24:59 +00:00
return fmt . Errorf ( "fetchCompactRevisionFromDatFile src %s failed: %v" , oldDatFile . Name ( ) , err )
2016-10-07 08:22:24 +00:00
}
if oldDatCompactRevision != v . lastCompactRevision {
return fmt . Errorf ( "current old dat file's compact revision %d is not the expected one %d" , oldDatCompactRevision , v . lastCompactRevision )
}
type keyField struct {
2018-07-08 09:28:04 +00:00
offset Offset
2016-10-07 08:22:24 +00:00
size uint32
}
2018-07-08 09:28:04 +00:00
incrementedHasUpdatedIndexEntry := make ( map [ NeedleId ] keyField )
2016-10-07 08:22:24 +00:00
2019-04-19 07:39:34 +00:00
for idxOffset := indexSize - NeedleMapEntrySize ; uint64 ( idxOffset ) >= v . lastCompactIndexOffset ; idxOffset -= NeedleMapEntrySize {
2016-09-29 05:57:23 +00:00
var IdxEntry [ ] byte
2019-01-17 01:17:19 +00:00
if IdxEntry , err = readIndexEntryAtOffset ( oldIdxFile , idxOffset ) ; err != nil {
return fmt . Errorf ( "readIndexEntry %s at offset %d failed: %v" , oldIdxFileName , idxOffset , err )
2016-09-29 05:57:23 +00:00
}
2019-05-22 05:41:20 +00:00
key , offset , size := idx2 . IdxFileEntry ( IdxEntry )
2018-07-07 07:48:58 +00:00
glog . V ( 4 ) . Infof ( "key %d offset %d size %d" , key , offset , size )
2016-10-07 08:22:24 +00:00
if _ , found := incrementedHasUpdatedIndexEntry [ key ] ; ! found {
incrementedHasUpdatedIndexEntry [ key ] = keyField {
2016-09-29 05:57:23 +00:00
offset : offset ,
size : size ,
}
}
}
2018-06-24 01:24:59 +00:00
// no updates during commit step
if len ( incrementedHasUpdatedIndexEntry ) == 0 {
return nil
}
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
// deal with updates during commit step
var (
dst , idx * os . File
)
if dst , err = os . OpenFile ( newDatFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open dat file %s failed: %v" , newDatFileName , err )
}
2019-10-30 05:37:36 +00:00
dstDatBackend := backend . NewDiskFile ( dst )
2019-10-29 07:35:16 +00:00
defer dstDatBackend . Close ( )
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
if idx , err = os . OpenFile ( newIdxFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open idx file %s failed: %v" , newIdxFileName , err )
}
defer idx . Close ( )
var newDatCompactRevision uint16
2019-10-29 07:35:16 +00:00
newDatCompactRevision , err = fetchCompactRevisionFromDatFile ( dstDatBackend )
2018-06-24 01:24:59 +00:00
if err != nil {
return fmt . Errorf ( "fetchCompactRevisionFromDatFile dst %s failed: %v" , dst . Name ( ) , err )
}
if oldDatCompactRevision + 1 != newDatCompactRevision {
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
2016-10-07 08:22:24 +00:00
2019-01-16 09:48:59 +00:00
for key , increIdxEntry := range incrementedHasUpdatedIndexEntry {
2019-07-21 20:50:24 +00:00
idxEntryBytes := needle_map . ToBytes ( key , increIdxEntry . offset , increIdxEntry . size )
2016-09-29 05:57:23 +00:00
2018-06-24 01:24:59 +00:00
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to seek the end of file: %v" , err )
return
}
//ensure file writing starting from aligned positions
if offset % NeedlePaddingSize != 0 {
offset = offset + ( NeedlePaddingSize - offset % NeedlePaddingSize )
2019-07-21 20:50:42 +00:00
if offset , err = dst . Seek ( offset , 0 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to align in datafile %s: %v" , dst . Name ( ) , err )
2016-09-29 05:57:23 +00:00
return
}
2018-06-24 01:24:59 +00:00
}
2016-10-07 08:22:24 +00:00
2018-06-24 01:24:59 +00:00
//updated needle
2019-04-09 02:40:56 +00:00
if ! increIdxEntry . offset . IsZero ( ) && increIdxEntry . size != 0 && increIdxEntry . size != TombstoneFileSize {
2018-06-24 01:24:59 +00:00
//even the needle cache in memory is hit, the need_bytes is correct
2019-04-09 02:40:56 +00:00
glog . V ( 4 ) . Infof ( "file %d offset %d size %d" , key , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size )
2019-01-16 09:48:59 +00:00
var needleBytes [ ] byte
2019-10-29 07:35:16 +00:00
needleBytes , err = needle . ReadNeedleBlob ( oldDatBackend , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size , v . Version ( ) )
2018-06-24 01:24:59 +00:00
if err != nil {
2019-04-09 02:40:56 +00:00
return fmt . Errorf ( "ReadNeedleBlob %s key %d offset %d size %d failed: %v" , oldDatFile . Name ( ) , key , increIdxEntry . offset . ToAcutalOffset ( ) , increIdxEntry . size , err )
2016-10-07 08:22:24 +00:00
}
2019-01-16 09:48:59 +00:00
dst . Write ( needleBytes )
util . Uint32toBytes ( idxEntryBytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
2018-06-24 01:24:59 +00:00
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
2019-04-19 04:43:36 +00:00
fakeDelNeedle := new ( needle . Needle )
2018-06-24 01:24:59 +00:00
fakeDelNeedle . Id = key
fakeDelNeedle . Cookie = 0x12345678
2018-07-24 09:44:33 +00:00
fakeDelNeedle . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2019-10-29 07:35:16 +00:00
_ , _ , _ , err = fakeDelNeedle . Append ( dstDatBackend , v . Version ( ) )
2018-06-24 01:24:59 +00:00
if err != nil {
return fmt . Errorf ( "append deleted %d failed: %v" , key , err )
2016-10-07 08:22:24 +00:00
}
2019-01-16 09:48:59 +00:00
util . Uint32toBytes ( idxEntryBytes [ 8 : 12 ] , uint32 ( 0 ) )
2018-06-24 01:24:59 +00:00
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
2016-09-29 05:57:23 +00:00
}
2019-01-16 09:48:59 +00:00
_ , err = idx . Write ( idxEntryBytes )
2016-09-29 05:57:23 +00:00
}
2016-09-27 05:30:44 +00:00
return nil
2016-09-27 05:26:39 +00:00
}
2019-01-16 09:48:59 +00:00
type VolumeFileScanner4Vacuum struct {
2019-05-06 20:56:08 +00:00
version needle . Version
v * Volume
2019-10-29 07:35:16 +00:00
dstBackend backend . DataStorageBackend
2019-05-06 20:56:08 +00:00
nm * NeedleMap
newOffset int64
now uint64
writeThrottler * util . WriteThrottler
2019-01-16 09:48:59 +00:00
}
func ( scanner * VolumeFileScanner4Vacuum ) VisitSuperBlock ( superBlock SuperBlock ) error {
scanner . version = superBlock . Version ( )
2019-04-19 19:29:49 +00:00
superBlock . CompactionRevision ++
2019-10-29 07:35:16 +00:00
_ , err := scanner . dstBackend . WriteAt ( superBlock . Bytes ( ) , 0 )
2019-01-16 09:48:59 +00:00
scanner . newOffset = int64 ( superBlock . BlockSize ( ) )
return err
}
// ReadNeedleBody reports that this scanner needs full needle bodies (not
// just headers), since live needles are copied in their entirety.
func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool {
	return true
}
2019-10-22 07:50:30 +00:00
func ( scanner * VolumeFileScanner4Vacuum ) VisitNeedle ( n * needle . Needle , offset int64 , needleHeader , needleBody [ ] byte ) error {
2019-01-16 09:48:59 +00:00
if n . HasTtl ( ) && scanner . now >= n . LastModified + uint64 ( scanner . v . Ttl . Minutes ( ) * 60 ) {
return nil
}
nv , ok := scanner . v . nm . Get ( n . Id )
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
2019-04-09 02:40:56 +00:00
if ok && nv . Offset . ToAcutalOffset ( ) == offset && nv . Size > 0 && nv . Size != TombstoneFileSize {
if err := scanner . nm . Put ( n . Id , ToOffset ( scanner . newOffset ) , n . Size ) ; err != nil {
2019-01-16 09:48:59 +00:00
return fmt . Errorf ( "cannot put needle: %s" , err )
}
2019-10-29 07:35:16 +00:00
if _ , _ , _ , err := n . Append ( scanner . dstBackend , scanner . v . Version ( ) ) ; err != nil {
2019-01-16 09:48:59 +00:00
return fmt . Errorf ( "cannot append needle: %s" , err )
}
2019-05-04 00:22:39 +00:00
delta := n . DiskSize ( scanner . version )
scanner . newOffset += delta
2019-05-06 20:56:08 +00:00
scanner . writeThrottler . MaybeSlowdown ( delta )
2019-01-16 09:48:59 +00:00
glog . V ( 4 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , scanner . newOffset , "data_size" , n . Size )
}
return nil
}
2019-09-03 16:00:59 +00:00
func ( v * Volume ) copyDataAndGenerateIndexFile ( dstName , idxName string , preallocate int64 , compactionBytePerSecond int64 ) ( err error ) {
2014-05-20 03:54:39 +00:00
var (
2019-11-09 08:10:59 +00:00
dst backend . DataStorageBackend
idx * os . File
2014-05-20 03:54:39 +00:00
)
2019-10-30 06:34:38 +00:00
if dst , err = createVolumeFile ( dstName , preallocate , 0 ) ; err != nil {
2014-05-20 03:54:39 +00:00
return
}
defer dst . Close ( )
if idx , err = os . OpenFile ( idxName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
2019-01-16 09:48:59 +00:00
scanner := & VolumeFileScanner4Vacuum {
2019-05-06 20:56:08 +00:00
v : v ,
now : uint64 ( time . Now ( ) . Unix ( ) ) ,
nm : NewBtreeNeedleMap ( idx ) ,
2019-11-09 08:10:59 +00:00
dstBackend : dst ,
2019-05-06 20:56:08 +00:00
writeThrottler : util . NewWriteThrottler ( compactionBytePerSecond ) ,
2019-01-16 09:48:59 +00:00
}
err = ScanVolumeFile ( v . dir , v . Collection , v . Id , v . needleMapKind , scanner )
2014-05-20 03:54:39 +00:00
return
}
2016-09-23 03:31:17 +00:00
func ( v * Volume ) copyDataBasedOnIndexFile ( dstName , idxName string ) ( err error ) {
var (
dst , idx , oldIndexFile * os . File
)
if dst , err = os . OpenFile ( dstName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
2019-10-30 05:37:36 +00:00
dstDatBackend := backend . NewDiskFile ( dst )
2019-10-29 07:35:16 +00:00
defer dstDatBackend . Close ( )
2016-09-23 03:31:17 +00:00
if idx , err = os . OpenFile ( idxName , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
if oldIndexFile , err = os . OpenFile ( v . FileName ( ) + ".idx" , os . O_RDONLY , 0644 ) ; err != nil {
return
}
defer oldIndexFile . Close ( )
2017-05-27 05:51:25 +00:00
nm := NewBtreeNeedleMap ( idx )
2016-09-23 03:31:17 +00:00
now := uint64 ( time . Now ( ) . Unix ( ) )
2019-04-19 19:29:49 +00:00
v . SuperBlock . CompactionRevision ++
2016-09-23 03:31:17 +00:00
dst . Write ( v . SuperBlock . Bytes ( ) )
2019-01-16 09:48:59 +00:00
newOffset := int64 ( v . SuperBlock . BlockSize ( ) )
2016-09-23 03:31:17 +00:00
2019-05-22 05:41:20 +00:00
idx2 . WalkIndexFile ( oldIndexFile , func ( key NeedleId , offset Offset , size uint32 ) error {
2019-04-09 02:40:56 +00:00
if offset . IsZero ( ) || size == TombstoneFileSize {
2016-09-23 03:31:17 +00:00
return nil
}
nv , ok := v . nm . Get ( key )
if ! ok {
return nil
}
2019-04-19 04:43:36 +00:00
n := new ( needle . Needle )
2019-10-29 07:35:16 +00:00
err := n . ReadData ( v . DataBackend , offset . ToAcutalOffset ( ) , size , v . Version ( ) )
2019-03-19 12:34:43 +00:00
if err != nil {
return nil
}
2016-09-23 03:31:17 +00:00
if n . HasTtl ( ) && now >= n . LastModified + uint64 ( v . Ttl . Minutes ( ) * 60 ) {
return nil
}
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
if nv . Offset == offset && nv . Size > 0 {
2019-04-09 02:40:56 +00:00
if err = nm . Put ( n . Id , ToOffset ( newOffset ) , n . Size ) ; err != nil {
2016-09-23 03:31:17 +00:00
return fmt . Errorf ( "cannot put needle: %s" , err )
}
2019-10-29 07:35:16 +00:00
if _ , _ , _ , err = n . Append ( dstDatBackend , v . Version ( ) ) ; err != nil {
2016-09-23 03:31:17 +00:00
return fmt . Errorf ( "cannot append needle: %s" , err )
}
2019-01-16 09:48:59 +00:00
newOffset += n . DiskSize ( v . Version ( ) )
glog . V ( 3 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , newOffset , "data_size" , n . Size )
2016-09-23 03:31:17 +00:00
}
return nil
} )
return
}