mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Optimiz leveldb metric (#3830)
* optimiz updating mapmetric for leveldb * import loading leveldb * add comments
This commit is contained in:
parent
d21e2f523d
commit
84c401e693
|
@ -48,7 +48,6 @@ type TempNeedleMapper interface {
|
||||||
NeedleMapper
|
NeedleMapper
|
||||||
DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
|
DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
|
||||||
UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error
|
UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error
|
||||||
UpdateNeedleMapMetric(indexFile *os.File) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
|
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||||
|
@ -179,6 +180,7 @@ func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, update
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func levelDbDelete(db *leveldb.DB, key NeedleId) error {
|
func levelDbDelete(db *leveldb.DB, key NeedleId) error {
|
||||||
bytes := make([]byte, NeedleIdSize)
|
bytes := make([]byte, NeedleIdSize)
|
||||||
NeedleIdToBytes(bytes, key)
|
NeedleIdToBytes(bytes, key)
|
||||||
|
@ -305,23 +307,45 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
|
||||||
}
|
}
|
||||||
|
|
||||||
err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
|
err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
|
||||||
if !offset.IsZero() && size.IsValid() {
|
m.mapMetric.FileCounter++
|
||||||
|
bytes := make([]byte, NeedleIdSize)
|
||||||
|
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
|
||||||
|
// fresh loading
|
||||||
|
if startFrom == 0 {
|
||||||
|
m.mapMetric.FileByteCounter += uint64(size)
|
||||||
|
e = levelDbWrite(db, key, offset, size, false, 0)
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
// increment loading
|
||||||
|
data, err := db.Get(bytes, nil)
|
||||||
|
if err != nil {
|
||||||
|
if !strings.Contains(strings.ToLower(err.Error()), "not found") {
|
||||||
|
// unexpected error
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// new needle, unlikely happen
|
||||||
|
m.mapMetric.FileByteCounter += uint64(size)
|
||||||
e = levelDbWrite(db, key, offset, size, false, 0)
|
e = levelDbWrite(db, key, offset, size, false, 0)
|
||||||
} else {
|
} else {
|
||||||
e = levelDbDelete(db, key)
|
// needle is found
|
||||||
|
oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
|
||||||
|
oldOffset := BytesToOffset(data[0:OffsetSize])
|
||||||
|
if !offset.IsZero() && size.IsValid() {
|
||||||
|
// updated needle
|
||||||
|
m.mapMetric.FileByteCounter += uint64(size)
|
||||||
|
if !oldOffset.IsZero() && oldSize.IsValid() {
|
||||||
|
m.mapMetric.DeletionCounter++
|
||||||
|
m.mapMetric.DeletionByteCounter += uint64(oldSize)
|
||||||
|
}
|
||||||
|
e = levelDbWrite(db, key, offset, size, false, 0)
|
||||||
|
} else {
|
||||||
|
// deleted needle
|
||||||
|
m.mapMetric.DeletionCounter++
|
||||||
|
m.mapMetric.DeletionByteCounter += uint64(oldSize)
|
||||||
|
e = levelDbDelete(db, key)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if startFrom != 0 {
|
|
||||||
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
|
|
||||||
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,3 @@ func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom ui
|
||||||
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -219,15 +219,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
|
||||||
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
|
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
|
||||||
}
|
}
|
||||||
if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
|
if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
|
||||||
if v.needleMapKind == NeedleMapInMemory {
|
return nil
|
||||||
return nil
|
|
||||||
}
|
|
||||||
newIdx, err := os.OpenFile(newIdxFileName, os.O_RDWR, 0644)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
|
|
||||||
}
|
|
||||||
defer newIdx.Close()
|
|
||||||
return v.tmpNm.UpdateNeedleMapMetric(newIdx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fail if the old .dat file has changed to a new revision
|
// fail if the old .dat file has changed to a new revision
|
||||||
|
|
Loading…
Reference in a new issue