diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index eb2308758..89db04eb0 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -3,6 +3,7 @@ package filer import ( "fmt" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" @@ -20,17 +21,17 @@ type ReaderCache struct { type SingleChunkCacher struct { sync.Mutex - parent *ReaderCache - chunkFileId string - data []byte - err error - cipherKey []byte - isGzipped bool - chunkSize int - shouldCache bool - wg sync.WaitGroup - cacheStartedCh chan struct{} - completedTime time.Time + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + cacheStartedCh chan struct{} + completedTimeNew int64 } func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { @@ -50,13 +51,17 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { rc.Lock() defer rc.Unlock() + if len(rc.downloaders) >= rc.limit { + return + } + for _, chunkView := range chunkViews { if _, found := rc.downloaders[chunkView.FileId]; found { continue } if len(rc.downloaders) >= rc.limit { - // if still no slots, return + // abort when slots are filled return } @@ -74,27 +79,28 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { rc.Lock() - defer rc.Unlock() + if cacher, found := rc.downloaders[fileId]; found { if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil { + rc.Unlock() return n, err } } if shouldCache || rc.lookupFileIdFn == nil { n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) if n > 0 { + rc.Unlock() return n, err } } // clean up old downloaders if len(rc.downloaders) >= rc.limit { - oldestFid, oldestTime := "", time.Now() + oldestFid, oldestTime := "", time.Now().Unix() for fid, downloader := range rc.downloaders { - if !downloader.completedTime.IsZero() { - if downloader.completedTime.Before(oldestTime) { - oldestFid, oldestTime = fid, downloader.completedTime - } + completedTime := atomic.LoadInt64(&downloader.completedTimeNew) + if completedTime > 0 && completedTime < oldestTime { + oldestFid, oldestTime = fid, completedTime } } if oldestFid != "" { @@ -110,6 +116,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt go cacher.startCaching() <-cacher.cacheStartedCh rc.downloaders[fileId] = cacher + rc.Unlock() return cacher.readChunkAt(buffer, offset) } @@ -172,7 +179,7 @@ func (s *SingleChunkCacher) startCaching() { if s.shouldCache { s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) } - s.completedTime = time.Now() + atomic.StoreInt64(&s.completedTimeNew, time.Now().Unix()) return } diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go index e32f7fc2d..b0906e99f 100644 --- a/weed/filer/reader_pattern.go +++ b/weed/filer/reader_pattern.go @@ -1,5 +1,9 @@ package filer +import ( + "sync/atomic" +) + type ReaderPattern struct { isSequentialCounter int64 lastReadStopOffset int64 @@ -18,18 +22,20 @@ func NewReaderPattern() *ReaderPattern { } func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { - if rp.lastReadStopOffset == offset { - if rp.isSequentialCounter < ModeChangeLimit { - rp.isSequentialCounter++ + lastOffset := atomic.SwapInt64(&rp.lastReadStopOffset, offset+int64(size)) + counter := atomic.LoadInt64(&rp.isSequentialCounter) + + if lastOffset == offset { + if counter < ModeChangeLimit { + atomic.AddInt64(&rp.isSequentialCounter, 1) } } else { - if rp.isSequentialCounter > -ModeChangeLimit { - rp.isSequentialCounter-- + if counter > -ModeChangeLimit { + atomic.AddInt64(&rp.isSequentialCounter, -1) } } - rp.lastReadStopOffset = offset + int64(size) } func (rp *ReaderPattern) IsRandomMode() bool { - return rp.isSequentialCounter < 0 + return atomic.LoadInt64(&rp.isSequentialCounter) < 0 } diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 8175c61f4..4595764ee 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -1,12 +1,14 @@ package mount import ( + "sync" + + "golang.org/x/exp/slices" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "golang.org/x/exp/slices" - "sync" ) type FileHandleId uint64 @@ -57,12 +59,20 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry { defer fh.entryLock.Unlock() return fh.entry } + func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { fh.entryLock.Lock() defer fh.entryLock.Unlock() fh.entry = entry } +func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + fn(fh.entry) + return fh.entry +} + func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { fh.entryLock.Lock() defer fh.entryLock.Unlock() diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index a5e1016ca..4cf674166 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -1,8 +1,9 @@ package mount import ( - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) type FileHandleToInode struct { @@ -49,7 +50,9 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil } else { fh.counter++ } - fh.entry = entry + if fh.entry != entry { + fh.SetEntry(entry) + } return fh } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 9f1d85ab5..7cff71c52 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -135,10 +135,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle } var found bool if fh, found = wfs.fhmap.FindFileHandle(inode); found { - entry = fh.GetEntry() - if entry != nil && fh.entry.Attributes == nil { - entry.Attributes = &filer_pb.FuseAttributes{} - } + entry = fh.UpdateEntry(func(entry *filer_pb.Entry) { + if entry != nil && fh.entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} + } + }) } else { entry, status = wfs.maybeLoadEntry(path) } diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 7a9b7fecc..49e4b1b56 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -2,7 +2,9 @@ package mount import ( "context" + "github.com/hanwen/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" @@ -55,9 +57,13 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true) - if fh, found := wfs.fhmap.FindFileHandle(inode); found && fh.entry != nil { - glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry)) - localEntry = filer.FromPbEntry(string(dirPath), fh.entry) + if fh, found := wfs.fhmap.FindFileHandle(inode); found { + fh.entryLock.Lock() + if fh.entry != nil { + glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry)) + localEntry = filer.FromPbEntry(string(dirPath), fh.entry) + } + fh.entryLock.Unlock() } wfs.outputFilerEntry(out, inode, localEntry)