mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge branch 'upload_via_temp_file'
This commit is contained in:
commit
e55aa41690
|
@ -150,7 +150,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
|
||||||
}()
|
}()
|
||||||
|
|
||||||
entry, err = actualStore.FindEntry(ctx, fp)
|
entry, err = actualStore.FindEntry(ctx, fp)
|
||||||
glog.V(4).Infof("FindEntry %s: %v", fp, err)
|
// glog.V(4).Infof("FindEntry %s: %v", fp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
c.readerLock.Lock()
|
c.readerLock.Lock()
|
||||||
defer c.readerLock.Unlock()
|
defer c.readerLock.Unlock()
|
||||||
|
|
||||||
glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
||||||
return c.doReadAt(p, offset)
|
return c.doReadAt(p, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
if chunkStart >= chunkStop {
|
if chunkStart >= chunkStop {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
|
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
|
||||||
var buffer []byte
|
var buffer []byte
|
||||||
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
|
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
|
||||||
bufferLength := chunkStop - chunkStart
|
bufferLength := chunkStop - chunkStart
|
||||||
|
@ -152,7 +152,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
|
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
|
// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
|
||||||
|
|
||||||
if err == nil && remaining > 0 && c.fileSize > startOffset {
|
if err == nil && remaining > 0 && c.fileSize > startOffset {
|
||||||
delta := int(min(remaining, c.fileSize-startOffset))
|
delta := int(min(remaining, c.fileSize-startOffset))
|
||||||
|
|
|
@ -296,7 +296,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
|
||||||
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
|
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
|
||||||
|
|
||||||
dirPath := util.FullPath(dir.FullPath())
|
dirPath := util.FullPath(dir.FullPath())
|
||||||
glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
|
// glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
|
||||||
|
|
||||||
fullFilePath := dirPath.Child(req.Name)
|
fullFilePath := dirPath.Child(req.Name)
|
||||||
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
|
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
|
||||||
|
|
|
@ -56,18 +56,23 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
|
||||||
}
|
}
|
||||||
pages.tf = tf
|
pages.tf = tf
|
||||||
pages.writtenIntervals.tempFile = tf
|
pages.writtenIntervals.tempFile = tf
|
||||||
|
pages.writtenIntervals.lastOffset = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
writtenOffset := pages.writtenIntervals.TotalSize()
|
writtenOffset := pages.writtenIntervals.lastOffset
|
||||||
|
dataSize := int64(len(data))
|
||||||
|
|
||||||
glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+int64(len(data)))
|
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
|
||||||
|
|
||||||
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
|
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
|
||||||
pages.lastErr = err
|
pages.lastErr = err
|
||||||
} else {
|
} else {
|
||||||
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
|
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
|
||||||
|
pages.writtenIntervals.lastOffset += dataSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pages.writtenIntervals.debug()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +86,11 @@ func (pages *TempFileDirtyPages) FlushData() error {
|
||||||
pages.pageAddLock.Lock()
|
pages.pageAddLock.Lock()
|
||||||
defer pages.pageAddLock.Unlock()
|
defer pages.pageAddLock.Unlock()
|
||||||
if pages.tf != nil {
|
if pages.tf != nil {
|
||||||
|
|
||||||
|
pages.writtenIntervals.tempFile = nil
|
||||||
|
pages.writtenIntervals.lists = nil
|
||||||
|
|
||||||
|
pages.tf.Close()
|
||||||
os.Remove(pages.tf.Name())
|
os.Remove(pages.tf.Name())
|
||||||
pages.tf = nil
|
pages.tf = nil
|
||||||
}
|
}
|
||||||
|
@ -91,15 +101,16 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
|
||||||
|
|
||||||
pageSize := pages.f.wfs.option.ChunkSizeLimit
|
pageSize := pages.f.wfs.option.ChunkSizeLimit
|
||||||
|
|
||||||
uploadedSize := int64(0)
|
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
|
||||||
|
|
||||||
for _, list := range pages.writtenIntervals.lists {
|
for _, list := range pages.writtenIntervals.lists {
|
||||||
for {
|
listStopOffset := list.Offset() + list.Size()
|
||||||
start, stop := max(list.Offset(), uploadedSize), min(list.Offset()+list.Size(), uploadedSize+pageSize)
|
for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
|
||||||
|
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
|
||||||
if start >= stop {
|
if start >= stop {
|
||||||
break
|
continue
|
||||||
}
|
}
|
||||||
uploadedSize = stop
|
// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
|
||||||
glog.V(4).Infof("uploading %v [%d,%d)", pages.f.Name, start, stop)
|
|
||||||
pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
|
pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package filesys
|
package filesys
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,8 +20,9 @@ type WrittenIntervalLinkedList struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type WrittenContinuousIntervals struct {
|
type WrittenContinuousIntervals struct {
|
||||||
tempFile *os.File
|
tempFile *os.File
|
||||||
lists []*WrittenIntervalLinkedList
|
lastOffset int64
|
||||||
|
lists []*WrittenIntervalLinkedList
|
||||||
}
|
}
|
||||||
|
|
||||||
func (list *WrittenIntervalLinkedList) Offset() int64 {
|
func (list *WrittenIntervalLinkedList) Offset() int64 {
|
||||||
|
@ -95,6 +96,21 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *WrittenContinuousIntervals) debug() {
|
||||||
|
log.Printf("++")
|
||||||
|
for _, l := range c.lists {
|
||||||
|
log.Printf("++++")
|
||||||
|
for t := l.Head; ; t = t.Next {
|
||||||
|
log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
|
||||||
|
if t.Next == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("----")
|
||||||
|
}
|
||||||
|
log.Printf("--")
|
||||||
|
}
|
||||||
|
|
||||||
func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
|
func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
|
||||||
|
|
||||||
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
|
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
|
||||||
|
@ -223,6 +239,7 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
|
||||||
for t := l.Head; ; t = t.Next {
|
for t := l.Head; ; t = t.Next {
|
||||||
startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
|
startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
|
||||||
if startOffset < stopOffset {
|
if startOffset < stopOffset {
|
||||||
|
// log.Printf("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
|
||||||
readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
|
readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
|
||||||
}
|
}
|
||||||
if t.Next == nil {
|
if t.Next == nil {
|
||||||
|
@ -236,29 +253,32 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
type FileSectionReader struct {
|
type FileSectionReader struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
Offset int64
|
tempStartOffset int64
|
||||||
dataStart int64
|
Offset int64
|
||||||
dataStop int64
|
dataStart int64
|
||||||
|
dataStop int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = io.Reader(&FileSectionReader{})
|
var _ = io.Reader(&FileSectionReader{})
|
||||||
|
|
||||||
func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
|
func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
|
||||||
return &FileSectionReader{
|
return &FileSectionReader{
|
||||||
file: tempfile,
|
file: tempfile,
|
||||||
Offset: offset,
|
tempStartOffset: offset,
|
||||||
dataStart: dataOffset,
|
Offset: offset,
|
||||||
dataStop: dataOffset + size,
|
dataStart: dataOffset,
|
||||||
|
dataStop: dataOffset + size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSectionReader) Read(p []byte) (n int, err error) {
|
func (f *FileSectionReader) Read(p []byte) (n int, err error) {
|
||||||
dataLen := min(f.dataStop-f.Offset, int64(len(p)))
|
remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
|
||||||
if dataLen < 0 {
|
if remaining <= 0 {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen)
|
dataLen := min(remaining, int64(len(p)))
|
||||||
|
// glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
|
||||||
n, err = f.file.ReadAt(p[:dataLen], f.Offset)
|
n, err = f.file.ReadAt(p[:dataLen], f.Offset)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
f.Offset += int64(n)
|
f.Offset += int64(n)
|
||||||
|
|
|
@ -38,8 +38,8 @@ type FileHandle struct {
|
||||||
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
|
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
|
||||||
fh := &FileHandle{
|
fh := &FileHandle{
|
||||||
f: file,
|
f: file,
|
||||||
dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
||||||
/// dirtyPages: newTempFileDirtyPages(file, writeOnly),
|
dirtyPages: newTempFileDirtyPages(file, writeOnly),
|
||||||
Uid: uid,
|
Uid: uid,
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||||
glog.Errorf("file handle read %s: %v", fileFullPath, err)
|
glog.Errorf("file handle read %s: %v", fileFullPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
|
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
|
||||||
|
|
||||||
return int64(totalRead), err
|
return int64(totalRead), err
|
||||||
}
|
}
|
||||||
|
@ -175,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
|
||||||
|
|
||||||
entry.Content = nil
|
entry.Content = nil
|
||||||
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
|
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
|
||||||
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
|
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
|
||||||
|
|
||||||
fh.dirtyPages.AddPage(req.Offset, data)
|
fh.dirtyPages.AddPage(req.Offset, data)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue