mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Revert "refactor: moved to locked entry" (#4035)
* Revert "refactor: moved to locked entry"
This reverts commit 94bc9afd9d
.
* only add LockedEntry, no changes to entryLock
* fix compilation
This commit is contained in:
parent
38479b6329
commit
dac9c28d05
|
@ -8,6 +8,7 @@ import (
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
"math"
|
"math"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileHandleId uint64
|
type FileHandleId uint64
|
||||||
|
@ -16,6 +17,7 @@ type FileHandle struct {
|
||||||
fh FileHandleId
|
fh FileHandleId
|
||||||
counter int64
|
counter int64
|
||||||
entry *LockedEntry
|
entry *LockedEntry
|
||||||
|
entryLock sync.Mutex
|
||||||
inode uint64
|
inode uint64
|
||||||
wfs *WFS
|
wfs *WFS
|
||||||
|
|
||||||
|
@ -69,6 +71,8 @@ func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
|
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
|
||||||
if fh.entry == nil {
|
if fh.entry == nil {
|
||||||
return
|
return
|
||||||
|
|
|
@ -44,6 +44,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
|
||||||
if status != fuse.OK {
|
if status != fuse.OK {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
if fh != nil {
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
if size, ok := input.GetSize(); ok && entry != nil {
|
if size, ok := input.GetSize(); ok && entry != nil {
|
||||||
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks()))
|
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks()))
|
||||||
|
|
|
@ -58,10 +58,12 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
|
||||||
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
|
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
|
||||||
|
|
||||||
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
|
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
|
||||||
|
fh.entryLock.Lock()
|
||||||
if entry := fh.GetEntry(); entry != nil {
|
if entry := fh.GetEntry(); entry != nil {
|
||||||
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
|
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
|
||||||
localEntry = filer.FromPbEntry(string(dirPath), entry)
|
localEntry = filer.FromPbEntry(string(dirPath), entry)
|
||||||
}
|
}
|
||||||
|
fh.entryLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.outputFilerEntry(out, inode, localEntry)
|
wfs.outputFilerEntry(out, inode, localEntry)
|
||||||
|
|
|
@ -46,6 +46,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
|
||||||
// lock source and target file handles
|
// lock source and target file handles
|
||||||
fhOut.orderedMutex.Acquire(context.Background(), 1)
|
fhOut.orderedMutex.Acquire(context.Background(), 1)
|
||||||
defer fhOut.orderedMutex.Release(1)
|
defer fhOut.orderedMutex.Release(1)
|
||||||
|
fhOut.entryLock.Lock()
|
||||||
|
defer fhOut.entryLock.Unlock()
|
||||||
|
|
||||||
if fhOut.entry == nil {
|
if fhOut.entry == nil {
|
||||||
return 0, fuse.ENOENT
|
return 0, fuse.ENOENT
|
||||||
|
@ -54,6 +56,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
|
||||||
if fhIn.fh != fhOut.fh {
|
if fhIn.fh != fhOut.fh {
|
||||||
fhIn.orderedMutex.Acquire(context.Background(), 1)
|
fhIn.orderedMutex.Acquire(context.Background(), 1)
|
||||||
defer fhIn.orderedMutex.Release(1)
|
defer fhIn.orderedMutex.Release(1)
|
||||||
|
fhIn.entryLock.Lock()
|
||||||
|
defer fhIn.entryLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// directories are not supported
|
// directories are not supported
|
||||||
|
|
|
@ -38,6 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
|
||||||
// lock the file until the proper offset was calculated
|
// lock the file until the proper offset was calculated
|
||||||
fh.orderedMutex.Acquire(context.Background(), 1)
|
fh.orderedMutex.Acquire(context.Background(), 1)
|
||||||
defer fh.orderedMutex.Release(1)
|
defer fh.orderedMutex.Release(1)
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
|
||||||
fileSize := int64(filer.FileSize(fh.GetEntry()))
|
fileSize := int64(filer.FileSize(fh.GetEntry()))
|
||||||
offset := max(int64(in.Offset), 0)
|
offset := max(int64(in.Offset), 0)
|
||||||
|
|
|
@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
|
||||||
|
|
||||||
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
|
||||||
entry := fh.GetEntry()
|
entry := fh.GetEntry()
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -103,13 +103,17 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
|
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
|
||||||
if status != fuse.OK {
|
if status != fuse.OK {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
return fuse.ENOENT
|
return fuse.ENOENT
|
||||||
}
|
}
|
||||||
|
if fh != nil {
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
if entry.Extended == nil {
|
if entry.Extended == nil {
|
||||||
entry.Extended = make(map[string][]byte)
|
entry.Extended = make(map[string][]byte)
|
||||||
|
@ -177,13 +181,17 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
|
||||||
if len(attr) == 0 {
|
if len(attr) == 0 {
|
||||||
return fuse.EINVAL
|
return fuse.EINVAL
|
||||||
}
|
}
|
||||||
path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
|
path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
|
||||||
if status != fuse.OK {
|
if status != fuse.OK {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
return fuse.OK
|
return fuse.OK
|
||||||
}
|
}
|
||||||
|
if fh != nil {
|
||||||
|
fh.entryLock.Lock()
|
||||||
|
defer fh.entryLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
if entry.Extended == nil {
|
if entry.Extended == nil {
|
||||||
return fuse.ENOATTR
|
return fuse.ENOATTR
|
||||||
|
|
Loading…
Reference in a new issue