mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
reduce file seek when writing
This commit is contained in:
parent
81904ad336
commit
1478d7ea21
|
@ -3,7 +3,6 @@ package storage
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
@ -28,20 +27,19 @@ func (n *Needle) DiskSize(version Version) int64 {
|
||||||
return getActualSize(n.Size, version)
|
return getActualSize(n.Size, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) {
|
func (n *Needle) Append(w *os.File, version Version) (offset Offset, size uint32, actualSize int64, err error) {
|
||||||
if s, ok := w.(io.Seeker); ok {
|
if end, e := w.Seek(0, 2); e == nil {
|
||||||
if end, e := s.Seek(0, 1); e == nil {
|
defer func(w *os.File, off int64) {
|
||||||
defer func(s io.Seeker, off int64) {
|
if err != nil {
|
||||||
if err != nil {
|
if te := w.Truncate(end); te != nil {
|
||||||
if _, e = s.Seek(off, 0); e != nil {
|
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
|
||||||
glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}(s, end)
|
}
|
||||||
} else {
|
}(w, end)
|
||||||
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
offset = Offset(end)
|
||||||
return
|
} else {
|
||||||
}
|
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
switch version {
|
switch version {
|
||||||
case Version1:
|
case Version1:
|
||||||
|
@ -159,9 +157,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
|
||||||
_, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
|
_, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.DataSize, getActualSize(n.Size, version), err
|
return offset, n.DataSize, getActualSize(n.Size, version), err
|
||||||
}
|
}
|
||||||
return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
|
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
|
||||||
|
|
|
@ -200,7 +200,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
||||||
}
|
}
|
||||||
// TODO: count needle size ahead
|
// TODO: count needle size ahead
|
||||||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
|
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
|
||||||
size, err = v.writeNeedle(n)
|
_, size, err = v.writeNeedle(n)
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
|
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
|
func (v *Volume) writeNeedle(n *Needle) (offset Offset, size uint32, err error) {
|
||||||
glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
|
glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
|
||||||
if v.readOnly {
|
if v.readOnly {
|
||||||
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
|
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
|
||||||
|
@ -87,32 +87,15 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
|
||||||
glog.V(4).Infof("needle is unchanged!")
|
glog.V(4).Infof("needle is unchanged!")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var offset int64
|
|
||||||
if offset, err = v.dataFile.Seek(0, 2); err != nil {
|
|
||||||
glog.V(0).Infof("failed to seek the end of file: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//ensure file writing starting from aligned positions
|
|
||||||
if offset%NeedlePaddingSize != 0 {
|
|
||||||
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
|
|
||||||
if offset, err = v.dataFile.Seek(offset, 0); err != nil {
|
|
||||||
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
if size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
|
if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
|
||||||
if e := v.dataFile.Truncate(offset); e != nil {
|
|
||||||
err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nv, ok := v.nm.Get(n.Id)
|
nv, ok := v.nm.Get(n.Id)
|
||||||
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
|
if !ok || Offset(nv.Offset)*NeedlePaddingSize < offset {
|
||||||
if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil {
|
if err = v.nm.Put(n.Id, offset/NeedlePaddingSize, n.Size); err != nil {
|
||||||
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
|
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,16 +116,15 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
|
||||||
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
|
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
|
||||||
if ok && nv.Size != TombstoneFileSize {
|
if ok && nv.Size != TombstoneFileSize {
|
||||||
size := nv.Size
|
size := nv.Size
|
||||||
offset, err := v.dataFile.Seek(0, 2)
|
n.Data = nil
|
||||||
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
|
offset, _, _, err := n.Append(v.dataFile, v.Version())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
if err := v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil {
|
if err = v.nm.Delete(n.Id, offset/NeedlePaddingSize); err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
n.Data = nil
|
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
|
||||||
_, _, err = n.Append(v.dataFile, v.Version())
|
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
return 0, nil
|
return 0, nil
|
||||||
|
|
|
@ -215,7 +215,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
|
||||||
fakeDelNeedle.Id = key
|
fakeDelNeedle.Id = key
|
||||||
fakeDelNeedle.Cookie = 0x12345678
|
fakeDelNeedle.Cookie = 0x12345678
|
||||||
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano())
|
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
_, _, err = fakeDelNeedle.Append(dst, v.Version())
|
_, _, _, err = fakeDelNeedle.Append(dst, v.Version())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("append deleted %d failed: %v", key, err)
|
return fmt.Errorf("append deleted %d failed: %v", key, err)
|
||||||
}
|
}
|
||||||
|
@ -269,7 +269,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||||
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
|
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
|
||||||
return fmt.Errorf("cannot put needle: %s", err)
|
return fmt.Errorf("cannot put needle: %s", err)
|
||||||
}
|
}
|
||||||
if _, _, err := n.Append(dst, v.Version()); err != nil {
|
if _, _, _, err := n.Append(dst, v.Version()); err != nil {
|
||||||
return fmt.Errorf("cannot append needle: %s", err)
|
return fmt.Errorf("cannot append needle: %s", err)
|
||||||
}
|
}
|
||||||
new_offset += n.DiskSize(version)
|
new_offset += n.DiskSize(version)
|
||||||
|
@ -329,7 +329,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
|
||||||
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
|
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
|
||||||
return fmt.Errorf("cannot put needle: %s", err)
|
return fmt.Errorf("cannot put needle: %s", err)
|
||||||
}
|
}
|
||||||
if _, _, err = n.Append(dst, v.Version()); err != nil {
|
if _, _, _, err = n.Append(dst, v.Version()); err != nil {
|
||||||
return fmt.Errorf("cannot append needle: %s", err)
|
return fmt.Errorf("cannot append needle: %s", err)
|
||||||
}
|
}
|
||||||
new_offset += n.DiskSize(v.Version())
|
new_offset += n.DiskSize(v.Version())
|
||||||
|
|
|
@ -122,7 +122,7 @@ func TestCompaction(t *testing.T) {
|
||||||
}
|
}
|
||||||
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
||||||
n := newRandomNeedle(uint64(i))
|
n := newRandomNeedle(uint64(i))
|
||||||
size, err := v.writeNeedle(n)
|
_, size, err := v.writeNeedle(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("write file %d: %v", i, err)
|
t.Fatalf("write file %d: %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue