mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
read volume lastAppendAtNs when loading a volume
This commit is contained in:
parent
ac2727853f
commit
0be2d51c96
|
@ -57,9 +57,11 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM
|
||||||
size, _, _ := v.FileStat()
|
size, _, _ := v.FileStat()
|
||||||
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
|
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
|
||||||
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
|
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
|
||||||
|
// println("volume", vid, "last append at", v.lastAppendAtNs)
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infof("new volume %s error %s", name, e)
|
glog.V(0).Infof("new volume %s error %s", name, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,15 +185,17 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
|
||||||
case Version2, Version3:
|
case Version2, Version3:
|
||||||
err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
|
err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
|
||||||
}
|
}
|
||||||
if size == 0 || err != nil {
|
if err != nil && err != io.EOF{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
|
if size > 0 {
|
||||||
newChecksum := NewCRC(n.Data)
|
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
|
||||||
if checksum != newChecksum.Value() {
|
newChecksum := NewCRC(n.Data)
|
||||||
return errors.New("CRC error! Data On Disk Corrupted")
|
if checksum != newChecksum.Value() {
|
||||||
|
return errors.New("CRC error! Data On Disk Corrupted")
|
||||||
|
}
|
||||||
|
n.Checksum = newChecksum
|
||||||
}
|
}
|
||||||
n.Checksum = newChecksum
|
|
||||||
if version == Version3 {
|
if version == Version3 {
|
||||||
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
|
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
|
||||||
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
|
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
|
||||||
|
|
|
@ -9,28 +9,29 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
|
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
|
||||||
var indexSize int64
|
var indexSize int64
|
||||||
var e error
|
|
||||||
if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
|
if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
|
||||||
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
|
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
|
||||||
}
|
}
|
||||||
if indexSize == 0 {
|
if indexSize == 0 {
|
||||||
return nil
|
return 0,nil
|
||||||
}
|
}
|
||||||
var lastIdxEntry []byte
|
var lastIdxEntry []byte
|
||||||
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
|
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
|
||||||
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
|
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
|
||||||
}
|
}
|
||||||
key, offset, size := IdxFileEntry(lastIdxEntry)
|
key, offset, size := IdxFileEntry(lastIdxEntry)
|
||||||
if offset.IsZero() || size == TombstoneFileSize {
|
if offset.IsZero() {
|
||||||
return nil
|
return 0,nil
|
||||||
}
|
}
|
||||||
if e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
|
if size == TombstoneFileSize {
|
||||||
return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
|
size = 0
|
||||||
}
|
}
|
||||||
|
if lastAppendAtNs, e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
|
||||||
return nil
|
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
||||||
|
@ -52,14 +53,13 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) error {
|
func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
|
||||||
n := new(needle.Needle)
|
n := new(needle.Needle)
|
||||||
err := n.ReadData(datFile, offset, size, v)
|
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
||||||
if err != nil {
|
return n.AppendAtNs, err
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if n.Id != key {
|
if n.Id != key {
|
||||||
return fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
|
return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
|
||||||
}
|
}
|
||||||
return nil
|
return n.AppendAtNs, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||||
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
|
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
|
if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
|
||||||
v.readOnly = true
|
v.readOnly = true
|
||||||
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
|
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue