mount: report error when Flush()

The error in Release() is not working.

See https://github.com/jaderhs/libfuse/blob/master/FAQ

related to https://github.com/chrislusf/seaweedfs/issues/1765
This commit is contained in:
Chris Lu 2021-01-28 04:46:37 -08:00
parent 9a06c35da4
commit cf252fc0cd
2 changed files with 16 additions and 32 deletions

View file

@ -3,7 +3,6 @@ package filesys
import ( import (
"bytes" "bytes"
"io" "io"
"runtime"
"sync" "sync"
"time" "time"
@ -16,8 +15,6 @@ type ContinuousDirtyPages struct {
f *File f *File
writeWaitGroup sync.WaitGroup writeWaitGroup sync.WaitGroup
chunkAddLock sync.Mutex chunkAddLock sync.Mutex
chunkSaveErrChan chan error
chunkSaveErrChanClosed bool
lastErr error lastErr error
collection string collection string
replication string replication string
@ -27,15 +24,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{ dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{}, intervals: &ContinuousIntervals{},
f: file, f: file,
chunkSaveErrChan: make(chan error, runtime.NumCPU()),
} }
go func() {
for t := range dirtyPages.chunkSaveErrChan {
if t != nil {
dirtyPages.lastErr = t
}
}
}()
return dirtyPages return dirtyPages
} }
@ -94,15 +83,6 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
errChanSize := pages.f.wfs.option.ConcurrentWriters
if errChanSize == 0 {
errChanSize = runtime.NumCPU()
}
if pages.chunkSaveErrChanClosed {
pages.chunkSaveErrChan = make(chan error, errChanSize)
pages.chunkSaveErrChanClosed = false
}
mtime := time.Now().UnixNano() mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1) pages.writeWaitGroup.Add(1)
writer := func() { writer := func() {
@ -112,7 +92,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil { if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.chunkSaveErrChan <- err pages.lastErr = err
return return
} }
chunk.Mtime = mtime chunk.Mtime = mtime

View file

@ -184,25 +184,20 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.Lock() fh.Lock()
defer fh.Unlock() defer fh.Unlock()
fh.f.isOpen-- if fh.f.isOpen <= 0 {
if fh.f.isOpen < 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0 fh.f.isOpen = 0
return nil return nil
} }
if fh.f.isOpen == 0 { if fh.f.isOpen == 1 {
if err := fh.doFlush(ctx, req.Header); err != nil { if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err) glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
return err
} }
// stop the goroutine fh.f.isOpen--
if !fh.dirtyPages.chunkSaveErrChanClosed {
fh.dirtyPages.chunkSaveErrChanClosed = true
close(fh.dirtyPages.chunkSaveErrChan)
}
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
if closer, ok := fh.f.reader.(io.Closer); ok { if closer, ok := fh.f.reader.(io.Closer); ok {
@ -216,10 +211,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
fh.Lock() fh.Lock()
defer fh.Unlock() defer fh.Unlock()
return fh.doFlush(ctx, req.Header) if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
return err
}
glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
return nil
} }
func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
@ -232,7 +235,8 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
fh.dirtyPages.writeWaitGroup.Wait() fh.dirtyPages.writeWaitGroup.Wait()
if fh.dirtyPages.lastErr != nil { if fh.dirtyPages.lastErr != nil {
return fh.dirtyPages.lastErr glog.Errorf("%v doFlush last err: %v", fh.f.fullpath(), fh.dirtyPages.lastErr)
return fuse.EIO
} }
if !fh.f.dirtyMetadata { if !fh.f.dirtyMetadata {