diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 076252051..7ad141ea5 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -3,7 +3,6 @@ package filesys import ( "context" "os" - "path" "time" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -31,6 +30,7 @@ var _ = fs.NodeGetxattrer(&Dir{}) var _ = fs.NodeSetxattrer(&Dir{}) var _ = fs.NodeRemovexattrer(&Dir{}) var _ = fs.NodeListxattrer(&Dir{}) +var _ = fs.NodeForgetter(&Dir{}) func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { @@ -86,14 +86,22 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { attr.BlockSize = 1024 * 1024 } -func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } +func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { + return dir.wfs.getNode(filer2.NewFullPath(dir.Path, name), func() fs.Node { + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, + } + }) +} + +func (dir *Dir) newDirectory(fullpath filer2.FullPath, entry *filer_pb.Entry) fs.Node { + return dir.wfs.getNode(fullpath, func() fs.Node { + return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry} + }) } func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, @@ -130,7 +138,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, } } - file := dir.newFile(req.Name, request.Entry) + node := dir.newFile(req.Name, request.Entry) + file := node.(*File) if !request.Entry.IsDirectory { file.isOpen = true } @@ -169,7 +178,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }) if err == nil { - node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs} + node := dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), nil) return node, nil } @@ -197,12 +206,12 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. if entry != nil { if entry.IsDirectory { - node = &Dir{Path: string(fullFilePath), wfs: dir.wfs, entry: entry} + node = dir.newDirectory(fullFilePath, entry) } else { node = dir.newFile(req.Name, entry) } - resp.EntryValid = time.Second + // resp.EntryValid = time.Second resp.Attr.Inode = fullFilePath.AsInode() resp.Attr.Valid = time.Second resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) @@ -234,6 +243,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) } + glog.V(4).Infof("dir ReadDirAll : %s %+v", fullpath, entry) dir.wfs.cacheSet(fullpath, entry, cacheTtl) }) if readErr != nil { @@ -312,11 +322,12 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { + glog.V(3).Infof("%v dir setattr %+v", dir.Path, req) + if err := dir.maybeLoadEntry(ctx); err != nil { return err } - glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle) if req.Valid.Mode() { dir.entry.Attributes.FileMode = uint32(req.Mode) } @@ -391,6 +402,12 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } +func (dir *Dir) Forget() { + glog.V(3).Infof("Forget dir %s/%s", dir.Path) + + dir.wfs.forgetNode(filer2.FullPath(dir.Path)) +} + func (dir *Dir) maybeLoadEntry(ctx context.Context) error { if dir.entry == nil { parentDirPath, name := filer2.FullPath(dir.Path).DirAndName() diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 6b68e4ee9..1bd1a6470 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -15,10 +15,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector newDir := newDirectory.(*Dir) glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName) - dir.wfs.cacheDelete(filer2.NewFullPath(newDir.Path, req.NewName)) - dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.OldName)) - - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.Path, @@ -36,4 +33,27 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return nil }) + + if err == nil { + oldPath := filer2.NewFullPath(dir.Path, req.OldName) + dir.wfs.cacheDelete(filer2.NewFullPath(newDir.Path, req.NewName)) + dir.wfs.cacheDelete(oldPath) + + oldFileNode := dir.wfs.getNode(oldPath, func() fs.Node { + return nil + }) + newDirNode := dir.wfs.getNode(filer2.FullPath(dir.Path), func() fs.Node { + return nil + }) + if oldFileNode != nil { + oldFile := oldFileNode.(*File) + oldFile.Name = req.NewName + if newDirNode != nil { + oldFile.dir = newDirNode.(*Dir) + } + } + dir.wfs.forgetNode(oldPath) + } + + return err } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 35d8f249a..f83944678 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -65,7 +65,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da // or buffer is full if adding new data, // flush current buffer and add new data - // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size) + glog.V(4).Infof("offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data)) if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { @@ -77,6 +77,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da return } pages.Offset = offset + glog.V(4).Infof("copy data0: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data)) copy(pages.Data, data) pages.Size = int64(len(data)) return @@ -86,7 +87,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da // when this happens, debug shows the data overlapping with existing data is empty // the data is not just append if offset == pages.Offset && int(pages.Size) < len(data) { - // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size) + glog.V(4).Infof("copy data1: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data)) copy(pages.Data[pages.Size:], data[pages.Size:]) } else { if pages.Size != 0 { @@ -95,6 +96,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da return pages.flushAndSave(ctx, offset, data) } } else { + glog.V(4).Infof("copy data2: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data)) copy(pages.Data[offset-pages.Offset:], data) } @@ -159,6 +161,8 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex return nil, nil } + glog.V(0).Infof("%s/%s saveExistingPagesToStorage [%d,%d): Data len=%d", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Size, len(pages.Data)) + return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 622ba6f57..d811cb179 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -23,6 +23,7 @@ var _ = fs.NodeGetxattrer(&File{}) var _ = fs.NodeSetxattrer(&File{}) var _ = fs.NodeRemovexattrer(&File{}) var _ = fs.NodeListxattrer(&File{}) +var _ = fs.NodeForgetter(&File{}) type File struct { Name string @@ -94,11 +95,12 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { + glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) + if err := file.maybeLoadEntry(ctx); err != nil { return err } - glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) if req.Valid.Size() { glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size) @@ -208,6 +210,13 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { return nil } +func (file *File) Forget() { + glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name) + + file.wfs.forgetNode(filer2.NewFullPath(file.dir.Path, file.Name)) + +} + func (file *File) maybeLoadEntry(ctx context.Context) error { if file.entry == nil || !file.isOpen { entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index d3cc6329d..a2e5a9073 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -46,13 +46,18 @@ type WFS struct { option *Option listDirectoryEntriesCache *ccache.Cache - // contains all open handles - handles []*FileHandle - pathToHandleIndex map[filer2.FullPath]int - pathToHandleLock sync.Mutex - bufPool sync.Pool + // contains all open handles, protected by handlesLock + handlesLock sync.Mutex + handles []*FileHandle + + bufPool sync.Pool stats statsCache + + // nodes, protected by nodesLock + nodesLock sync.Mutex + nodes map[uint64]fs.Node + root fs.Node } type statsCache struct { filer_pb.StatisticsResponse @@ -63,19 +68,21 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - pathToHandleIndex: make(map[filer2.FullPath]int), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, + nodes: make(map[uint64]fs.Node), } + wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs} + return wfs } func (wfs *WFS) Root() (fs.Node, error) { - return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil + return wfs.root, nil } func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { @@ -88,42 +95,35 @@ func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFil } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() fullpath := file.fullpath() + glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) - index, found := wfs.pathToHandleIndex[fullpath] - if found && wfs.handles[index] != nil { - glog.V(2).Infoln(fullpath, "found fileHandle id", index) - return wfs.handles[index] - } + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() fileHandle = newFileHandle(file, uid, gid) for i, h := range wfs.handles { if h == nil { wfs.handles[i] = fileHandle fileHandle.handle = uint64(i) - wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle) + glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle) return } } wfs.handles = append(wfs.handles, fileHandle) fileHandle.handle = uint64(len(wfs.handles) - 1) - glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle) - wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle) return } func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - delete(wfs.pathToHandleIndex, fullpath) + glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) if int(handleId) < len(wfs.handles) { wfs.handles[int(handleId)] = nil } @@ -203,10 +203,33 @@ func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry { func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) { if entry == nil { wfs.listDirectoryEntriesCache.Delete(string(path)) - }else{ + } else { wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) } } func (wfs *WFS) cacheDelete(path filer2.FullPath) { wfs.listDirectoryEntriesCache.Delete(string(path)) } + +func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node { + wfs.nodesLock.Lock() + defer wfs.nodesLock.Unlock() + + node, found := wfs.nodes[fullpath.AsInode()] + if found { + return node + } + node = fn() + if node != nil { + wfs.nodes[fullpath.AsInode()] = node + } + return node +} + +func (wfs *WFS) forgetNode(fullpath filer2.FullPath) { + wfs.nodesLock.Lock() + defer wfs.nodesLock.Unlock() + + delete(wfs.nodes, fullpath.AsInode()) + +}