From 94bc9afd9d3f8e049219c1cdc9f0d6e0eb4cf456 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 4 Dec 2022 23:33:05 -0800 Subject: [PATCH] refactor: moved to locked entry --- weed/mount/filehandle.go | 40 ++++++++++---------------- weed/mount/filehandle_map.go | 2 +- weed/mount/filehandle_read.go | 2 +- weed/mount/locked_entry.go | 42 ++++++++++++++++++++++++++++ weed/mount/weedfs_attr.go | 6 +--- weed/mount/weedfs_dir_lookup.go | 8 ++---- weed/mount/weedfs_dir_read.go | 2 +- weed/mount/weedfs_file_copy_range.go | 4 --- weed/mount/weedfs_file_lseek.go | 4 +-- weed/mount/weedfs_file_sync.go | 5 +--- weed/mount/weedfs_file_write.go | 2 +- weed/mount/weedfs_rename.go | 7 +++-- weed/mount/weedfs_xattr.go | 12 ++------ 13 files changed, 74 insertions(+), 62 deletions(-) create mode 100644 weed/mount/locked_entry.go diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index c2a197da7..7281ede66 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -1,27 +1,23 @@ package mount import ( - "golang.org/x/sync/semaphore" - "math" - "sync" - - "golang.org/x/exp/slices" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" + "math" ) type FileHandleId uint64 type FileHandle struct { - fh FileHandleId - counter int64 - entry *filer_pb.Entry - entryLock sync.Mutex - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *LockedEntry + inode uint64 + wfs *WFS // cache file has been written to dirtyMetadata bool @@ -48,6 +44,9 @@ func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_p if entry != nil { entry.Attributes.FileSize = filer.FileSize(entry) } + fh.entry = &LockedEntry{ + Entry: entry, + } return fh } @@ -58,27 +57,18 @@ func (fh *FileHandle) FullPath() util.FullPath { } func (fh *FileHandle) GetEntry() *filer_pb.Entry { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - return fh.entry + return fh.entry.GetEntry() } func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - fh.entry = entry + fh.entry.SetEntry(entry) } func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - fn(fh.entry) - return fh.entry + return fh.entry.UpdateEntry(fn) } func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() if fh.entry == nil { return @@ -107,7 +97,7 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks)) - fh.entry.Chunks = append(fh.entry.GetChunks(), newChunks...) + fh.entry.AppendChunks(newChunks) fh.entryViewCache = nil } diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index 4cf674166..cc5885ffc 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -50,7 +50,7 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil } else { fh.counter++ } - if fh.entry != entry { + if fh.GetEntry() != entry { fh.SetEntry(entry) } return fh diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index 08f678e69..a316a16cd 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -26,7 +26,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { fileFullPath := fh.FullPath() - entry := fh.entry + entry := fh.GetEntry() if entry == nil { return 0, io.EOF } diff --git a/weed/mount/locked_entry.go b/weed/mount/locked_entry.go new file mode 100644 index 000000000..f3b4bf484 --- /dev/null +++ b/weed/mount/locked_entry.go @@ -0,0 +1,42 @@ +package mount + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "sync" +) + +type LockedEntry struct { + *filer_pb.Entry + sync.RWMutex +} + +func (le *LockedEntry) GetEntry() *filer_pb.Entry { + le.RLock() + defer le.RUnlock() + return le.Entry +} + +func (le *LockedEntry) SetEntry(entry *filer_pb.Entry) { + le.Lock() + defer le.Unlock() + le.Entry = entry +} + +func (le *LockedEntry) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry { + le.Lock() + defer le.Unlock() + fn(le.Entry) + return le.Entry +} + +func (le *LockedEntry) GetChunks() []*filer_pb.FileChunk { + le.RLock() + defer le.RUnlock() + return le.Entry.Chunks +} + +func (le *LockedEntry) AppendChunks(newChunks []*filer_pb.FileChunk) { + le.Lock() + defer le.Unlock() + le.Entry.Chunks = append(le.Entry.Chunks, newChunks...) +} diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 7691d4e59..3550066e0 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -25,7 +25,7 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse } else { if fh, found := wfs.fhmap.FindFileHandle(inode); found { out.AttrValid = 1 - wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry) + wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry()) out.Nlink = 0 return fuse.OK } @@ -44,10 +44,6 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if status != fuse.OK { return status } - if fh != nil { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - } if size, ok := input.GetSize(); ok && entry != nil { glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks())) diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 49e4b1b56..015f04e99 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -58,12 +58,10 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true) if fh, found := wfs.fhmap.FindFileHandle(inode); found { - fh.entryLock.Lock() - if fh.entry != nil { - glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry)) - localEntry = filer.FromPbEntry(string(dirPath), fh.entry) + if entry := fh.GetEntry(); entry != nil { + glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry)) + localEntry = filer.FromPbEntry(string(dirPath), entry) } - fh.entryLock.Unlock() } wfs.outputFilerEntry(out, inode, localEntry) diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index ea64feeb7..f140fd86f 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -173,7 +173,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } if fh, found := wfs.fhmap.FindFileHandle(inode); found { glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name)) - entry = filer.FromPbEntry(string(dirPath), fh.entry) + entry = filer.FromPbEntry(string(dirPath), fh.GetEntry()) } wfs.outputFilerEntry(entryOut, inode, entry) } diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index bc092a252..4b0d22137 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -46,8 +46,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) // lock source and target file handles fhOut.orderedMutex.Acquire(context.Background(), 1) defer fhOut.orderedMutex.Release(1) - fhOut.entryLock.Lock() - defer fhOut.entryLock.Unlock() if fhOut.entry == nil { return 0, fuse.ENOENT @@ -56,8 +54,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) if fhIn.fh != fhOut.fh { fhIn.orderedMutex.Acquire(context.Background(), 1) defer fhIn.orderedMutex.Release(1) - fhIn.entryLock.Lock() - defer fhIn.entryLock.Unlock() } // directories are not supported diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 43970983b..69edf7144 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -38,10 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO // lock the file until the proper offset was calculated fh.orderedMutex.Acquire(context.Background(), 1) defer fh.orderedMutex.Release(1) - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - fileSize := int64(filer.FileSize(fh.entry)) + fileSize := int64(filer.FileSize(fh.GetEntry())) offset := max(int64(in.Offset), 0) glog.V(4).Infof( diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 585ca0b47..244963ad3 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -118,10 +118,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - - entry := fh.entry + entry := fh.GetEntry() if entry == nil { return nil } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 255d4adc9..7b13d54ff 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -49,7 +49,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr fh.orderedMutex.Acquire(context.Background(), 1) defer fh.orderedMutex.Release(1) - entry := fh.entry + entry := fh.GetEntry() if entry == nil { return 0, fuse.OK } diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go index 42dcf348d..58c60aa28 100644 --- a/weed/mount/weedfs_rename.go +++ b/weed/mount/weedfs_rename.go @@ -235,8 +235,11 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath) if sourceInode != 0 { - if fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode); foundFh && fh.entry != nil { - fh.entry.Name = newName + fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode) + if foundFh { + if entry := fh.GetEntry(); entry != nil { + entry.Name = newName + } } // invalidate attr and data // wfs.fuseServer.InodeNotify(sourceInode, 0, -1) diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go index b03fa01f1..7a5109ee8 100644 --- a/weed/mount/weedfs_xattr.go +++ b/weed/mount/weedfs_xattr.go @@ -103,17 +103,13 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st } } - path, fh, entry, status := wfs.maybeReadEntry(input.NodeId) + path, _, entry, status := wfs.maybeReadEntry(input.NodeId) if status != fuse.OK { return status } if entry == nil { return fuse.ENOENT } - if fh != nil { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - } if entry.Extended == nil { entry.Extended = make(map[string][]byte) @@ -181,17 +177,13 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr if len(attr) == 0 { return fuse.EINVAL } - path, fh, entry, status := wfs.maybeReadEntry(header.NodeId) + path, _, entry, status := wfs.maybeReadEntry(header.NodeId) if status != fuse.OK { return status } if entry == nil { return fuse.OK } - if fh != nil { - fh.entryLock.Lock() - defer fh.entryLock.Unlock() - } if entry.Extended == nil { return fuse.ENOATTR