diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index da7eae621..f73f72429 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -156,7 +156,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { n += delta } - if err == nil && offset+int64(len(p)) > c.fileSize { + if err == nil && offset+int64(len(p)) >= c.fileSize { err = io.EOF } // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err) diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index d4a34cbfe..37a34f4ea 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -66,9 +66,9 @@ func TestReaderAt(t *testing.T) { chunkCache: &mockChunkCache{}, } - testReadAt(t, readerAt, 0, 10, 10, nil) + testReadAt(t, readerAt, 0, 10, 10, io.EOF) testReadAt(t, readerAt, 0, 12, 10, io.EOF) - testReadAt(t, readerAt, 2, 8, 8, nil) + testReadAt(t, readerAt, 2, 8, 8, io.EOF) testReadAt(t, readerAt, 3, 6, 6, nil) } @@ -116,7 +116,7 @@ func TestReaderAt0(t *testing.T) { chunkCache: &mockChunkCache{}, } - testReadAt(t, readerAt, 0, 10, 10, nil) + testReadAt(t, readerAt, 0, 10, 10, io.EOF) testReadAt(t, readerAt, 3, 16, 7, io.EOF) testReadAt(t, readerAt, 3, 5, 5, nil) @@ -144,7 +144,7 @@ func TestReaderAt1(t *testing.T) { chunkCache: &mockChunkCache{}, } - testReadAt(t, readerAt, 0, 20, 20, nil) + testReadAt(t, readerAt, 0, 20, 20, io.EOF) testReadAt(t, readerAt, 1, 7, 7, nil) testReadAt(t, readerAt, 0, 1, 1, nil) testReadAt(t, readerAt, 18, 4, 2, io.EOF) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 1ab7d0961..3d3fac184 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,147 +2,102 @@ package filesys import ( "bytes" - "io" - "sync" - "time" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "sync" ) type ContinuousDirtyPages struct { - intervals *ContinuousIntervals - f *File - lock sync.Mutex - collection string - replication string + intervals *ContinuousIntervals + f *File + writeWaitGroup sync.WaitGroup + chunkSaveErrChan chan error + lock sync.Mutex + collection string + replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { return &ContinuousDirtyPages{ - intervals: &ContinuousIntervals{}, - f: file, + intervals: &ContinuousIntervals{}, + f: file, + chunkSaveErrChan: make(chan error, 8), } } -var counter = int32(0) - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. - return pages.flushAndSave(offset, data) + pages.flushAndSave(offset, data) } pages.intervals.AddInterval(data, offset) - var chunk *filer_pb.FileChunk - var hasSavedData bool - if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit { - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() - if hasSavedData { - chunks = append(chunks, chunk) - } + pages.saveExistingLargestPageToStorage() } return } -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { - - var chunk *filer_pb.FileChunk - var newChunks []*filer_pb.FileChunk +func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { // flush existing - if newChunks, err = pages.saveExistingPagesToStorage(); err == nil { - if newChunks != nil { - chunks = append(chunks, newChunks...) - } - } else { - return - } + pages.saveExistingPagesToStorage() // flush the new page - if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) - chunks = append(chunks, chunk) - } - } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return - } + pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) return } -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { - - var hasSavedData bool - var chunk *filer_pb.FileChunk - - for { - - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() - if !hasSavedData { - return chunks, err - } - - if err == nil { - if chunk != nil { - chunks = append(chunks, chunk) - } - } else { - return - } +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { + for pages.saveExistingLargestPageToStorage() { } - } -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) { +func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { maxList := pages.intervals.RemoveLargestIntervalLinkedList() if maxList == nil { - return nil, false, nil + return false } fileSize := int64(pages.f.entry.Attributes.FileSize) - for { - chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) - if chunkSize == 0 { - return - } - chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - if err == nil { - if chunk != nil { - hasSavedData = true - } - glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize) - return - } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err) - time.Sleep(5 * time.Second) - } + + chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) + if chunkSize == 0 { + return false } + pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) + + return true } -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { +func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - dir, _ := pages.f.fullpath().DirAndName() + pages.writeWaitGroup.Add(1) + go func() { + defer pages.writeWaitGroup.Done() - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) - if err != nil { - return nil, err - } - pages.collection, pages.replication = collection, replication - - return chunk, nil + dir, _ := pages.f.fullpath().DirAndName() + reader = io.LimitReader(reader, size) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + pages.chunkSaveErrChan <- err + return + } + pages.collection, pages.replication = collection, replication + pages.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.chunkSaveErrChan <- nil + }() } func maxUint64(x, y uint64) uint64 { diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index e1524f939..43991376b 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -148,11 +148,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - chunks, err := fh.dirtyPages.AddPage(req.Offset, data) - if err != nil { - glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err) - return fuse.EIO - } + fh.dirtyPages.AddPage(req.Offset, data) resp.Size = len(data) @@ -162,12 +158,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f fh.f.dirtyMetadata = true } - if len(chunks) > 0 { - - fh.f.addChunks(chunks) - - fh.f.dirtyMetadata = true - } + fh.f.dirtyMetadata = true return nil } @@ -204,20 +195,24 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { } func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { - // fflush works at fh level + // flush works at fh level // send the data to the OS glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle) - chunks, err := fh.dirtyPages.saveExistingPagesToStorage() + fh.dirtyPages.saveExistingPagesToStorage() + + var err error + go func() { + for t := range fh.dirtyPages.chunkSaveErrChan { + if t != nil { + err = t + } + } + }() + fh.dirtyPages.writeWaitGroup.Wait() + if err != nil { - glog.Errorf("flush %s: %v", fh.f.fullpath(), err) - return fuse.EIO - } - - if len(chunks) > 0 { - - fh.f.addChunks(chunks) - fh.f.dirtyMetadata = true + return err } if !fh.f.dirtyMetadata {