diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go deleted file mode 100644 index b80b3cde9..000000000 --- a/weed-fs/src/pkg/storage/cdb_map.go +++ /dev/null @@ -1,107 +0,0 @@ -package storage - -import ( - "github.com/tgulacsi/go-cdb" - "io" - "log" - "os" - "pkg/util" - "strings" -) - -type CdbMap struct { - db *cdb.Cdb - transient []byte - Filename string -} - -// Opens the CDB file and servers as a needle map -func NewCdbMap(filename string) (*CdbMap, error) { - m, err := cdb.Open(filename) - if err != nil { - return nil, err - } - return &CdbMap{db: m, transient: make([]byte, 8), - Filename: filename}, nil -} - -// writes the content of the index file to a CDB and returns that -func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { - nm := indexFile.Name() - nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb" - - var ( - key uint64 - offset uint32 - ok bool - ) - deleted := make(map[uint64]bool, 16) - gatherDeletes := func(buf []byte) error { - key = util.BytesToUint64(buf[:8]) - offset = util.BytesToUint32(buf[8:12]) - if offset > 0 { - if _, ok = deleted[key]; ok { //undelete - delete(deleted, key) - } - } else { - deleted[key] = true - } - return nil - } - if err := readIndexFile(indexFile, gatherDeletes); err != nil { - return nil, err - } - - w, err := cdb.NewWriter(nm) - if err != nil { - return nil, err - } - iterFun := func(buf []byte) error { - key = util.BytesToUint64(buf[:8]) - if _, ok = deleted[key]; !ok { - w.PutPair(buf[:8], buf[8:16]) - } - return nil - } - indexFile.Seek(0, 0) - err = readIndexFile(indexFile, iterFun) - w.Close() - if err != nil { - return nil, err - } - - return NewCdbMap(nm) -} - -func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) { - util.Uint64toBytes(m.transient, uint64(key)) - data, err := m.db.Data(m.transient) - if err != nil { - if err == io.EOF { - return nil, false - } - log.Printf("error getting %s: %s", key, err) - return nil, false - } - return &NeedleValue{Key: key, - Offset: util.BytesToUint32(data[:4]), - Size: util.BytesToUint32(data[4:8]), - }, true -} - -func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) { - r, err := os.Open(m.Filename) - if err != nil { - return err - } - defer r.Close() - - iterFunc := func(elt cdb.Element) error { - return pedestrian(&NeedleValue{ - Key: Key(util.BytesToUint64(elt.Key[:8])), - Offset: util.BytesToUint32(elt.Data[:4]), - Size: util.BytesToUint32(elt.Data[4:8]), - }) - } - return cdb.DumpMap(r, iterFunc) -} diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 61cc2c841..90ed42198 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -109,8 +109,8 @@ type CompactMap struct { list []CompactSection } -func NewCompactMap() *CompactMap { - return &CompactMap{} +func NewCompactMap() CompactMap { + return CompactMap{} } func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9d7369509..9ce38c1f2 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,7 +1,6 @@ package storage import ( - "io" "log" "os" "pkg/util" @@ -9,8 +8,7 @@ import ( type NeedleMap struct { indexFile *os.File - m MapGetSetter // modifiable map - fm MapGetter // frozen map + m CompactMap //transient bytes []byte @@ -21,106 +19,52 @@ type NeedleMap struct { fileByteCounter uint64 } -// Map interface for frozen maps -type MapGetter interface { - Get(key Key) (element *NeedleValue, ok bool) - Walk(pedestrian func(*NeedleValue) error) error -} - -// Modifiable map interface -type MapSetter interface { - Set(key Key, offset, size uint32) (oldsize uint32) - Delete(key Key) uint32 -} - -// Settable and gettable map -type MapGetSetter interface { - MapGetter - MapSetter -} - -// New in-memory needle map, backed by "file" index file func NewNeedleMap(file *os.File) *NeedleMap { - return &NeedleMap{ + nm := &NeedleMap{ m: NewCompactMap(), bytes: make([]byte, 16), indexFile: file, } -} - -// Nes frozen (on-disk, not modifiable(!)) needle map -func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) { - fm, err := NewCdbMapFromIndex(file) - if err != nil { - return nil, err - } - return &NeedleMap{ - fm: fm, - bytes: make([]byte, 16), - }, nil + return nm } const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) (*NeedleMap, error) { +func LoadNeedleMap(file *os.File) *NeedleMap { nm := NewNeedleMap(file) - - var ( - key uint64 - offset, size, oldSize uint32 - ) - iterFun := func(buf []byte) error { - key = util.BytesToUint64(buf[:8]) - offset = util.BytesToUint32(buf[8:12]) - size = util.BytesToUint32(buf[12:16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if offset > 0 { - oldSize = nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } else { - nm.m.Delete(Key(key)) - //log.Println("removing key", key) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) - } - - return nil - } - if err := readIndexFile(file, iterFun); err != nil { - return nil, err - } - return nm, nil -} - -// calls iterFun with each row (raw 16 bytes) -func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error { - buf := make([]byte, 16*RowsToRead) - count, e := io.ReadAtLeast(indexFile, buf, 16) - if e != nil && count > 0 { - fstat, err := indexFile.Stat() - if err != nil { - log.Println("ERROR stating %s: %s", indexFile, err) - } else { - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } + bytes := make([]byte, 16*RowsToRead) + count, e := nm.indexFile.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) } for count > 0 && e == nil { for i := 0; i < count; i += 16 { - if e = iterFun(buf[i : i+16]); e != nil { - return e + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + nm.m.Delete(Key(key)) + //log.Println("removing key", key) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) } } - count, e = io.ReadAtLeast(indexFile, buf, 16) + count, e = nm.indexFile.Read(bytes) } - return nil + return nm } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 71dfb5aee..5e64d0763 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -48,8 +48,8 @@ func (v *Volume) load() error { if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } - v.nm, e = LoadNeedleMap(indexFile) - return e + v.nm = LoadNeedleMap(indexFile) + return nil } func (v *Volume) Version() Version { return v.version