mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
purge old cache implementation
This commit is contained in:
parent
b9365de47b
commit
628b27ef3b
|
@ -20,7 +20,6 @@ type MountOptions struct {
|
||||||
umaskString *string
|
umaskString *string
|
||||||
nonempty *bool
|
nonempty *bool
|
||||||
outsideContainerClusterMode *bool
|
outsideContainerClusterMode *bool
|
||||||
asyncMetaDataCaching *bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -48,7 +47,6 @@ func init() {
|
||||||
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
|
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
|
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
|
||||||
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
|
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
|
||||||
mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdMount = &Command{
|
var cmdMount = &Command{
|
||||||
|
|
|
@ -167,7 +167,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
|
||||||
MountMtime: time.Now(),
|
MountMtime: time.Now(),
|
||||||
Umask: umask,
|
Umask: umask,
|
||||||
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
|
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
|
||||||
AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching,
|
|
||||||
Cipher: cipher,
|
Cipher: cipher,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -89,22 +89,18 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
||||||
return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
|
return &File{
|
||||||
return &File{
|
Name: name,
|
||||||
Name: name,
|
dir: dir,
|
||||||
dir: dir,
|
wfs: dir.wfs,
|
||||||
wfs: dir.wfs,
|
entry: entry,
|
||||||
entry: entry,
|
entryViewCache: nil,
|
||||||
entryViewCache: nil,
|
}
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
|
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
|
||||||
|
|
||||||
return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
|
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
|
||||||
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,9 +135,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -190,9 +184,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -213,15 +205,12 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
|
||||||
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
|
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
|
||||||
|
|
||||||
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
|
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
|
||||||
entry := dir.wfs.cacheGet(fullFilePath)
|
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
|
||||||
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
|
if cacheErr == filer_pb.ErrNotFound {
|
||||||
if cacheErr == filer_pb.ErrNotFound {
|
return nil, fuse.ENOENT
|
||||||
return nil, fuse.ENOENT
|
|
||||||
}
|
|
||||||
entry = cachedEntry.ToProtoEntry()
|
|
||||||
}
|
}
|
||||||
|
entry := cachedEntry.ToProtoEntry()
|
||||||
|
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
|
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
|
||||||
|
@ -230,7 +219,6 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
|
||||||
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
|
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
|
||||||
return nil, fuse.ENOENT
|
return nil, fuse.ENOENT
|
||||||
}
|
}
|
||||||
dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute)
|
|
||||||
} else {
|
} else {
|
||||||
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
|
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
|
||||||
}
|
}
|
||||||
|
@ -262,7 +250,6 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
||||||
|
|
||||||
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
|
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
|
||||||
|
|
||||||
cacheTtl := 5 * time.Minute
|
|
||||||
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
|
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
|
||||||
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
|
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
|
||||||
inode := fullpath.AsInode()
|
inode := fullpath.AsInode()
|
||||||
|
@ -273,29 +260,19 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
||||||
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
|
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
|
||||||
ret = append(ret, dirent)
|
ret = append(ret, dirent)
|
||||||
}
|
}
|
||||||
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
|
||||||
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
|
if listErr != nil {
|
||||||
if listErr != nil {
|
glog.Errorf("list meta cache: %v", listErr)
|
||||||
glog.Errorf("list meta cache: %v", listErr)
|
return nil, fuse.EIO
|
||||||
return nil, fuse.EIO
|
|
||||||
}
|
|
||||||
for _, cachedEntry := range listedEntries {
|
|
||||||
processEachEntryFn(cachedEntry.ToProtoEntry(), false)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
for _, cachedEntry := range listedEntries {
|
||||||
readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
|
processEachEntryFn(cachedEntry.ToProtoEntry(), false)
|
||||||
if readErr != nil {
|
|
||||||
glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
|
|
||||||
return ret, fuse.EIO
|
|
||||||
}
|
}
|
||||||
|
return
|
||||||
|
|
||||||
return ret, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
|
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
|
||||||
|
@ -321,12 +298,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
|
|
||||||
dir.wfs.deleteFileChunks(entry.Chunks)
|
dir.wfs.deleteFileChunks(entry.Chunks)
|
||||||
|
|
||||||
dir.wfs.cacheDelete(filePath)
|
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
|
||||||
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
|
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
|
||||||
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(3).Infof("remove file: %v", req)
|
glog.V(3).Infof("remove file: %v", req)
|
||||||
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
|
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
|
||||||
|
@ -342,12 +314,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
|
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
|
||||||
|
|
||||||
t := util.NewFullPath(dir.FullPath(), req.Name)
|
t := util.NewFullPath(dir.FullPath(), req.Name)
|
||||||
dir.wfs.cacheDelete(t)
|
|
||||||
dir.wfs.fsNodeCache.DeleteFsNode(t)
|
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
|
||||||
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(3).Infof("remove directory entry: %v", req)
|
glog.V(3).Infof("remove directory entry: %v", req)
|
||||||
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
|
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
|
||||||
|
@ -384,8 +352,6 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
|
||||||
dir.entry.Attributes.Mtime = req.Mtime.Unix()
|
dir.entry.Attributes.Mtime = req.Mtime.Unix()
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
|
|
||||||
|
|
||||||
return dir.saveEntry()
|
return dir.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -402,8 +368,6 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
|
|
||||||
|
|
||||||
return dir.saveEntry()
|
return dir.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -420,8 +384,6 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
|
|
||||||
|
|
||||||
return dir.saveEntry()
|
return dir.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -444,8 +406,6 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
|
||||||
|
|
||||||
func (dir *Dir) Forget() {
|
func (dir *Dir) Forget() {
|
||||||
glog.V(3).Infof("Forget dir %s", dir.FullPath())
|
glog.V(3).Infof("Forget dir %s", dir.FullPath())
|
||||||
|
|
||||||
dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) maybeLoadEntry() error {
|
func (dir *Dir) maybeLoadEntry() error {
|
||||||
|
@ -478,9 +438,7 @@ func (dir *Dir) saveEntry() error {
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -42,9 +42,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -38,14 +38,5 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
dir.wfs.cacheDelete(newPath)
|
|
||||||
dir.wfs.cacheDelete(oldPath)
|
|
||||||
|
|
||||||
// fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
|
|
||||||
dir.wfs.fsNodeCache.Move(oldPath, newPath)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,8 +150,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
file.wfs.cacheDelete(file.fullpath())
|
|
||||||
|
|
||||||
return file.saveEntry()
|
return file.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -168,8 +166,6 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
file.wfs.cacheDelete(file.fullpath())
|
|
||||||
|
|
||||||
return file.saveEntry()
|
return file.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -186,8 +182,6 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
file.wfs.cacheDelete(file.fullpath())
|
|
||||||
|
|
||||||
return file.saveEntry()
|
return file.saveEntry()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -219,7 +213,6 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||||
func (file *File) Forget() {
|
func (file *File) Forget() {
|
||||||
t := util.NewFullPath(file.dir.FullPath(), file.Name)
|
t := util.NewFullPath(file.dir.FullPath(), file.Name)
|
||||||
glog.V(3).Infof("Forget file %s", t)
|
glog.V(3).Infof("Forget file %s", t)
|
||||||
file.wfs.fsNodeCache.DeleteFsNode(t)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
||||||
|
@ -278,9 +271,7 @@ func (file *File) saveEntry() error {
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
if file.wfs.option.AsyncMetaDataCaching {
|
file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -214,9 +214,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if fh.f.wfs.option.AsyncMetaDataCaching {
|
fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
||||||
fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
|
|
||||||
}
|
|
||||||
|
|
||||||
fh.f.wfs.deleteFileChunks(garbages)
|
fh.f.wfs.deleteFileChunks(garbages)
|
||||||
for i, chunk := range garbages {
|
for i, chunk := range garbages {
|
||||||
|
|
|
@ -1,207 +0,0 @@
|
||||||
package filesys
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
|
||||||
"github.com/seaweedfs/fuse/fs"
|
|
||||||
)
|
|
||||||
|
|
||||||
type FsCache struct {
|
|
||||||
root *FsNode
|
|
||||||
sync.RWMutex
|
|
||||||
}
|
|
||||||
type FsNode struct {
|
|
||||||
parent *FsNode
|
|
||||||
node fs.Node
|
|
||||||
name string
|
|
||||||
childrenLock sync.RWMutex
|
|
||||||
children map[string]*FsNode
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFsCache(root fs.Node) *FsCache {
|
|
||||||
return &FsCache{
|
|
||||||
root: &FsNode{
|
|
||||||
node: root,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
|
|
||||||
|
|
||||||
c.RLock()
|
|
||||||
defer c.RUnlock()
|
|
||||||
|
|
||||||
return c.doGetFsNode(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
|
|
||||||
t := c.root
|
|
||||||
for _, p := range path.Split() {
|
|
||||||
t = t.findChild(p)
|
|
||||||
if t == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return t.node
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
|
|
||||||
|
|
||||||
c.Lock()
|
|
||||||
defer c.Unlock()
|
|
||||||
|
|
||||||
c.doSetFsNode(path, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
|
|
||||||
t := c.root
|
|
||||||
for _, p := range path.Split() {
|
|
||||||
t = t.ensureChild(p)
|
|
||||||
}
|
|
||||||
t.node = node
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
|
|
||||||
|
|
||||||
c.Lock()
|
|
||||||
defer c.Unlock()
|
|
||||||
|
|
||||||
t := c.doGetFsNode(path)
|
|
||||||
if t != nil {
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
t = genNodeFn()
|
|
||||||
c.doSetFsNode(path, t)
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FsCache) DeleteFsNode(path util.FullPath) {
|
|
||||||
|
|
||||||
c.Lock()
|
|
||||||
defer c.Unlock()
|
|
||||||
|
|
||||||
t := c.root
|
|
||||||
for _, p := range path.Split() {
|
|
||||||
t = t.findChild(p)
|
|
||||||
if t == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if t.parent != nil {
|
|
||||||
t.parent.disconnectChild(t)
|
|
||||||
}
|
|
||||||
t.deleteSelf()
|
|
||||||
}
|
|
||||||
|
|
||||||
// oldPath and newPath are full path including the new name
|
|
||||||
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
|
|
||||||
|
|
||||||
c.Lock()
|
|
||||||
defer c.Unlock()
|
|
||||||
|
|
||||||
// find old node
|
|
||||||
src := c.root
|
|
||||||
for _, p := range oldPath.Split() {
|
|
||||||
src = src.findChild(p)
|
|
||||||
if src == nil {
|
|
||||||
return src
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if src.parent != nil {
|
|
||||||
src.parent.disconnectChild(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// find new node
|
|
||||||
target := c.root
|
|
||||||
for _, p := range newPath.Split() {
|
|
||||||
target = target.ensureChild(p)
|
|
||||||
}
|
|
||||||
parent := target.parent
|
|
||||||
src.name = target.name
|
|
||||||
if dir, ok := src.node.(*Dir); ok {
|
|
||||||
dir.name = target.name // target is not Dir, but a shortcut
|
|
||||||
}
|
|
||||||
if f, ok := src.node.(*File); ok {
|
|
||||||
f.Name = target.name
|
|
||||||
if f.entry != nil {
|
|
||||||
f.entry.Name = f.Name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
parent.disconnectChild(target)
|
|
||||||
|
|
||||||
target.deleteSelf()
|
|
||||||
|
|
||||||
src.connectToParent(parent)
|
|
||||||
|
|
||||||
return src
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *FsNode) connectToParent(parent *FsNode) {
|
|
||||||
n.parent = parent
|
|
||||||
oldNode := parent.findChild(n.name)
|
|
||||||
if oldNode != nil {
|
|
||||||
oldNode.deleteSelf()
|
|
||||||
}
|
|
||||||
if dir, ok := n.node.(*Dir); ok {
|
|
||||||
dir.parent = parent.node.(*Dir)
|
|
||||||
}
|
|
||||||
if f, ok := n.node.(*File); ok {
|
|
||||||
f.dir = parent.node.(*Dir)
|
|
||||||
}
|
|
||||||
n.childrenLock.Lock()
|
|
||||||
parent.children[n.name] = n
|
|
||||||
n.childrenLock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *FsNode) findChild(name string) *FsNode {
|
|
||||||
n.childrenLock.RLock()
|
|
||||||
defer n.childrenLock.RUnlock()
|
|
||||||
|
|
||||||
child, found := n.children[name]
|
|
||||||
if found {
|
|
||||||
return child
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *FsNode) ensureChild(name string) *FsNode {
|
|
||||||
n.childrenLock.Lock()
|
|
||||||
defer n.childrenLock.Unlock()
|
|
||||||
|
|
||||||
if n.children == nil {
|
|
||||||
n.children = make(map[string]*FsNode)
|
|
||||||
}
|
|
||||||
child, found := n.children[name]
|
|
||||||
if found {
|
|
||||||
return child
|
|
||||||
}
|
|
||||||
t := &FsNode{
|
|
||||||
parent: n,
|
|
||||||
node: nil,
|
|
||||||
name: name,
|
|
||||||
children: nil,
|
|
||||||
}
|
|
||||||
n.children[name] = t
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *FsNode) disconnectChild(child *FsNode) {
|
|
||||||
n.childrenLock.Lock()
|
|
||||||
delete(n.children, child.name)
|
|
||||||
n.childrenLock.Unlock()
|
|
||||||
child.parent = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *FsNode) deleteSelf() {
|
|
||||||
n.childrenLock.Lock()
|
|
||||||
for _, child := range n.children {
|
|
||||||
child.deleteSelf()
|
|
||||||
}
|
|
||||||
n.children = nil
|
|
||||||
n.childrenLock.Unlock()
|
|
||||||
|
|
||||||
n.node = nil
|
|
||||||
n.parent = nil
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,96 +0,0 @@
|
||||||
package filesys
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPathSplit(t *testing.T) {
|
|
||||||
parts := util.FullPath("/").Split()
|
|
||||||
if len(parts) != 0 {
|
|
||||||
t.Errorf("expecting an empty list, but getting %d", len(parts))
|
|
||||||
}
|
|
||||||
|
|
||||||
parts = util.FullPath("/readme.md").Split()
|
|
||||||
if len(parts) != 1 {
|
|
||||||
t.Errorf("expecting an empty list, but getting %d", len(parts))
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFsCache(t *testing.T) {
|
|
||||||
|
|
||||||
cache := newFsCache(nil)
|
|
||||||
|
|
||||||
x := cache.GetFsNode(util.FullPath("/y/x"))
|
|
||||||
if x != nil {
|
|
||||||
t.Errorf("wrong node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
p := util.FullPath("/a/b/c")
|
|
||||||
cache.SetFsNode(p, &File{Name: "cc"})
|
|
||||||
tNode := cache.GetFsNode(p)
|
|
||||||
tFile := tNode.(*File)
|
|
||||||
if tFile.Name != "cc" {
|
|
||||||
t.Errorf("expecting a FsNode")
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
|
|
||||||
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
|
|
||||||
cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
|
|
||||||
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
|
|
||||||
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
|
|
||||||
|
|
||||||
b := cache.GetFsNode(util.FullPath("/a/b"))
|
|
||||||
if b != nil {
|
|
||||||
t.Errorf("unexpected node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
a := cache.GetFsNode(util.FullPath("/a"))
|
|
||||||
if a == nil {
|
|
||||||
t.Errorf("missing node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.DeleteFsNode(util.FullPath("/a"))
|
|
||||||
if b != nil {
|
|
||||||
t.Errorf("unexpected node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
a = cache.GetFsNode(util.FullPath("/a"))
|
|
||||||
if a != nil {
|
|
||||||
t.Errorf("wrong DeleteFsNode!")
|
|
||||||
}
|
|
||||||
|
|
||||||
z := cache.GetFsNode(util.FullPath("/z"))
|
|
||||||
if z == nil {
|
|
||||||
t.Errorf("missing node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
y := cache.GetFsNode(util.FullPath("/x/y"))
|
|
||||||
if y != nil {
|
|
||||||
t.Errorf("wrong node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFsCacheMove(t *testing.T) {
|
|
||||||
|
|
||||||
cache := newFsCache(nil)
|
|
||||||
|
|
||||||
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
|
|
||||||
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
|
|
||||||
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
|
|
||||||
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
|
|
||||||
|
|
||||||
cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
|
|
||||||
|
|
||||||
d := cache.GetFsNode(util.FullPath("/z/x/d"))
|
|
||||||
if d == nil {
|
|
||||||
t.Errorf("unexpected nil node!")
|
|
||||||
}
|
|
||||||
if d.(*File).Name != "dd" {
|
|
||||||
t.Errorf("unexpected non dd node!")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -10,18 +10,19 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
|
||||||
"github.com/karlseguin/ccache"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/fuse"
|
||||||
|
"github.com/seaweedfs/fuse/fs"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
||||||
"github.com/seaweedfs/fuse"
|
|
||||||
"github.com/seaweedfs/fuse/fs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option struct {
|
type Option struct {
|
||||||
|
@ -47,7 +48,6 @@ type Option struct {
|
||||||
|
|
||||||
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
|
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
|
||||||
Cipher bool // whether encrypt data on volume server
|
Cipher bool // whether encrypt data on volume server
|
||||||
AsyncMetaDataCaching bool // whether asynchronously cache meta data
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +56,6 @@ var _ = fs.FSStatfser(&WFS{})
|
||||||
|
|
||||||
type WFS struct {
|
type WFS struct {
|
||||||
option *Option
|
option *Option
|
||||||
listDirectoryEntriesCache *ccache.Cache
|
|
||||||
|
|
||||||
// contains all open handles, protected by handlesLock
|
// contains all open handles, protected by handlesLock
|
||||||
handlesLock sync.Mutex
|
handlesLock sync.Mutex
|
||||||
|
@ -67,7 +66,6 @@ type WFS struct {
|
||||||
stats statsCache
|
stats statsCache
|
||||||
|
|
||||||
root fs.Node
|
root fs.Node
|
||||||
fsNodeCache *FsCache
|
|
||||||
|
|
||||||
chunkCache *chunk_cache.ChunkCache
|
chunkCache *chunk_cache.ChunkCache
|
||||||
metaCache *meta_cache.MetaCache
|
metaCache *meta_cache.MetaCache
|
||||||
|
@ -80,7 +78,6 @@ type statsCache struct {
|
||||||
func NewSeaweedFileSystem(option *Option) *WFS {
|
func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
wfs := &WFS{
|
wfs := &WFS{
|
||||||
option: option,
|
option: option,
|
||||||
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
|
|
||||||
handles: make(map[uint64]*FileHandle),
|
handles: make(map[uint64]*FileHandle),
|
||||||
bufPool: sync.Pool{
|
bufPool: sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
|
@ -95,21 +92,18 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
wfs.chunkCache.Shutdown()
|
wfs.chunkCache.Shutdown()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if wfs.option.AsyncMetaDataCaching {
|
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
|
||||||
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
|
startTime := time.Now()
|
||||||
startTime := time.Now()
|
if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
|
||||||
if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
|
glog.V(0).Infof("failed to init meta cache: %v", err)
|
||||||
glog.V(0).Infof("failed to init meta cache: %v", err)
|
} else {
|
||||||
} else {
|
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
||||||
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
grace.OnInterrupt(func() {
|
||||||
grace.OnInterrupt(func() {
|
wfs.metaCache.Shutdown()
|
||||||
wfs.metaCache.Shutdown()
|
})
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
||||||
wfs.fsNodeCache = newFsCache(wfs.root)
|
|
||||||
|
|
||||||
return wfs
|
return wfs
|
||||||
}
|
}
|
||||||
|
@ -229,24 +223,6 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
|
|
||||||
item := wfs.listDirectoryEntriesCache.Get(string(path))
|
|
||||||
if item != nil && !item.Expired() {
|
|
||||||
return item.Value().(*filer_pb.Entry)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
|
|
||||||
if entry == nil {
|
|
||||||
wfs.listDirectoryEntriesCache.Delete(string(path))
|
|
||||||
} else {
|
|
||||||
wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func (wfs *WFS) cacheDelete(path util.FullPath) {
|
|
||||||
wfs.listDirectoryEntriesCache.Delete(string(path))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
|
func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
|
||||||
if !wfs.option.OutsideContainerClusterMode {
|
if !wfs.option.OutsideContainerClusterMode {
|
||||||
return hostAndPort
|
return hostAndPort
|
||||||
|
|
|
@ -3,10 +3,10 @@ package filesys
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/seaweedfs/fuse"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/seaweedfs/fuse"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
|
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
|
||||||
|
@ -110,43 +110,13 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
|
||||||
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
|
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
|
||||||
|
|
||||||
fullpath := util.NewFullPath(dir, name)
|
fullpath := util.NewFullPath(dir, name)
|
||||||
entry = wfs.cacheGet(fullpath)
|
|
||||||
if entry != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
||||||
|
|
||||||
// read from async meta cache
|
// read from async meta cache
|
||||||
if wfs.option.AsyncMetaDataCaching {
|
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
|
||||||
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
|
if cacheErr == filer_pb.ErrNotFound {
|
||||||
if cacheErr == filer_pb.ErrNotFound {
|
return nil, fuse.ENOENT
|
||||||
return nil, fuse.ENOENT
|
|
||||||
}
|
|
||||||
return cachedEntry.ToProtoEntry(), nil
|
|
||||||
}
|
}
|
||||||
|
return cachedEntry.ToProtoEntry(), nil
|
||||||
|
|
||||||
err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
|
||||||
Name: name,
|
|
||||||
Directory: dir,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := filer_pb.LookupEntry(client, request)
|
|
||||||
if err != nil {
|
|
||||||
if err == filer_pb.ErrNotFound {
|
|
||||||
glog.V(3).Infof("file attr read not found file %v: %v", request, err)
|
|
||||||
return fuse.ENOENT
|
|
||||||
}
|
|
||||||
glog.V(3).Infof("attr read %v: %v", request, err)
|
|
||||||
return fuse.EIO
|
|
||||||
}
|
|
||||||
|
|
||||||
entry = resp.Entry
|
|
||||||
wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue