mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Issue 45 in weed-fs: [Compact issue] Offset overflow
New issue 45 by hieu.hcmus@gmail.com: [Compact issue] Offset overflow http://code.google.com/p/weed-fs/issues/detail?id=45 You are using uint32(Maximum 4Gb) to store needle offset(Maximum 32Gb) when compacting. Currently It is ok if the volume size is < 4gb Change variable "offset" in ScanVolumeFile function to uint64 to fix the issue.
This commit is contained in:
parent
82b74c7940
commit
69ac6b6bf6
|
@ -19,6 +19,9 @@ const (
|
||||||
NeedleChecksumSize = 4
|
NeedleChecksumSize = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Needle file size is limited to 4GB for now.
|
||||||
|
*/
|
||||||
type Needle struct {
|
type Needle struct {
|
||||||
Cookie uint32 `comment:"random number to mitigate brute force lookups"`
|
Cookie uint32 `comment:"random number to mitigate brute force lookups"`
|
||||||
Id uint64 `comment:"needle id"`
|
Id uint64 `comment:"needle id"`
|
||||||
|
|
|
@ -17,9 +17,9 @@ const (
|
||||||
LastModifiedBytesLength = 5
|
LastModifiedBytesLength = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *Needle) DiskSize() uint32 {
|
func (n *Needle) DiskSize() int64 {
|
||||||
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
|
padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize)
|
||||||
return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize
|
return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize
|
||||||
}
|
}
|
||||||
func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
|
func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
|
||||||
if s, ok := w.(io.Seeker); ok {
|
if s, ok := w.(io.Seeker); ok {
|
||||||
|
|
|
@ -307,7 +307,7 @@ func (v *Volume) freeze() error {
|
||||||
|
|
||||||
func ScanVolumeFile(dirname string, id VolumeId,
|
func ScanVolumeFile(dirname string, id VolumeId,
|
||||||
visitSuperBlock func(SuperBlock) error,
|
visitSuperBlock func(SuperBlock) error,
|
||||||
visitNeedle func(n *Needle, offset uint32) error) (err error) {
|
visitNeedle func(n *Needle, offset int64) error) (err error) {
|
||||||
var v *Volume
|
var v *Volume
|
||||||
if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
|
if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
|
||||||
return
|
return
|
||||||
|
@ -318,7 +318,7 @@ func ScanVolumeFile(dirname string, id VolumeId,
|
||||||
|
|
||||||
version := v.Version()
|
version := v.Version()
|
||||||
|
|
||||||
offset := uint32(SuperBlockSize)
|
offset := int64(SuperBlockSize)
|
||||||
n, rest, e := ReadNeedleHeader(v.dataFile, version)
|
n, rest, e := ReadNeedleHeader(v.dataFile, version)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
err = fmt.Errorf("cannot read needle header: %s", e)
|
err = fmt.Errorf("cannot read needle header: %s", e)
|
||||||
|
@ -332,7 +332,7 @@ func ScanVolumeFile(dirname string, id VolumeId,
|
||||||
if err = visitNeedle(n, offset); err != nil {
|
if err = visitNeedle(n, offset); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
offset += NeedleHeaderSize + rest
|
offset += int64(NeedleHeaderSize + rest)
|
||||||
if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
|
if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return nil
|
return nil
|
||||||
|
@ -359,17 +359,17 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
|
||||||
defer idx.Close()
|
defer idx.Close()
|
||||||
|
|
||||||
nm := NewNeedleMap(idx)
|
nm := NewNeedleMap(idx)
|
||||||
new_offset := uint32(SuperBlockSize)
|
new_offset := int64(SuperBlockSize)
|
||||||
|
|
||||||
err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
|
err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
|
||||||
_, err = dst.Write(superBlock.Bytes())
|
_, err = dst.Write(superBlock.Bytes())
|
||||||
return err
|
return err
|
||||||
}, func(n *Needle, offset uint32) error {
|
}, func(n *Needle, offset int64) error {
|
||||||
nv, ok := v.nm.Get(n.Id)
|
nv, ok := v.nm.Get(n.Id)
|
||||||
//glog.V(0).Infoln("file size is", n.Size, "rest", rest)
|
//glog.V(0).Infoln("file size is", n.Size, "rest", rest)
|
||||||
if ok && nv.Offset*NeedlePaddingSize == offset {
|
if ok && int64(nv.Offset)*NeedlePaddingSize == offset {
|
||||||
if nv.Size > 0 {
|
if nv.Size > 0 {
|
||||||
if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
|
if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
|
||||||
return fmt.Errorf("cannot put needle: %s", err)
|
return fmt.Errorf("cannot put needle: %s", err)
|
||||||
}
|
}
|
||||||
if _, err = n.Append(dst, v.Version()); err != nil {
|
if _, err = n.Append(dst, v.Version()); err != nil {
|
||||||
|
|
|
@ -98,7 +98,7 @@ func runExport(cmd *Command, args []string) bool {
|
||||||
err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
||||||
version = superBlock.Version
|
version = superBlock.Version
|
||||||
return nil
|
return nil
|
||||||
}, func(n *storage.Needle, offset uint32) error {
|
}, func(n *storage.Needle, offset int64) error {
|
||||||
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
||||||
nv, ok := nm.Get(n.Id)
|
nv, ok := nm.Get(n.Id)
|
||||||
if ok && nv.Size > 0 {
|
if ok && nv.Size > 0 {
|
||||||
|
|
|
@ -45,10 +45,10 @@ func runFix(cmd *Command, args []string) bool {
|
||||||
vid := storage.VolumeId(*fixVolumeId)
|
vid := storage.VolumeId(*fixVolumeId)
|
||||||
err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
||||||
return nil
|
return nil
|
||||||
}, func(n *storage.Needle, offset uint32) error {
|
}, func(n *storage.Needle, offset int64) error {
|
||||||
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
||||||
if n.Size > 0 {
|
if n.Size > 0 {
|
||||||
count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size)
|
count, pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
|
||||||
debug("saved", count, "with error", pe)
|
debug("saved", count, "with error", pe)
|
||||||
} else {
|
} else {
|
||||||
debug("skipping deleted file ...")
|
debug("skipping deleted file ...")
|
||||||
|
|
Loading…
Reference in a new issue