From aecea226660bb3a5404b840eddba668baff7886d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 10 Feb 2013 14:00:06 -0800 Subject: [PATCH] Issue 15: Stress test corrupts volume --- go/storage/needle_map.go | 20 ++++++++++++++------ go/storage/volume.go | 34 +++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 774ac7627..1d5def880 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -1,9 +1,9 @@ package storage import ( - //"log" - "os" "code.google.com/p/weed-fs/go/util" + "fmt" + "os" ) type NeedleMap struct { @@ -32,7 +32,7 @@ const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) *NeedleMap { +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) bytes := make([]byte, 16*RowsToRead) count, e := nm.indexFile.Read(bytes) @@ -60,7 +60,7 @@ func LoadNeedleMap(file *os.File) *NeedleMap { count, e = nm.indexFile.Read(bytes) } - return nm + return nm, e } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { @@ -80,13 +80,21 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m.Get(Key(key)) return } -func (nm *NeedleMap) Delete(key uint64) { +func (nm *NeedleMap) Delete(key uint64) error { nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) + offset, err := nm.indexFile.Seek(0, 1) + if err != nil { + return fmt.Errorf("cannot get position of indexfile: %s", err) + } util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) - nm.indexFile.Write(nm.bytes) + if _, err = nm.indexFile.Write(nm.bytes); err != nil { + nm.indexFile.Truncate(offset) + return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile, err) + } nm.deletionCounter++ + return nil } func (nm *NeedleMap) Close() { nm.indexFile.Close() diff --git a/go/storage/volume.go b/go/storage/volume.go index 707c6e6f8..516b1589b 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -56,20 +56,18 @@ func (v *Volume) load(alsoLoadIndex bool) error { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } if v.ReplicaType == CopyNil { - if e = v.readSuperBlock(); e != nil { - return e - } + e = v.readSuperBlock() } else { - v.maybeWriteSuperBlock() + e = v.maybeWriteSuperBlock() } - if alsoLoadIndex { + if e == nil && alsoLoadIndex { indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } - v.nm = LoadNeedleMap(indexFile) + v.nm, e = LoadNeedleMap(indexFile) } - return nil + return e } func (v *Volume) Version() Version { return v.SuperBlock.Version @@ -90,16 +88,17 @@ func (v *Volume) Close() { v.nm.Close() v.dataFile.Close() } -func (v *Volume) maybeWriteSuperBlock() { +func (v *Volume) maybeWriteSuperBlock() error { stat, e := v.dataFile.Stat() if e != nil { fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e) - return + return e } if stat.Size() == 0 { v.SuperBlock.Version = CurrentVersion - v.dataFile.Write(v.SuperBlock.Bytes()) + _, e = v.dataFile.Write(v.SuperBlock.Bytes()) } + return e } func (v *Volume) readSuperBlock() (err error) { v.dataFile.Seek(0, 0) @@ -129,6 +128,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { return } if size, err = n.Append(v.dataFile, v.Version()); err != nil { + v.dataFile.Truncate(offset) return } nv, ok := v.nm.Get(n.Id) @@ -143,9 +143,17 @@ func (v *Volume) delete(n *Needle) (uint32, error) { nv, ok := v.nm.Get(n.Id) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok { - v.nm.Delete(n.Id) - v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) - _, err := n.Append(v.dataFile, v.Version()) + if err := v.nm.Delete(n.Id); err != nil { + return 0, err + } + offset, err := v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) + if err != nil { + return 0, fmt.Errorf("cannot get datafile (%s) position: %s", v.dataFile, err) + } + if _, err = n.Append(v.dataFile, v.Version()); err != nil { + v.dataFile.Truncate(offset) + return 0, err + } return nv.Size, err } return 0, nil