diff --git a/weed/command/backup.go b/weed/command/backup.go index 3b14705d7..58eda638e 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" @@ -113,7 +114,7 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -138,7 +139,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 6c390ea1f..6d1f5d5f1 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -41,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) vid := needle.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0) + v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/command/server.go b/weed/command/server.go index 526841e28..24c529e62 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -125,6 +125,7 @@ func init() { serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") + serverOptions.v.ldbTimeout = cmdServer.Flag.Int64("volume.index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") diff --git a/weed/command/volume.go b/weed/command/volume.go index e0bc8b8fe..30337a62a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -68,6 +68,7 @@ type VolumeServerOptions struct { inflightUploadDataTimeout *time.Duration hasSlowRead *bool readBufferSizeMB *int + ldbTimeout *int64 } func init() { @@ -92,6 +93,7 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.ldbTimeout = cmdVolume.Flag.Int64("index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.") v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size") v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") @@ -249,6 +251,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.inflightUploadDataTimeout, *v.hasSlowRead, *v.readBufferSizeMB, + *v.ldbTimeout, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 790e6e32a..24a9650e7 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,10 +3,11 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage" "path/filepath" "time" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -50,6 +51,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Preallocate, req.MemoryMapMaxSizeMb, types.ToDiskType(req.DiskType), + vs.ldbTimout, ) if err != nil { diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 8c6c63c56..5d7e5c7b0 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -41,6 +41,7 @@ type VolumeServer struct { grpcDialOption grpc.DialOption needleMapKind storage.NeedleMapKind + ldbTimout int64 FixJpgOrientation bool ReadMode string compactionBytePerSecond int64 @@ -68,6 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, inflightUploadDataTimeout time.Duration, hasSlowRead bool, readBufferSizeMB int, + ldbTimeout int64, ) *VolumeServer { v := util.GetViper() @@ -99,12 +101,13 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, inflightUploadDataTimeout: inflightUploadDataTimeout, hasSlowRead: hasSlowRead, readBufferSizeMB: readBufferSizeMB, + ldbTimout: ldbTimeout, } vs.SeedMasterNodes = masterNodes vs.checkWithMaster() - vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes) + vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index b3be04703..0629acff7 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -114,7 +114,7 @@ func getValidVolumeName(basename string) string { return "" } -func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool) bool { +func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool { basename := dirEntry.Name() if dirEntry.IsDir() { return false @@ -158,7 +158,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne } // load the volume - v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0) + v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout) if e != nil { glog.V(0).Infof("new volume %s error %s", volumeName, e) return false @@ -172,7 +172,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne return true } -func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) { +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) { task_queue := make(chan os.DirEntry, 10*concurrency) go func() { @@ -198,7 +198,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con go func() { defer wg.Done() for fi := range task_queue { - _ = l.loadExistingVolume(fi, needleMapKind, true) + _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout) } }() } @@ -206,7 +206,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con } -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) { workerNum := runtime.NumCPU() val, ok := os.LookupEnv("GOMAXPROCS") @@ -222,7 +222,7 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { workerNum = 10 } } - l.concurrentLoadingVolumes(needleMapKind, workerNum) + l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) l.loadAllEcShards() @@ -292,7 +292,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { if fileInfo, found := l.LocateVolume(vid); found { - return l.loadExistingVolume(fileInfo, needleMapKind, false) + return l.loadExistingVolume(fileInfo, needleMapKind, false, 0) } return false } diff --git a/weed/storage/idx_binary_search_test.go b/weed/storage/idx_binary_search_test.go index 48f48852e..07eacb898 100644 --- a/weed/storage/idx_binary_search_test.go +++ b/weed/storage/idx_binary_search_test.go @@ -1,19 +1,20 @@ package storage import ( + "os" + "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/stretchr/testify/assert" - "os" - "testing" ) func TestFirstInvalidIndex(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 0e49da959..5feb4a754 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -47,7 +47,7 @@ type baseNeedleMapper struct { type TempNeedleMapper interface { NeedleMapper DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error - UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error + UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error } func (nm *baseNeedleMapper) IndexFileSize() uint64 { diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 1566ca7a0..a5a543ba2 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -5,6 +5,8 @@ import ( "os" "path/filepath" "strings" + "sync" + "time" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" @@ -26,12 +28,18 @@ var watermarkKey = []byte("idx_entry_watermark") type LevelDbNeedleMap struct { baseNeedleMapper - dbFileName string - db *leveldb.DB + dbFileName string + db *leveldb.DB + ldbOpts *opt.Options + ldbAccessLock sync.RWMutex + exitChan chan bool + // no need to use atomic + accessFlag int64 + ldbTimeout int64 recordCount uint64 } -func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) { +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) { m = &LevelDbNeedleMap{dbFileName: dbFileName} m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { @@ -46,27 +54,36 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option } glog.V(1).Infof("Opening %s...", dbFileName) - if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { - if errors.IsCorrupted(err) { - m.db, err = leveldb.RecoverFile(dbFileName, opts) + if m.ldbTimeout == 0 { + if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { + if errors.IsCorrupted(err) { + m.db, err = leveldb.RecoverFile(dbFileName, opts) + } + if err != nil { + return + } } + glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db)) + m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize) + watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize + err = setWatermark(m.db, watermark) if err != nil { + glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err) return } } - glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db)) - m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize) - watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize - err = setWatermark(m.db, watermark) - if err != nil { - glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err) - return - } mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) if indexLoadError != nil { return nil, indexLoadError } m.mapMetric = *mm + m.ldbTimeout = ldbTimeout + if m.ldbTimeout > 0 { + m.ldbOpts = opts + m.exitChan = make(chan bool, 1) + m.accessFlag = 0 + go lazyLoadingRoutine(m) + } return } @@ -116,6 +133,14 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { bytes := make([]byte, NeedleIdSize) + if m.ldbTimeout > 0 { + m.ldbAccessLock.RLock() + defer m.ldbAccessLock.RUnlock() + loadErr := reloadLdb(m) + if loadErr != nil { + return nil, false + } + } NeedleIdToBytes(bytes[0:NeedleIdSize], key) data, err := m.db.Get(bytes, nil) if err != nil || len(data) != OffsetSize+SizeSize { @@ -129,6 +154,14 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error { var oldSize Size var watermark uint64 + if m.ldbTimeout > 0 { + m.ldbAccessLock.RLock() + defer m.ldbAccessLock.RUnlock() + loadErr := reloadLdb(m) + if loadErr != nil { + return loadErr + } + } if oldNeedle, ok := m.Get(key); ok { oldSize = oldNeedle.Size } @@ -188,6 +221,14 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error { func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { var watermark uint64 + if m.ldbTimeout > 0 { + m.ldbAccessLock.RLock() + defer m.ldbAccessLock.RUnlock() + loadErr := reloadLdb(m) + if loadErr != nil { + return loadErr + } + } oldNeedle, found := m.Get(key) if !found || oldNeedle.Size.IsDeleted() { return nil @@ -223,6 +264,9 @@ func (m *LevelDbNeedleMap) Close() { glog.Warningf("close levelDB failed: %v", err) } } + if m.ldbTimeout > 0 { + m.exitChan <- true + } } func (m *LevelDbNeedleMap) Destroy() error { @@ -231,7 +275,7 @@ func (m *LevelDbNeedleMap) Destroy() error { return os.RemoveAll(m.dbFileName) } -func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error { +func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error { if v.nm != nil { v.nm.Close() v.nm = nil @@ -280,6 +324,13 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts * } v.nm = m v.tmpNm = nil + m.ldbTimeout = ldbTimeout + if m.ldbTimeout > 0 { + m.ldbOpts = opts + m.exitChan = make(chan bool, 1) + m.accessFlag = 0 + go lazyLoadingRoutine(m) + } return e } @@ -348,3 +399,61 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF }) return err } + +func reloadLdb(m *LevelDbNeedleMap) (err error) { + if m.db != nil { + return nil + } + glog.V(1).Infof("reloading leveldb %s", m.dbFileName) + m.accessFlag = 1 + if m.db, err = leveldb.OpenFile(m.dbFileName, m.ldbOpts); err != nil { + if errors.IsCorrupted(err) { + m.db, err = leveldb.RecoverFile(m.dbFileName, m.ldbOpts) + } + if err != nil { + glog.Fatalf("RecoverFile %s failed:%v", m.dbFileName, err) + return err + } + } + return nil +} + +func unloadLdb(m *LevelDbNeedleMap) (err error) { + m.ldbAccessLock.Lock() + defer m.ldbAccessLock.Unlock() + if m.db != nil { + glog.V(1).Infof("reached max idle count, unload leveldb, %s", m.dbFileName) + m.db.Close() + m.db = nil + } + return nil +} + +func lazyLoadingRoutine(m *LevelDbNeedleMap) (err error) { + glog.V(1).Infof("lazyLoadingRoutine %s", m.dbFileName) + var accessRecord int64 + accessRecord = 1 + for { + select { + case exit := <-m.exitChan: + if exit { + glog.V(1).Infof("exit from lazyLoadingRoutine") + return nil + } + case <-time.After(time.Hour * 1): + glog.V(1).Infof("timeout %s", m.dbFileName) + if m.accessFlag == 0 { + accessRecord++ + glog.V(1).Infof("accessRecord++") + if accessRecord >= m.ldbTimeout { + unloadLdb(m) + } + } else { + glog.V(1).Infof("reset accessRecord %s", m.dbFileName) + // reset accessRecord + accessRecord = 0 + } + continue + } + } +} diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 7721980ee..a2beb6c33 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -84,7 +84,7 @@ func (nm *NeedleMap) Destroy() error { return os.Remove(nm.indexFile.Name()) } -func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error { +func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error { if v.nm != nil { v.nm.Close() v.nm = nil diff --git a/weed/storage/store.go b/weed/storage/store.go index 3be0f58bf..40bb7bbe7 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -80,7 +80,7 @@ func (s *Store) String() (str string) { } func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int32, - minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { + minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) { s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) @@ -93,7 +93,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, wg.Add(1) go func() { defer wg.Done() - location.loadExistingVolumes(needleMapKind) + location.loadExistingVolumes(needleMapKind, ldbTimeout) }() } wg.Wait() @@ -106,7 +106,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -115,7 +115,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType, ldbTimeout) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -158,14 +158,14 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { + if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb, ldbTimeout); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ @@ -373,7 +373,7 @@ func (s *Store) SetStopping() { func (s *Store) LoadNewVolumes() { for _, location := range s.Locations { - location.loadExistingVolumes(s.NeedleMapKind) + location.loadExistingVolumes(s.NeedleMapKind, 0) } } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index ab8af91e2..b4cd7b560 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -43,6 +43,7 @@ type Volume struct { lastCompactIndexOffset uint64 lastCompactRevision uint16 + ldbTimeout int64 isCompacting bool isCommitCompacting bool @@ -53,12 +54,13 @@ type Volume struct { lastIoError error } -func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind + v.ldbTimeout = ldbTimeout e = v.load(true, true, needleMapKind, preallocate) v.startWorker() return diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index dac0e22a2..78dfa6901 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -2,9 +2,10 @@ package storage import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "os" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/syndtr/goleveldb/leveldb/opt" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -136,7 +137,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind case NeedleMapInMemory: if v.tmpNm != nil { glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx")) - err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0) } else { glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory") if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { @@ -151,10 +152,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } if v.tmpNm != nil { glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb")) - err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } @@ -166,10 +167,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } if v.tmpNm != nil { glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb")) - err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } @@ -181,10 +182,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } if v.tmpNm != nil { glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb")) - err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go index f1d0aa2fc..02c514083 100644 --- a/weed/storage/volume_read_test.go +++ b/weed/storage/volume_read_test.go @@ -1,17 +1,18 @@ package storage import ( + "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/stretchr/testify/assert" - "testing" ) func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -48,7 +49,7 @@ func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index d26fc7ab7..079b7ccf5 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -72,7 +72,7 @@ func TestLDBIndexCompaction(t *testing.T) { func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -115,7 +115,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { v.Close() - v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0) + v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0, 0) if err != nil { t.Fatalf("volume reloading: %v", err) } diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go index 2fd4026cd..9979aa8f5 100644 --- a/weed/storage/volume_write_test.go +++ b/weed/storage/volume_write_test.go @@ -13,7 +13,7 @@ import ( func TestSearchVolumesWithDeletedNeedles(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 3fc9cf0b4..3968f62e9 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -69,7 +69,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0); err != nil { return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) }