From fb635146a139f350b126cdb6d93821cad13cdde5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 18 Apr 2013 00:23:14 -0700 Subject: [PATCH] refactoring needle mapper interface to separate index file storage logic out --- go/replication/store_replicate.go | 2 +- go/storage/needle_map.go | 67 ++++++++++++++++++++++--------- go/storage/store.go | 22 +++++----- go/storage/volume.go | 14 +++---- 4 files changed, 68 insertions(+), 37 deletions(-) diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go index ee5161d4d..cc5d806d2 100644 --- a/go/replication/store_replicate.go +++ b/go/replication/store_replicate.go @@ -36,7 +36,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum if errorStatus != "" { if _, err = s.Delete(volumeId, needle); err != nil { errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + - strconv.FormatUint(uint64(volumeId), 10) + ": " + err.Error() + volumeId.String() + ": " + err.Error() } else { distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index b2e232009..c836a87fb 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -1,12 +1,32 @@ package storage import ( + "bufio" "code.google.com/p/weed-fs/go/util" "fmt" "io" "os" ) +type NeedleMapper interface { + Put(key uint64, offset uint32, size uint32) (int, error) + Get(key uint64) (element *NeedleValue, ok bool) + Delete(key uint64) error + Close() + ContentSize() uint64 + DeletedSize() uint64 + FileCount() int + DeletedCount() int + Visit(visit func(NeedleValue) error) (err error) +} + +type mapMetric struct { + DeletionCounter int `json:"DeletionCounter"` + FileCounter int `json:"FileCounter"` + DeletionByteCounter uint64 `json:"DeletionByteCounter"` + FileByteCounter uint64 `json:"FileByteCounter"` +} + type NeedleMap struct { indexFile *os.File m CompactMap @@ -14,10 +34,7 @@ type NeedleMap struct { //transient bytes []byte - deletionCounter int - fileCounter int - deletionByteCounter uint64 - fileByteCounter uint64 + mapMetric } func NewNeedleMap(file *os.File) *NeedleMap { @@ -35,31 +52,32 @@ const ( func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) + bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024) bytes := make([]byte, 16*RowsToRead) - count, e := nm.indexFile.Read(bytes) + count, e := bufferReader.Read(bytes) for count > 0 && e == nil { for i := 0; i < count; i += 16 { key := util.BytesToUint64(bytes[i : i+8]) offset := util.BytesToUint32(bytes[i+8 : i+12]) size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) if offset > 0 { oldSize := nm.m.Set(Key(key), offset, size) //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } else { oldSize := nm.m.Delete(Key(key)) //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } - count, e = nm.indexFile.Read(bytes) + count, e = bufferReader.Read(bytes) } if e == io.EOF { e = nil @@ -72,11 +90,11 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[12:16], size) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } return nm.indexFile.Write(nm.bytes) } @@ -85,7 +103,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { return } func (nm *NeedleMap) Delete(key uint64) error { - nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key))) offset, err := nm.indexFile.Seek(0, 1) if err != nil { return fmt.Errorf("cannot get position of indexfile: %s", err) @@ -100,14 +118,23 @@ func (nm *NeedleMap) Delete(key uint64) error { } return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus) } - nm.deletionCounter++ + nm.DeletionCounter++ return nil } func (nm *NeedleMap) Close() { _ = nm.indexFile.Close() } -func (nm *NeedleMap) ContentSize() uint64 { - return nm.fileByteCounter +func (nm NeedleMap) ContentSize() uint64 { + return nm.FileByteCounter +} +func (nm NeedleMap) DeletedSize() uint64 { + return nm.DeletionByteCounter +} +func (nm NeedleMap) FileCount() int { + return nm.FileCounter +} +func (nm NeedleMap) DeletedCount() int { + return nm.DeletionCounter } func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { return nm.m.Visit(visit) diff --git a/go/storage/store.go b/go/storage/store.go index 4bf0f4ef3..954bae0ae 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -121,9 +121,11 @@ func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), - RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter, - DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter, - ReadOnly: v.readOnly} + RepType: v.ReplicaType, Version: v.Version(), + FileCount: v.nm.FileCount(), + DeleteCount: v.nm.DeletedCount(), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly} stats = append(stats, s) } return stats @@ -140,9 +142,11 @@ func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), - RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter, - DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter, - ReadOnly: v.readOnly} + RepType: v.ReplicaType, Version: v.Version(), + FileCount: v.nm.FileCount(), + DeleteCount: v.nm.DeletedCount(), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly} *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) @@ -175,8 +179,8 @@ func (s *Store) Close() { func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if v := s.volumes[i]; v != nil { if v.readOnly { - err = errors.New("Volume " + i.String() + " is read only!") - return + err = errors.New("Volume " + i.String() + " is read only!") + return } else { size, err = v.write(n) if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { @@ -189,7 +193,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { return } log.Println("volume", i, "not found!") - err = errors.New("Volume " + i.String() + " not found!") + err = errors.New("Volume " + i.String() + " not found!") return } func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { diff --git a/go/storage/volume.go b/go/storage/volume.go index 42a931a6d..98f712433 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -30,7 +30,7 @@ type Volume struct { Id VolumeId dir string dataFile *os.File - nm *NeedleMap + nm NeedleMapper readOnly bool SuperBlock @@ -70,10 +70,10 @@ func (v *Volume) load(alsoLoadIndex bool) error { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if ie != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) - } + indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } v.nm, e = LoadNeedleMap(indexFile) } return e @@ -198,7 +198,7 @@ func (v *Volume) read(n *Needle) (int, error) { } func (v *Volume) garbageLevel() float64 { - return float64(v.nm.deletionByteCounter) / float64(v.ContentSize()) + return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) } func (v *Volume) compact() error { @@ -305,5 +305,5 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro return } func (v *Volume) ContentSize() uint64 { - return v.nm.fileByteCounter + return v.nm.ContentSize() }