From 22064c342585bddaa7ebdb21e39cac7db87826df Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 11 Sep 2022 19:44:34 -0700 Subject: [PATCH] mount: ensure ordered file handle lock and unlock --- weed/mount/filehandle.go | 13 ++++++++----- weed/mount/weedfs_file_copy_range.go | 9 +++++---- weed/mount/weedfs_file_lseek.go | 5 +++-- weed/mount/weedfs_file_read.go | 5 +++-- weed/mount/weedfs_file_sync.go | 8 ++++---- weed/mount/weedfs_file_write.go | 5 +++-- 6 files changed, 26 insertions(+), 19 deletions(-) diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 5d1552ce6..aadcb3836 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -1,6 +1,8 @@ package mount import ( + "golang.org/x/sync/semaphore" + "math" "sync" "golang.org/x/exp/slices" @@ -28,17 +30,18 @@ type FileHandle struct { reader *filer.ChunkReadAt contentType string handle uint64 - sync.Mutex + orderedMutex *semaphore.Weighted isDeleted bool } func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle { fh := &FileHandle{ - fh: handleId, - counter: 1, - inode: inode, - wfs: wfs, + fh: handleId, + counter: 1, + inode: inode, + wfs: wfs, + orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)), } // dirtyPages: newContinuousDirtyPages(file, writeOnly), fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit) diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index 412869abc..bc092a252 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -1,6 +1,7 @@ package mount import ( + "context" "net/http" "github.com/hanwen/go-fuse/v2/fuse" @@ -43,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) } // lock source and target file handles - fhOut.Lock() - defer fhOut.Unlock() + fhOut.orderedMutex.Acquire(context.Background(), 1) + defer fhOut.orderedMutex.Release(1) fhOut.entryLock.Lock() defer fhOut.entryLock.Unlock() @@ -53,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) } if fhIn.fh != fhOut.fh { - fhIn.Lock() - defer fhIn.Unlock() + fhIn.orderedMutex.Acquire(context.Background(), 1) + defer fhIn.orderedMutex.Release(1) fhIn.entryLock.Lock() defer fhIn.entryLock.Unlock() } diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index ed495f5b5..0564ac0ee 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -1,6 +1,7 @@ package mount import ( + "context" "syscall" "github.com/hanwen/go-fuse/v2/fuse" @@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO } // lock the file until the proper offset was calculated - fh.Lock() - defer fh.Unlock() + fh.orderedMutex.Acquire(context.Background(), 1) + defer fh.orderedMutex.Release(1) fh.entryLock.Lock() defer fh.entryLock.Unlock() diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index 307ad5960..8375f9a5d 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -1,6 +1,7 @@ package mount import ( + "context" "io" "github.com/hanwen/go-fuse/v2/fuse" @@ -39,8 +40,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse return nil, fuse.ENOENT } - fh.Lock() - defer fh.Unlock() + fh.orderedMutex.Acquire(context.Background(), 1) + defer fh.orderedMutex.Release(1) offset := int64(in.Offset) totalRead, err := readDataByFileHandle(buff, fh, offset) diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 4e1a9f3f0..7b80ddc73 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -55,8 +55,8 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status { return fuse.ENOENT } - fh.Lock() - defer fh.Unlock() + fh.orderedMutex.Acquire(context.Background(), 1) + defer fh.orderedMutex.Release(1) return wfs.doFlush(fh, in.Uid, in.Gid) } @@ -87,8 +87,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu return fuse.ENOENT } - fh.Lock() - defer fh.Unlock() + fh.orderedMutex.Acquire(context.Background(), 1) + defer fh.orderedMutex.Release(1) return wfs.doFlush(fh, in.Uid, in.Gid) diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 2b7a6cea2..255d4adc9 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -1,6 +1,7 @@ package mount import ( + "context" "github.com/hanwen/go-fuse/v2/fuse" "net/http" "syscall" @@ -45,8 +46,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) - fh.Lock() - defer fh.Unlock() + fh.orderedMutex.Acquire(context.Background(), 1) + defer fh.orderedMutex.Release(1) entry := fh.entry if entry == nil {