diff --git a/weed/command/export.go b/weed/command/export.go index a27b88c64..8d664ad3b 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -16,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -89,7 +90,7 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, type VolumeFileScanner4Export struct { version needle.Version counter int - needleMap *storage.NeedleMap + needleMap *needle_map.MemDb vid needle.VolumeId } @@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool { fileName = *export.collection + "_" + fileName } vid := needle.VolumeId(*export.volumeId) - indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - needleMap, err := storage.LoadBtreeNeedleMap(indexFile) - if err != nil { - glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) + needleMap := needle_map.NewMemDb() + if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil { + glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err) } volumeFileScanner := &VolumeFileScanner4Export{ diff --git a/weed/command/fix.go b/weed/command/fix.go index 9acf1801f..76bc19f7e 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -32,7 +33,7 @@ var ( type VolumeFileScanner4Fix struct { version needle.Version - nm *storage.NeedleMap + nm *needle_map.MemDb } func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -47,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { - pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) + pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id, types.ToOffset(offset)) + return scanner.nm.Delete(n.Id) } return nil } @@ -73,8 +74,7 @@ func runFix(cmd *Command, args []string) bool { } defer indexFile.Close() - nm := storage.NewBtreeNeedleMap(indexFile) - defer nm.Close() + nm := needle_map.NewMemDb() vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ diff --git a/weed/storage/needle_map/btree_map.go b/weed/storage/needle_map/btree_map.go deleted file mode 100644 index a26c5e068..000000000 --- a/weed/storage/needle_map/btree_map.go +++ /dev/null @@ -1,53 +0,0 @@ -package needle_map - -import ( - . "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/google/btree" -) - -//This map assumes mostly inserting increasing keys -type BtreeMap struct { - tree *btree.BTree -} - -func NewBtreeMap() *BtreeMap { - return &BtreeMap{ - tree: btree.New(32), - } -} - -func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { - found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size}) - if found != nil { - old := found.(NeedleValue) - return old.Offset, old.Size - } - return -} - -func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) { - found := cm.tree.Delete(NeedleValue{key, Offset{}, 0}) - if found != nil { - old := found.(NeedleValue) - return old.Size - } - return -} -func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) { - found := cm.tree.Get(NeedleValue{key, Offset{}, 0}) - if found != nil { - old := found.(NeedleValue) - return &old, true - } - return nil, false -} - -// Visit visits all entries or stop if any error when visiting -func (cm *BtreeMap) AscendingVisit(visit func(NeedleValue) error) (ret error) { - cm.tree.Ascend(func(item btree.Item) bool { - needle := item.(NeedleValue) - ret = visit(needle) - return ret == nil - }) - return ret -} diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go new file mode 100644 index 000000000..6aba6adeb --- /dev/null +++ b/weed/storage/needle_map/memdb.go @@ -0,0 +1,112 @@ +package needle_map + +import ( + "fmt" + "os" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + . "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +//This map uses in memory level db +type MemDb struct { + db *leveldb.DB +} + +func NewMemDb() *MemDb { + opts := &opt.Options{} + + var err error + t := &MemDb{} + if t.db, err = leveldb.Open(storage.NewMemStorage(), opts); err != nil { + glog.V(0).Infof("MemDb fails to open: %v", err) + return nil + } + + return t +} + +func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error { + + bytes := ToBytes(key, offset, size) + + if err := cm.db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil { + return fmt.Errorf("failed to write temp leveldb: %v", err) + } + return nil +} + +func (cm *MemDb) Delete(key NeedleId) error { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes, key) + return cm.db.Delete(bytes, nil) + +} +func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + data, err := cm.db.Get(bytes, nil) + if err != nil || len(data) != OffsetSize+SizeSize { + return nil, false + } + offset := BytesToOffset(data[0:OffsetSize]) + size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + return &NeedleValue{Key: key, Offset: offset, Size: size}, true +} + +// Visit visits all entries or stop if any error when visiting +func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) { + iter := cm.db.NewIterator(nil, nil) + for iter.Next() { + key := BytesToNeedleId(iter.Key()) + data := iter.Value() + offset := BytesToOffset(data[0:OffsetSize]) + size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + + needle := NeedleValue{Key: key, Offset: offset, Size: size} + ret = visit(needle) + if ret != nil { + return + } + } + iter.Release() + ret = iter.Error() + + return +} + +func (cm *MemDb) SaveToIdx(idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return + } + defer idxFile.Close() + + return cm.AscendingVisit(func(value NeedleValue) error { + _, err := idxFile.Write(value.ToBytes()) + return err + }) + +} + +func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644) + if err != nil { + return + } + defer idxFile.Close() + + return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error { + if offset.IsZero() || size == TombstoneFileSize { + return nil + } + return cm.Set(key, offset, size) + }) + +} diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index ee639a7e6..e4273f1b2 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -22,24 +22,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { return nm } -func NewBtreeNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: needle_map.NewBtreeMap(), - } - nm.indexFile = file - return nm -} - func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewCompactNeedleMap(file) return doLoading(file, nm) } -func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewBtreeNeedleMap(file) - return doLoading(file, nm) -} - func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { nm.MaybeSetMaxFileKey(key) diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go deleted file mode 100644 index 539f83a87..000000000 --- a/weed/storage/needle_map_metric_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package storage - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - . "github.com/chrislusf/seaweedfs/weed/storage/types" - "io/ioutil" - "math/rand" - "testing" -) - -func TestFastLoadingNeedleMapMetrics(t *testing.T) { - - idxFile, _ := ioutil.TempFile("", "tmp.idx") - nm := NewBtreeNeedleMap(idxFile) - - for i := 0; i < 10000; i++ { - nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1)) - if rand.Float32() < 0.2 { - nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0))) - } - } - - mm, _ := newNeedleMapMetricFromIndexFile(idxFile) - - glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount()) - glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize()) - glog.V(0).Infof("ContentSize expected %d actual %d", nm.ContentSize(), mm.ContentSize()) - glog.V(0).Infof("DeletedCount expected %d actual %d", nm.DeletedCount(), mm.DeletedCount()) - glog.V(0).Infof("MaxFileKey expected %d actual %d", nm.MaxFileKey(), mm.MaxFileKey()) -} diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index b3dcdbd9d..704f1f4ef 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -115,6 +115,7 @@ func (v *Volume) CommitCompact() error { if e = v.load(true, false, v.needleMapKind, 0); e != nil { return e } + return nil } func (v *Volume) cleanupCompact() error { @@ -270,7 +271,7 @@ type VolumeFileScanner4Vacuum struct { version needle.Version v *Volume dstBackend backend.BackendStorageFile - nm *NeedleMap + nm *needle_map.MemDb newOffset int64 now uint64 writeThrottler *util.WriteThrottler @@ -295,7 +296,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in nv, ok := scanner.v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize { - if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { + if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil { @@ -312,32 +313,33 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { var ( dst backend.BackendStorageFile - idx *os.File ) if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil { return } defer dst.Close() - if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { - return - } - defer idx.Close() + nm := needle_map.NewMemDb() scanner := &VolumeFileScanner4Vacuum{ v: v, now: uint64(time.Now().Unix()), - nm: NewBtreeNeedleMap(idx), + nm: nm, dstBackend: dst, writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) + if err != nil { + return nil + } + + err = nm.SaveToIdx(idxName) return } func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { var ( - dst, idx, oldIndexFile *os.File + dst, oldIndexFile *os.File ) if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { return @@ -345,17 +347,13 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { dstDatBackend := backend.NewDiskFile(dst) defer dstDatBackend.Close() - if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { - return - } - defer idx.Close() - if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil { return } defer oldIndexFile.Close() - nm := NewBtreeNeedleMap(idx) + nm := needle_map.NewMemDb() + now := uint64(time.Now().Unix()) v.SuperBlock.CompactionRevision++ @@ -384,7 +382,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if nv.Offset == offset && nv.Size > 0 { - if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil { + if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { @@ -396,5 +394,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { return nil }) + nm.SaveToIdx(idxName) + return }