mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add CdbMap
This commit is contained in:
parent
5d2a1e8d48
commit
bf0ccf3461
107
weed-fs/src/pkg/storage/cdb_map.go
Normal file
107
weed-fs/src/pkg/storage/cdb_map.go
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/tgulacsi/go-cdb"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"pkg/util"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CdbMap struct {
|
||||||
|
db *cdb.Cdb
|
||||||
|
transient []byte
|
||||||
|
Filename string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the CDB file and servers as a needle map
|
||||||
|
func NewCdbMap(filename string) (*CdbMap, error) {
|
||||||
|
m, err := cdb.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &CdbMap{db: m, transient: make([]byte, 8),
|
||||||
|
Filename: filename}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// writes the content of the index file to a CDB and returns that
|
||||||
|
func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
|
||||||
|
nm := indexFile.Name()
|
||||||
|
nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb"
|
||||||
|
|
||||||
|
var (
|
||||||
|
key uint64
|
||||||
|
offset uint32
|
||||||
|
ok bool
|
||||||
|
)
|
||||||
|
deleted := make(map[uint64]bool, 16)
|
||||||
|
gatherDeletes := func(buf []byte) error {
|
||||||
|
key = util.BytesToUint64(buf[:8])
|
||||||
|
offset = util.BytesToUint32(buf[8:12])
|
||||||
|
if offset > 0 {
|
||||||
|
if _, ok = deleted[key]; ok { //undelete
|
||||||
|
delete(deleted, key)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
deleted[key] = true
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := readIndexFile(indexFile, gatherDeletes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := cdb.NewWriter(nm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
iterFun := func(buf []byte) error {
|
||||||
|
key = util.BytesToUint64(buf[:8])
|
||||||
|
if _, ok = deleted[key]; !ok {
|
||||||
|
w.PutPair(buf[:8], buf[8:16])
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
indexFile.Seek(0, 0)
|
||||||
|
err = readIndexFile(indexFile, iterFun)
|
||||||
|
w.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewCdbMap(nm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) {
|
||||||
|
util.Uint64toBytes(m.transient, uint64(key))
|
||||||
|
data, err := m.db.Data(m.transient)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
log.Printf("error getting %s: %s", key, err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return &NeedleValue{Key: key,
|
||||||
|
Offset: util.BytesToUint32(data[:4]),
|
||||||
|
Size: util.BytesToUint32(data[4:8]),
|
||||||
|
}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
||||||
|
r, err := os.Open(m.Filename)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
iterFunc := func(elt cdb.Element) error {
|
||||||
|
return pedestrian(&NeedleValue{
|
||||||
|
Key: Key(util.BytesToUint64(elt.Key[:8])),
|
||||||
|
Offset: util.BytesToUint32(elt.Data[:4]),
|
||||||
|
Size: util.BytesToUint32(elt.Data[4:8]),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return cdb.DumpMap(r, iterFunc)
|
||||||
|
}
|
|
@ -109,8 +109,8 @@ type CompactMap struct {
|
||||||
list []CompactSection
|
list []CompactSection
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCompactMap() CompactMap {
|
func NewCompactMap() *CompactMap {
|
||||||
return CompactMap{}
|
return &CompactMap{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
|
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"pkg/util"
|
"pkg/util"
|
||||||
|
@ -8,7 +9,8 @@ import (
|
||||||
|
|
||||||
type NeedleMap struct {
|
type NeedleMap struct {
|
||||||
indexFile *os.File
|
indexFile *os.File
|
||||||
m CompactMap
|
m MapGetSetter // modifiable map
|
||||||
|
fm MapGetter // frozen map
|
||||||
|
|
||||||
//transient
|
//transient
|
||||||
bytes []byte
|
bytes []byte
|
||||||
|
@ -19,52 +21,106 @@ type NeedleMap struct {
|
||||||
fileByteCounter uint64
|
fileByteCounter uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Map interface for frozen maps
|
||||||
|
type MapGetter interface {
|
||||||
|
Get(key Key) (element *NeedleValue, ok bool)
|
||||||
|
Walk(pedestrian func(*NeedleValue) error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modifiable map interface
|
||||||
|
type MapSetter interface {
|
||||||
|
Set(key Key, offset, size uint32) (oldsize uint32)
|
||||||
|
Delete(key Key) uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Settable and gettable map
|
||||||
|
type MapGetSetter interface {
|
||||||
|
MapGetter
|
||||||
|
MapSetter
|
||||||
|
}
|
||||||
|
|
||||||
|
// New in-memory needle map, backed by "file" index file
|
||||||
func NewNeedleMap(file *os.File) *NeedleMap {
|
func NewNeedleMap(file *os.File) *NeedleMap {
|
||||||
nm := &NeedleMap{
|
return &NeedleMap{
|
||||||
m: NewCompactMap(),
|
m: NewCompactMap(),
|
||||||
bytes: make([]byte, 16),
|
bytes: make([]byte, 16),
|
||||||
indexFile: file,
|
indexFile: file,
|
||||||
}
|
}
|
||||||
return nm
|
}
|
||||||
|
|
||||||
|
// Nes frozen (on-disk, not modifiable(!)) needle map
|
||||||
|
func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
|
fm, err := NewCdbMapFromIndex(file)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &NeedleMap{
|
||||||
|
fm: fm,
|
||||||
|
bytes: make([]byte, 16),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
RowsToRead = 1024
|
RowsToRead = 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
func LoadNeedleMap(file *os.File) *NeedleMap {
|
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
nm := NewNeedleMap(file)
|
nm := NewNeedleMap(file)
|
||||||
bytes := make([]byte, 16*RowsToRead)
|
|
||||||
count, e := nm.indexFile.Read(bytes)
|
var (
|
||||||
if count > 0 {
|
key uint64
|
||||||
fstat, _ := file.Stat()
|
offset, size, oldSize uint32
|
||||||
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
)
|
||||||
|
iterFun := func(buf []byte) error {
|
||||||
|
key = util.BytesToUint64(buf[:8])
|
||||||
|
offset = util.BytesToUint32(buf[8:12])
|
||||||
|
size = util.BytesToUint32(buf[12:16])
|
||||||
|
nm.fileCounter++
|
||||||
|
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
|
||||||
|
if offset > 0 {
|
||||||
|
oldSize = nm.m.Set(Key(key), offset, size)
|
||||||
|
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
|
||||||
|
if oldSize > 0 {
|
||||||
|
nm.deletionCounter++
|
||||||
|
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
nm.m.Delete(Key(key))
|
||||||
|
//log.Println("removing key", key)
|
||||||
|
nm.deletionCounter++
|
||||||
|
nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := readIndexFile(file, iterFun); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// calls iterFun with each row (raw 16 bytes)
|
||||||
|
func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
|
||||||
|
buf := make([]byte, 16*RowsToRead)
|
||||||
|
count, e := io.ReadAtLeast(indexFile, buf, 16)
|
||||||
|
if e != nil && count > 0 {
|
||||||
|
fstat, err := indexFile.Stat()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("ERROR stating %s: %s", indexFile, err)
|
||||||
|
} else {
|
||||||
|
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for count > 0 && e == nil {
|
for count > 0 && e == nil {
|
||||||
for i := 0; i < count; i += 16 {
|
for i := 0; i < count; i += 16 {
|
||||||
key := util.BytesToUint64(bytes[i : i+8])
|
if e = iterFun(buf[i : i+16]); e != nil {
|
||||||
offset := util.BytesToUint32(bytes[i+8 : i+12])
|
return e
|
||||||
size := util.BytesToUint32(bytes[i+12 : i+16])
|
|
||||||
nm.fileCounter++
|
|
||||||
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
|
|
||||||
if offset > 0 {
|
|
||||||
oldSize := nm.m.Set(Key(key), offset, size)
|
|
||||||
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
|
|
||||||
if oldSize > 0 {
|
|
||||||
nm.deletionCounter++
|
|
||||||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nm.m.Delete(Key(key))
|
|
||||||
//log.Println("removing key", key)
|
|
||||||
nm.deletionCounter++
|
|
||||||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
count, e = nm.indexFile.Read(bytes)
|
count, e = io.ReadAtLeast(indexFile, buf, 16)
|
||||||
}
|
}
|
||||||
return nm
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
||||||
|
|
|
@ -48,8 +48,8 @@ func (v *Volume) load() error {
|
||||||
if ie != nil {
|
if ie != nil {
|
||||||
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
||||||
}
|
}
|
||||||
v.nm = LoadNeedleMap(indexFile)
|
v.nm, e = LoadNeedleMap(indexFile)
|
||||||
return nil
|
return e
|
||||||
}
|
}
|
||||||
func (v *Volume) Version() Version {
|
func (v *Volume) Version() Version {
|
||||||
return v.version
|
return v.version
|
||||||
|
|
Loading…
Reference in a new issue