diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 072f562fc..25ef830ae 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -67,6 +67,7 @@ type WFS struct { concurrentWriters *util.LimitedConcurrentExecutor inodeToPath *InodeToPath fhmap *FileHandleToInode + dhmap *DirectoryHandleToInode } func NewSeaweedFileSystem(option *Option) *WFS { @@ -76,6 +77,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { signature: util.RandomInt32(), inodeToPath: NewInodeToPath(), fhmap: NewFileHandleToInode(), + dhmap: NewDirectoryHandleToInode(), } wfs.root = Directory{ diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index b14bacc1f..99882fcf8 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -9,8 +9,66 @@ import ( "github.com/hanwen/go-fuse/v2/fuse" "math" "os" + "sync" ) +type DirectoryHandleId uint64 + +type DirectoryHandle struct { + isFinished bool + counter uint32 + lastEntryName string +} + +type DirectoryHandleToInode struct { + // shares the file handle id sequencer with FileHandleToInode{nextFh} + sync.Mutex + dir2inode map[DirectoryHandleId]*DirectoryHandle +} + +func NewDirectoryHandleToInode() *DirectoryHandleToInode { + return &DirectoryHandleToInode{ + dir2inode: make(map[DirectoryHandleId]*DirectoryHandle), + } +} + +func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) { + wfs.fhmap.Lock() + fh := wfs.fhmap.nextFh + wfs.fhmap.nextFh++ + wfs.fhmap.Unlock() + + wfs.dhmap.Lock() + defer wfs.dhmap.Unlock() + dh := &DirectoryHandle{ + isFinished: false, + lastEntryName: "", + } + wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh + return DirectoryHandleId(fh), dh +} + +func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle { + wfs.dhmap.Lock() + defer wfs.dhmap.Unlock() + if dh, found := wfs.dhmap.dir2inode[dhid]; found { + return dh + } + dh := &DirectoryHandle{ + isFinished: false, + lastEntryName: "", + } + + wfs.dhmap.dir2inode[dhid] = dh + return dh +} + +func (wfs *WFS) ReleaseDirectoryHandle(dhid DirectoryHandleId) { + wfs.dhmap.Lock() + defer wfs.dhmap.Unlock() + delete(wfs.dhmap.dir2inode, dhid) +} + // Directory handling /** Open directory @@ -25,6 +83,8 @@ func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.Op if !wfs.inodeToPath.HasInode(input.NodeId) { return fuse.ENOENT } + dhid, _ := wfs.AcquireDirectoryHandle() + out.Fh = uint64(dhid) return fuse.OK } @@ -34,6 +94,7 @@ func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.Op * path parameter will be NULL. */ func (wfs *WFS) ReleaseDir(input *fuse.ReleaseIn) { + wfs.ReleaseDirectoryHandle(DirectoryHandleId(input.Fh)) } /** Synchronize directory contents @@ -72,18 +133,23 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus } func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status { + + dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh)) + if dh.isFinished { + return fuse.OK + } + dirPath := wfs.inodeToPath.GetPath(input.NodeId) - var counter uint64 var dirEntry fuse.DirEntry if input.Offset == 0 && !isPlusMode { - counter++ + dh.counter++ dirEntry.Ino = input.NodeId dirEntry.Name = "." dirEntry.Mode = toSystemMode(os.ModeDir) out.AddDirEntry(dirEntry) - counter++ + dh.counter++ parentDir, _ := dirPath.DirAndName() parentInode := wfs.inodeToPath.GetInode(util.FullPath(parentDir)) dirEntry.Ino = parentInode @@ -94,10 +160,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } processEachEntryFn := func(entry *filer.Entry, isLast bool) bool { - counter++ - if counter <= input.Offset { - return true - } + dh.counter++ dirEntry.Name = entry.Name() inode := wfs.inodeToPath.GetInode(dirPath.Child(dirEntry.Name)) dirEntry.Ino = inode @@ -113,6 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } wfs.outputFilerEntry(entryOut, inode, entry) } + dh.lastEntryName = entry.Name() return true } @@ -120,13 +184,16 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return fuse.EIO } - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { return processEachEntryFn(entry, false) }) if listErr != nil { glog.Errorf("list meta cache: %v", listErr) return fuse.EIO } + if dh.counter < input.Length { + dh.isFinished = true + } return fuse.OK }