From 0be2d51c96e4c2c05cad5ac3d6104ccb7e1adeb6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 19 Apr 2019 01:56:38 -0700 Subject: [PATCH] read volume lastAppendAtNs when loading a volume --- weed/storage/disk_location.go | 2 ++ weed/storage/needle/needle_read_write.go | 14 +++++----- weed/storage/volume_checking.go | 34 ++++++++++++------------ weed/storage/volume_loading.go | 2 +- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 6bfb1ff76..a4a3c519e 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -57,9 +57,11 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM size, _, _ := v.FileStat() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + // println("volume", vid, "last append at", v.lastAppendAtNs) } else { glog.V(0).Infof("new volume %s error %s", name, e) } + } } } diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index d1df7d4e2..e1ad075b4 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -185,15 +185,17 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version case Version2, Version3: err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) } - if size == 0 || err != nil { + if err != nil && err != io.EOF{ return err } - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return errors.New("CRC error! Data On Disk Corrupted") + if size > 0 { + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + newChecksum := NewCRC(n.Data) + if checksum != newChecksum.Value() { + return errors.New("CRC error! Data On Disk Corrupted") + } + n.Checksum = newChecksum } - n.Checksum = newChecksum if version == Version3 { tsOffset := NeedleHeaderSize + size + NeedleChecksumSize n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize]) diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 9599885ec..980656823 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -9,28 +9,29 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { +func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { var indexSize int64 - var e error if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) } if indexSize == 0 { - return nil + return 0,nil } var lastIdxEntry []byte if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := IdxFileEntry(lastIdxEntry) - if offset.IsZero() || size == TombstoneFileSize { - return nil + if offset.IsZero() { + return 0,nil } - if e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if size == TombstoneFileSize { + size = 0 } - - return nil + if lastAppendAtNs, e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + } + return } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -52,14 +53,13 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } -func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) error { +func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) { n := new(needle.Needle) - err := n.ReadData(datFile, offset, size, v) - if err != nil { - return err + if err = n.ReadData(datFile, offset, size, v); err != nil { + return n.AppendAtNs, err } if n.Id != key { - return fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) + return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) } - return nil + return n.AppendAtNs, err } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index eb82ffe98..cfc59eb09 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -73,7 +73,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if e = CheckVolumeDataIntegrity(v, indexFile); e != nil { + if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { v.readOnly = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e) }