volume: add special handling for .dat larger than 32GB

This commit is contained in:
Chris Lu 2020-10-27 13:11:56 -07:00
parent 44921220b0
commit 06c15ab35c
2 changed files with 11 additions and 0 deletions

View file

@ -24,6 +24,8 @@ const (
TtlBytesLength = 2 TtlBytesLength = 2
) )
var ErrorSizeMismatch = errors.New("size mismatch")
func (n *Needle) DiskSize(version Version) int64 { func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version) return GetActualSize(n.Size, version)
} }
@ -168,6 +170,11 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
n.ParseNeedleHeader(bytes) n.ParseNeedleHeader(bytes)
if n.Size != size { if n.Size != size {
// cookie is not always passed in for this API. Use size to do preliminary checking.
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
return ErrorSizeMismatch
}
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
} }
switch version { switch version {

View file

@ -17,6 +17,7 @@ import (
var ErrorNotFound = errors.New("not found") var ErrorNotFound = errors.New("not found")
var ErrorDeleted = errors.New("already deleted") var ErrorDeleted = errors.New("already deleted")
var ErrorSizeMismatch = errors.New("size mismatch")
// isFileUnchanged checks whether this needle to write is same as last one. // isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume. // It requires serialized access in the same volume.
@ -274,6 +275,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
return 0, nil return 0, nil
} }
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version()) err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
if err != nil { if err != nil {
return 0, err return 0, err
} }