package storage

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// mark it every watermarkBatchSize operations
const watermarkBatchSize = 10000

var watermarkKey = []byte("idx_entry_watermark")
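
// The watermark stored under watermarkKey records how many .idx entries have
// already been applied to this leveldb, so that a restart only needs to replay
// the tail of the index file. It is only refreshed every watermarkBatchSize
// operations; for example, after 25,000 puts the stored watermark is 20,000,
// so at most the last 5,000 entries are replayed on the next start.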
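
// LevelDbNeedleMap maps needle ids to (offset, size) pairs in a leveldb
// database that mirrors the volume's .idx file.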
type LevelDbNeedleMap struct {
	baseNeedleMapper
	dbFileName    string
	db            *leveldb.DB
	ldbOpts       *opt.Options
	ldbAccessLock sync.RWMutex
	exitChan      chan bool
	// no need to use atomic
	accessFlag int64
	ldbTimeout int64

	recordCount uint64
}
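
// NewLevelDbNeedleMap opens the leveldb for a volume, regenerating it from the
// .idx file when it is stale. A ldbTimeout > 0 enables lazy loading: the
// database is unloaded after the configured idle time and reopened
// transparently on the next access.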
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) {
	m = &LevelDbNeedleMap{dbFileName: dbFileName}
	m.indexFile = indexFile
	if !isLevelDbFresh(dbFileName, indexFile) {
		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
		generateLevelDbFile(dbFileName, indexFile)
		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
	}
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
	} else {
		m.indexFileOffset = stat.Size()
	}
	glog.V(1).Infof("Opening %s...", dbFileName)

	if m.ldbTimeout == 0 {
		if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
			if errors.IsCorrupted(err) {
				m.db, err = leveldb.RecoverFile(dbFileName, opts)
			}
			if err != nil {
				return
			}
		}
		glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
		m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
		watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		err = setWatermark(m.db, watermark)
		if err != nil {
			glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
			return
		}
	}
	mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
	if indexLoadError != nil {
		return nil, indexLoadError
	}
	m.mapMetric = *mm
	m.ldbTimeout = ldbTimeout
	if m.ldbTimeout > 0 {
		m.ldbOpts = opts
		m.exitChan = make(chan bool, 1)
		m.accessFlag = 0
		go lazyLoadingRoutine(m)
	}
	return
}
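
// isLevelDbFresh reports whether the leveldb's LOG file is newer than the
// index file. Since writes always go to the index file first, an older leveldb
// is stale and needs to be regenerated.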
func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
	// normally we always write to index file first
	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
	if err != nil {
		return false
	}
	defer dbLogFile.Close()
	dbStat, dbStatErr := dbLogFile.Stat()
	indexStat, indexStatErr := indexFile.Stat()
	if dbStatErr != nil || indexStatErr != nil {
		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
		return false
	}
	return dbStat.ModTime().After(indexStat.ModTime())
}

// Historical note (from the original commit adding boltdb as a needle map
// backend): boltdb is fairly slow to write, taking about 6 minutes to recreate
// the index for 1,553,934 files. Boltdb reads 1,553,934 x 16 = 24,862,944
// bytes from disk and grows the database to 134,217,728 bytes in those 6
// minutes. By comparison, leveldb recreates the same index, at 27,188,148
// bytes, in about 8 seconds.
// To test memory consumption, the leveldb or boltdb index was created, the
// server restarted, and the benchmark tool used to read lots of files
// (7 volumes in the benchmark collection, each with about 1,553K files):
//   leveldb:   memory starts at 142,884KB and stays at 179,340KB
//   boltdb:    memory starts at  73,756KB and stays at 144,564KB
//   in-memory: memory starts at 368,152KB and stays at 448,032KB
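
// generateLevelDbFile rebuilds the leveldb from the .idx file, replaying only
// the entries after the stored watermark.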
func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
	db, err := leveldb.OpenFile(dbFileName, nil)
	if err != nil {
		return err
	}
	defer db.Close()

	watermark := getWatermark(db)
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
		return err
	} else {
		if watermark*NeedleMapEntrySize > uint64(stat.Size()) {
			glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
		}
		glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*NeedleMapEntrySize)/NeedleMapEntrySize)
	}
	return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
		if !offset.IsZero() && size.IsValid() {
			levelDbWrite(db, key, offset, size, false, 0)
		} else {
			levelDbDelete(db, key)
		}
		return nil
	})
}
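
// Get looks up the offset and size for a needle id. With lazy loading enabled,
// the leveldb is reopened first if it had been unloaded.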
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
	bytes := make([]byte, NeedleIdSize)
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return nil, false
		}
	}
	NeedleIdToBytes(bytes[0:NeedleIdSize], key)
	data, err := m.db.Get(bytes, nil)
	if err != nil || len(data) != OffsetSize+SizeSize {
		return nil, false
	}
	offset := BytesToOffset(data[0:OffsetSize])
	size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
	return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}
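
// Put appends the entry to the index file first and then writes it to leveldb;
// the persisted watermark is only refreshed every watermarkBatchSize records.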
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
	var oldSize Size
	var watermark uint64
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return loadErr
		}
	}
	if oldNeedle, ok := m.Get(key); ok {
		oldSize = oldNeedle.Size
	}
	m.logPut(key, oldSize, size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, size); err != nil {
		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		glog.V(1).Infof("put cnt:%d for %s,watermark: %d", m.recordCount, m.dbFileName, watermark)
	}
	// only advance the persisted watermark at batch boundaries
	return levelDbWrite(m.db, key, offset, size, watermark != 0, watermark)
}

func getWatermark(db *leveldb.DB) uint64 {
	data, err := db.Get(watermarkKey, nil)
	if err != nil || len(data) != 8 {
		glog.V(1).Infof("read previous watermark from db: %v, %d", err, len(data))
		return 0
	}
	return util.BytesToUint64(data)
}

func setWatermark(db *leveldb.DB, watermark uint64) error {
	glog.V(3).Infof("set watermark %d", watermark)
	wmBytes := make([]byte, 8)
	util.Uint64toBytes(wmBytes, watermark)
	if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
		return fmt.Errorf("failed to setWatermark: %v", err)
	}
	return nil
}

func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
	bytes := needle_map.ToBytes(key, offset, size)
	if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
		return fmt.Errorf("failed to write leveldb: %v", err)
	}
	// set watermark
	if updateWatermark {
		return setWatermark(db, watermark)
	}
	return nil
}

func levelDbDelete(db *leveldb.DB, key NeedleId) error {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes, key)
	return db.Delete(bytes, nil)
}
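
// Delete appends a tombstone to the index file first and then marks the entry
// as deleted in leveldb by storing its size as a negative value.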
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
	var watermark uint64
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return loadErr
		}
	}
	oldNeedle, found := m.Get(key)
	if !found || oldNeedle.Size.IsDeleted() {
		return nil
	}
	m.logDelete(oldNeedle.Size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
		return err
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	}
	// only advance the persisted watermark at batch boundaries
	return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark != 0, watermark)
}
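
// Close syncs and closes the index file, closes the leveldb, and signals the
// lazy-loading routine to exit.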
func (m *LevelDbNeedleMap) Close() {
	if m.indexFile != nil {
		indexFileName := m.indexFile.Name()
		if err := m.indexFile.Sync(); err != nil {
			glog.Warningf("sync file %s failed: %v", indexFileName, err)
		}
		if err := m.indexFile.Close(); err != nil {
			glog.Warningf("close index file %s failed: %v", indexFileName, err)
		}
	}
	if m.db != nil {
		if err := m.db.Close(); err != nil {
			glog.Warningf("close levelDB failed: %v", err)
		}
	}
	if m.ldbTimeout > 0 {
		m.exitChan <- true
	}
}
func (m *LevelDbNeedleMap) Destroy() error {
	m.Close()
	os.Remove(m.indexFile.Name())
	return os.RemoveAll(m.dbFileName)
}
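
// UpdateNeedleMap swaps in the freshly built .cpldb database (see
// DoOffsetLoading) as the volume's live .ldb needle map, resetting the index
// offset, record count and watermark.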
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error {
	if v.nm != nil {
		v.nm.Close()
		v.nm = nil
	}
	defer func() {
		if v.tmpNm != nil {
			v.tmpNm.Close()
			v.tmpNm = nil
		}
	}()
	levelDbFile := v.FileName(".ldb")
	m.indexFile = indexFile
	err := os.RemoveAll(levelDbFile)
	if err != nil {
		return err
	}
	if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
		return fmt.Errorf("rename %s: %v", levelDbFile, err)
	}
	db, err := leveldb.OpenFile(levelDbFile, opts)
	if err != nil {
		if errors.IsCorrupted(err) {
			db, err = leveldb.RecoverFile(levelDbFile, opts)
		}
		if err != nil {
			return err
		}
	}
	m.db = db
	stat, e := indexFile.Stat()
	if e != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
		return e
	}
	m.indexFileOffset = stat.Size()
	m.recordCount = uint64(stat.Size() / NeedleMapEntrySize)

	// set watermark
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	err = setWatermark(db, watermark)
	if err != nil {
		glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
		return err
	}
	v.nm = m
	v.tmpNm = nil
	m.ldbTimeout = ldbTimeout
	if m.ldbTimeout > 0 {
		m.ldbOpts = opts
		m.exitChan = make(chan bool, 1)
		m.accessFlag = 0
		go lazyLoadingRoutine(m)
	}
	return e
}
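
// DoOffsetLoading replays index entries starting at startFrom into a temporary
// .cpldb database, updating the file and deletion counters; on error the
// temporary database is removed again.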
func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
	glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
	dbFileName := v.FileName(".cpldb")
	db, dbErr := leveldb.OpenFile(dbFileName, nil)
	defer func() {
		if dbErr == nil {
			db.Close()
		}
		if err != nil {
			os.RemoveAll(dbFileName)
		}
	}()
	if dbErr != nil {
		// check the open error itself (dbErr); the named return err is still nil here
		if errors.IsCorrupted(dbErr) {
			db, dbErr = leveldb.RecoverFile(dbFileName, nil)
		}
		if dbErr != nil {
			return dbErr
		}
	}
	err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
		m.mapMetric.FileCounter++
		bytes := make([]byte, NeedleIdSize)
		NeedleIdToBytes(bytes[0:NeedleIdSize], key)
		// fresh loading
		if startFrom == 0 {
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
			return e
		}
		// incremental loading
		data, err := db.Get(bytes, nil)
		if err != nil {
			if !strings.Contains(strings.ToLower(err.Error()), "not found") {
				// unexpected error
				return err
			}
			// new needle, unlikely to happen
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
		} else {
			// needle is found
			oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
			oldOffset := BytesToOffset(data[0:OffsetSize])
			if !offset.IsZero() && size.IsValid() {
				// updated needle
				m.mapMetric.FileByteCounter += uint64(size)
				if !oldOffset.IsZero() && oldSize.IsValid() {
					m.mapMetric.DeletionCounter++
					m.mapMetric.DeletionByteCounter += uint64(oldSize)
				}
				e = levelDbWrite(db, key, offset, size, false, 0)
			} else {
				// deleted needle
				m.mapMetric.DeletionCounter++
				m.mapMetric.DeletionByteCounter += uint64(oldSize)
				e = levelDbDelete(db, key)
			}
		}
		return e
	})
	return err
}
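
// reloadLdb reopens the leveldb database if it has been unloaded after being
// idle; it is a no-op while the database is still open.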
func reloadLdb(m *LevelDbNeedleMap) (err error) {
	if m.db != nil {
		return nil
	}
	glog.V(1).Infof("reloading leveldb %s", m.dbFileName)
	m.accessFlag = 1
	if m.db, err = leveldb.OpenFile(m.dbFileName, m.ldbOpts); err != nil {
		if errors.IsCorrupted(err) {
			m.db, err = leveldb.RecoverFile(m.dbFileName, m.ldbOpts)
		}
		if err != nil {
			glog.Fatalf("RecoverFile %s failed:%v", m.dbFileName, err)
			return err
		}
	}
	return nil
}
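
// unloadLdb closes and releases the leveldb database once the idle limit has
// been reached; reloadLdb will reopen it on the next access.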
func unloadLdb(m *LevelDbNeedleMap) (err error) {
	m.ldbAccessLock.Lock()
	defer m.ldbAccessLock.Unlock()
	if m.db != nil {
		glog.V(1).Infof("reached max idle count, unload leveldb, %s", m.dbFileName)
		m.db.Close()
		m.db = nil
	}
	return nil
}
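
// lazyLoadingRoutine checks once per hour and unloads the leveldb after
// roughly ldbTimeout idle hours; Close signals exitChan to stop the routine.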
func lazyLoadingRoutine(m *LevelDbNeedleMap) (err error) {
	glog.V(1).Infof("lazyLoadingRoutine %s", m.dbFileName)
	accessRecord := int64(1)
	for {
		select {
		case exit := <-m.exitChan:
			if exit {
				glog.V(1).Infof("exit from lazyLoadingRoutine")
				return nil
			}
		case <-time.After(time.Hour * 1):
			glog.V(1).Infof("timeout %s", m.dbFileName)
			if m.accessFlag == 0 {
				accessRecord++
				glog.V(1).Infof("accessRecord++")
				if accessRecord >= m.ldbTimeout {
					unloadLdb(m)
				}
			} else {
				glog.V(1).Infof("reset accessRecord %s", m.dbFileName)
				// reset accessRecord
				accessRecord = 0
			}
			continue
		}
	}
}