Add boltdb for volume needle map

boltdb is fairly slow to write, about 6 minutes for recreating index
for 1553934 files. Boltdb loads 1,553,934 x 16 = 24,862,944bytes from
disk, and generate the boltdb as large as 134,217,728 bytes in 6
minutes.

To compare, for leveldb, it recreates index in leveldb as large as
27,188,148 bytes in 8 seconds.
For in memory version, it loads the index in

To test the memory consumption, the leveldb or boltdb index are
created. And the server is restarted. Using the benchmark tool to read
lots of files. There are 7 volumes in benchmark collection, each with
about 1553K files.
For leveldb, the memory starts at 142,884KB, and stays at 179,340KB.
For boltdb, the memory starts at 73,756KB, and stays at 144,564KB.
For in-memory, the memory starts at 368,152KB, and stays at 448,032KB.
This commit is contained in:
chrislusf 2015-03-29 11:04:32 -07:00
parent 0d23f49f74
commit 1b6ab2f6af
13 changed files with 310 additions and 89 deletions

View file

@ -7,6 +7,14 @@ import (
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
) )
type NeedleMapType int
const (
NeedleMapInMemory NeedleMapType = iota
NeedleMapLevelDb
NeedleMapBoltDb
)
type NeedleMapper interface { type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool) Get(key uint64) (element *NeedleValue, ok bool)

View file

@ -0,0 +1,182 @@
package storage
import (
"fmt"
"os"
"github.com/boltdb/bolt"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
)
type BoltDbNeedleMap struct {
dbFileName string
indexFile *os.File
db *bolt.DB
mapMetric
}
var boltdbBucket = []byte("weed")
func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
if !isBoltDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
generateBoltDbFile(dbFileName, indexFile)
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
}
glog.V(1).Infof("Opening %s...", dbFileName)
if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
return
}
glog.V(1).Infof("Loading %s...", indexFile.Name())
nm, indexLoadError := LoadNeedleMap(indexFile)
if indexLoadError != nil {
return nil, indexLoadError
}
m.mapMetric = nm.mapMetric
return
}
func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
// normally we always write to index file first
dbLogFile, err := os.Open(dbFileName)
if err != nil {
return false
}
defer dbLogFile.Close()
dbStat, dbStatErr := dbLogFile.Stat()
indexStat, indexStatErr := indexFile.Stat()
if dbStatErr != nil || indexStatErr != nil {
glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
return false
}
return dbStat.ModTime().After(indexStat.ModTime())
}
func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
db, err := bolt.Open(dbFileName, 0644, nil)
if err != nil {
return err
}
defer db.Close()
return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
if offset > 0 {
boltDbWrite(db, key, offset, size)
} else {
boltDbDelete(db, key)
}
return nil
})
}
func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
bytes := make([]byte, 8)
var data []byte
util.Uint64toBytes(bytes, key)
err := m.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(boltdbBucket)
if bucket == nil {
return fmt.Errorf("Bucket %q not found!", boltdbBucket)
}
data = bucket.Get(bytes)
return nil
})
if err != nil || len(data) != 8 {
glog.V(0).Infof("Failed to get %d %v", key, err)
return nil, false
}
offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8])
return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
}
func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
var oldSize uint32
if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size
}
m.logPut(key, oldSize, size)
// write to index file first
if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
}
return boltDbWrite(m.db, key, offset, size)
}
func boltDbWrite(db *bolt.DB,
key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
if err != nil {
return err
}
err = bucket.Put(bytes[0:8], bytes[8:16])
if err != nil {
return err
}
return nil
})
}
func boltDbDelete(db *bolt.DB, key uint64) error {
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
if err != nil {
return err
}
err = bucket.Delete(bytes)
if err != nil {
return err
}
return nil
})
}
func (m *BoltDbNeedleMap) Delete(key uint64) error {
if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size)
}
// write to index file first
if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
return err
}
return boltDbDelete(m.db, key)
}
func (m *BoltDbNeedleMap) Close() {
m.db.Close()
}
func (m *BoltDbNeedleMap) Destroy() error {
m.Close()
os.Remove(m.indexFile.Name())
return os.Remove(m.dbFileName)
}
func (m *BoltDbNeedleMap) ContentSize() uint64 {
return m.FileByteCounter
}
func (m *BoltDbNeedleMap) DeletedSize() uint64 {
return m.DeletionByteCounter
}
func (m *BoltDbNeedleMap) FileCount() int {
return m.FileCounter
}
func (m *BoltDbNeedleMap) DeletedCount() int {
return m.DeletionCounter
}
func (m *BoltDbNeedleMap) MaxFileKey() uint64 {
return m.MaximumFileKey
}

View file

@ -21,7 +21,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
if !isLevelDbFresh(dbFileName, indexFile) { if !isLevelDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
generateDbFile(dbFileName, indexFile) generateLevelDbFile(dbFileName, indexFile)
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
} }
glog.V(1).Infof("Opening %s...", dbFileName) glog.V(1).Infof("Opening %s...", dbFileName)
@ -54,7 +54,7 @@ func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
return dbStat.ModTime().After(indexStat.ModTime()) return dbStat.ModTime().After(indexStat.ModTime())
} }
func generateDbFile(dbFileName string, indexFile *os.File) error { func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
db, err := leveldb.OpenFile(dbFileName, nil) db, err := leveldb.OpenFile(dbFileName, nil)
if err != nil { if err != nil {
return err return err
@ -75,7 +75,6 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
util.Uint64toBytes(bytes, key) util.Uint64toBytes(bytes, key)
data, err := m.db.Get(bytes, nil) data, err := m.db.Get(bytes, nil)
if err != nil || len(data) != 8 { if err != nil || len(data) != 8 {
glog.V(0).Infof("Failed to get %d %v", key, err)
return nil, false return nil, false
} }
offset := util.BytesToUint32(data[0:4]) offset := util.BytesToUint32(data[0:4])

View file

@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
return return
} }
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) { func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ { for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume) location.volumes = make(map[VolumeId]*Volume)
location.loadExistingVolumes(useLevelDb) location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location) s.Locations = append(s.Locations, location)
} }
return return
} }
func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error { func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement) rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil { if e != nil {
return e return e
@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
if err != nil { if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
} }
e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl) e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
} else { } else {
pair := strings.Split(range_string, "-") pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64) start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil { if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
e = err e = err
} }
} }
@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
} }
return ret return ret
} }
func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error { func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil { if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid) return fmt.Errorf("Volume Id %d already exists!", vid)
} }
if location := s.findFreeLocation(); location != nil { if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl) location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil { if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume location.volumes[vid] = volume
return nil return nil
} else { } else {
@ -195,7 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, repl
return fmt.Errorf("No more free space left") return fmt.Errorf("No more free space left")
} }
func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs { for _, dir := range dirs {
name := dir.Name() name := dir.Name()
@ -208,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
} }
if vid, err := NewVolumeId(base); err == nil { if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil { if l.volumes[vid] == nil {
if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil { if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
l.volumes[vid] = v l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
} }

View file

@ -14,12 +14,13 @@ import (
) )
type Volume struct { type Volume struct {
Id VolumeId Id VolumeId
dir string dir string
Collection string Collection string
dataFile *os.File dataFile *os.File
nm NeedleMapper nm NeedleMapper
readOnly bool needleMapKind NeedleMapType
readOnly bool
SuperBlock SuperBlock
@ -27,20 +28,22 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds lastModifiedTime uint64 //unix time in seconds
} }
func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true, useLevelDb) v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind)
return return
} }
func (v *Volume) String() string { func (v *Volume) String() string {
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
} }
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{} v.SuperBlock = SuperBlock{}
e = v.load(false, false, false) v.needleMapKind = needleMapKind
e = v.load(false, false, needleMapKind)
return return
} }
func (v *Volume) FileName() (fileName string) { func (v *Volume) FileName() (fileName string) {
@ -51,7 +54,7 @@ func (v *Volume) FileName() (fileName string) {
} }
return return
} }
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
var e error var e error
fileName := v.FileName() fileName := v.FileName()
@ -99,16 +102,22 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bo
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
} }
} }
if !useLevelDb { switch needleMapKind {
case NeedleMapInMemory:
glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
if v.nm, e = LoadNeedleMap(indexFile); e != nil { if v.nm, e = LoadNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
} }
} else { case NeedleMapLevelDb:
glog.V(0).Infoln("loading leveldb file", fileName+".ldb") glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
} }
case NeedleMapBoltDb:
glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
}
} }
} }
return e return e
@ -263,11 +272,12 @@ func (v *Volume) read(n *Needle) (int, error) {
} }
func ScanVolumeFile(dirname string, collection string, id VolumeId, func ScanVolumeFile(dirname string, collection string, id VolumeId,
needleMapKind NeedleMapType,
visitSuperBlock func(SuperBlock) error, visitSuperBlock func(SuperBlock) error,
readNeedleBody bool, readNeedleBody bool,
visitNeedle func(n *Needle, offset int64) error) (err error) { visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume var v *Volume
if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("Failed to load volume %d: %v", id, err) return fmt.Errorf("Failed to load volume %d: %v", id, err)
} }
if err = visitSuperBlock(v.SuperBlock); err != nil { if err = visitSuperBlock(v.SuperBlock); err != nil {

View file

@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
} }
//glog.V(3).Infof("Pretending to be vacuuming...") //glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second) //time.Sleep(20 * time.Second)
if e = v.load(true, false, false); e != nil { if e = v.load(true, false, v.needleMapKind); e != nil {
return e return e
} }
return nil return nil
@ -63,27 +63,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
_, err = dst.Write(superBlock.Bytes()) func(superBlock SuperBlock) error {
return err _, err = dst.Write(superBlock.Bytes())
}, true, func(n *Needle, offset int64) error { return err
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { }, true, func(n *Needle, offset int64) error {
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
return nil
}
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
}
return nil return nil
} })
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
}
return nil
})
return return
} }

View file

@ -33,7 +33,8 @@ func runCompact(cmd *Command, args []string) bool {
} }
vid := storage.VolumeId(*compactVolumeId) vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil)
if err != nil { if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err) glog.Fatalf("Load Volume [ERROR] %s\n", err)
} }

View file

@ -99,23 +99,25 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version var version storage.Version
err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error { err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid,
version = superBlock.Version() storage.NeedleMapInMemory,
return nil func(superBlock storage.SuperBlock) error {
}, true, func(n *storage.Needle, offset int64) error { version = superBlock.Version()
nv, ok := nm.Get(n.Id) return nil
glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", }, true, func(n *storage.Needle, offset int64) error {
n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv) nv, ok := nm.Get(n.Id)
if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset { glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v",
return walker(vid, n, version) n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv)
} if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset {
if !ok { return walker(vid, n, version)
glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size) }
} else { if !ok {
glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size) glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
} } else {
return nil glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size)
}) }
return nil
})
if err != nil { if err != nil {
glog.Fatalf("Export Volume File [ERROR] %s\n", err) glog.Fatalf("Export Volume File [ERROR] %s\n", err)
} }

View file

@ -47,19 +47,21 @@ func runFix(cmd *Command, args []string) bool {
defer nm.Close() defer nm.Close()
vid := storage.VolumeId(*fixVolumeId) vid := storage.VolumeId(*fixVolumeId)
err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error { err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid,
return nil storage.NeedleMapInMemory,
}, false, func(n *storage.Needle, offset int64) error { func(superBlock storage.SuperBlock) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) return nil
if n.Size > 0 { }, false, func(n *storage.Needle, offset int64) error {
pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
glog.V(2).Infof("saved %d with error %v", n.Size, pe) if n.Size > 0 {
} else { pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
glog.V(2).Infof("skipping deleted file ...") glog.V(2).Infof("saved %d with error %v", n.Size, pe)
return nm.Delete(n.Id) } else {
} glog.V(2).Infof("skipping deleted file ...")
return nil return nm.Delete(n.Id)
}) }
return nil
})
if err != nil { if err != nil {
glog.Fatalf("Export Volume File [ERROR] %s\n", err) glog.Fatalf("Export Volume File [ERROR] %s\n", err)
} }

View file

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server" "github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -68,7 +69,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
volumeUseLevelDb = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
@ -233,10 +234,17 @@ func runServer(cmd *Command, args []string) bool {
if isSeperatedPublicPort { if isSeperatedPublicPort {
publicVolumeMux = http.NewServeMux() publicVolumeMux = http.NewServeMux()
} }
volumeNeedleMapKind := storage.NeedleMapInMemory
switch *volumeIndexType {
case "leveldb":
volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *serverPublicUrl, *serverIp, *volumePort, *serverPublicUrl,
folders, maxCounts, folders, maxCounts,
*volumeUseLevelDb, volumeNeedleMapKind,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, serverWhiteList, *volumeFixJpgOrientation,
) )

View file

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server" "github.com/chrislusf/weed-fs/go/weed/weed_server"
) )
@ -32,7 +33,7 @@ type VolumeServerOptions struct {
dataCenter *string dataCenter *string
rack *string rack *string
whiteList []string whiteList []string
useLevelDb *bool indexType *string
fixJpgOrientation *bool fixJpgOrientation *bool
} }
@ -49,7 +50,7 @@ func init() {
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
} }
@ -115,10 +116,17 @@ func runVolume(cmd *Command, args []string) bool {
publicVolumeMux = http.NewServeMux() publicVolumeMux = http.NewServeMux()
} }
volumeNeedleMapKind := storage.NeedleMapInMemory
switch *v.indexType {
case "leveldb":
volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl, *v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.folders, v.folderMaxLimits,
*v.useLevelDb, volumeNeedleMapKind,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,
*v.fixJpgOrientation, *v.fixJpgOrientation,

View file

@ -20,14 +20,14 @@ type VolumeServer struct {
store *storage.Store store *storage.Store
guard *security.Guard guard *security.Guard
UseLevelDb bool needleMapKind storage.NeedleMapType
FixJpgOrientation bool FixJpgOrientation bool
} }
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string, port int, publicUrl string,
folders []string, maxCounts []int, folders []string, maxCounts []int,
useLevelDb bool, needleMapKind storage.NeedleMapType,
masterNode string, pulseSeconds int, masterNode string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, whiteList []string,
@ -36,11 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
rack: rack, rack: rack,
UseLevelDb: useLevelDb, needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
} }
vs.SetMasterNode(masterNode) vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")
@ -77,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
glog.V(0).Infoln("Volume Server Connected with master at", master) glog.V(0).Infoln("Volume Server Connected with master at", master)
} }
} else { } else {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err) glog.V(0).Infof("Volume Server Failed to talk with master %+v: %v", vs, err)
if connected { if connected {
connected = false connected = false
} }

View file

@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
} }
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl")) err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl"))
if err == nil { if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else { } else {