From c3792c8352d72b9dac5e6032e9b0710e032e18d2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 27 Feb 2022 03:03:19 -0800 Subject: [PATCH] remove dead code --- weed/command/{mount2.go => mount.go} | 0 weed/command/mount_darwin.go | 8 - weed/command/mount_linux.go | 6 - ..._notsupported.go => mount_notsupported.go} | 2 +- weed/command/{mount2_std.go => mount_std.go} | 0 weed/filesys/dir.go | 655 ------------------ weed/filesys/dir_link.go | 169 ----- weed/filesys/dir_rename.go | 149 ---- weed/filesys/dirty_pages_chunked.go | 100 --- weed/filesys/file.go | 406 ----------- weed/filesys/filehandle.go | 346 --------- weed/filesys/fscache.go | 213 ------ weed/filesys/fscache_test.go | 115 --- weed/filesys/meta_cache/cache_config.go | 32 - weed/filesys/meta_cache/id_mapper.go | 101 --- weed/filesys/meta_cache/meta_cache.go | 154 ---- weed/filesys/meta_cache/meta_cache_init.go | 47 -- .../meta_cache/meta_cache_subscribe.go | 68 -- weed/filesys/page_writer.go | 98 --- .../page_writer/chunk_interval_list.go | 115 --- .../page_writer/chunk_interval_list_test.go | 49 -- weed/filesys/page_writer/dirty_pages.go | 30 - weed/filesys/page_writer/page_chunk.go | 16 - weed/filesys/page_writer/page_chunk_mem.go | 69 -- .../page_writer/page_chunk_swapfile.go | 121 ---- weed/filesys/page_writer/upload_pipeline.go | 182 ----- .../page_writer/upload_pipeline_lock.go | 63 -- .../page_writer/upload_pipeline_test.go | 47 -- weed/filesys/page_writer_pattern.go | 44 -- weed/filesys/permission.go | 63 -- weed/filesys/unimplemented.go | 22 - weed/filesys/wfs.go | 319 --------- weed/filesys/wfs_filer_client.go | 51 -- weed/filesys/wfs_write.go | 84 --- weed/filesys/xattr.go | 153 ---- weed/mount/dirty_pages_chunked.go | 2 +- weed/mount/page_writer.go | 2 +- 37 files changed, 3 insertions(+), 4098 deletions(-) rename weed/command/{mount2.go => mount.go} (100%) rename weed/command/{mount2_notsupported.go => mount_notsupported.go} (79%) rename weed/command/{mount2_std.go => mount_std.go} (100%) delete mode 100644 weed/filesys/dir.go delete mode 100644 weed/filesys/dir_link.go delete mode 100644 weed/filesys/dir_rename.go delete mode 100644 weed/filesys/dirty_pages_chunked.go delete mode 100644 weed/filesys/file.go delete mode 100644 weed/filesys/filehandle.go delete mode 100644 weed/filesys/fscache.go delete mode 100644 weed/filesys/fscache_test.go delete mode 100644 weed/filesys/meta_cache/cache_config.go delete mode 100644 weed/filesys/meta_cache/id_mapper.go delete mode 100644 weed/filesys/meta_cache/meta_cache.go delete mode 100644 weed/filesys/meta_cache/meta_cache_init.go delete mode 100644 weed/filesys/meta_cache/meta_cache_subscribe.go delete mode 100644 weed/filesys/page_writer.go delete mode 100644 weed/filesys/page_writer/chunk_interval_list.go delete mode 100644 weed/filesys/page_writer/chunk_interval_list_test.go delete mode 100644 weed/filesys/page_writer/dirty_pages.go delete mode 100644 weed/filesys/page_writer/page_chunk.go delete mode 100644 weed/filesys/page_writer/page_chunk_mem.go delete mode 100644 weed/filesys/page_writer/page_chunk_swapfile.go delete mode 100644 weed/filesys/page_writer/upload_pipeline.go delete mode 100644 weed/filesys/page_writer/upload_pipeline_lock.go delete mode 100644 weed/filesys/page_writer/upload_pipeline_test.go delete mode 100644 weed/filesys/page_writer_pattern.go delete mode 100644 weed/filesys/permission.go delete mode 100644 weed/filesys/unimplemented.go delete mode 100644 weed/filesys/wfs.go delete mode 100644 weed/filesys/wfs_filer_client.go delete mode 100644 weed/filesys/wfs_write.go delete mode 100644 weed/filesys/xattr.go diff --git a/weed/command/mount2.go b/weed/command/mount.go similarity index 100% rename from weed/command/mount2.go rename to weed/command/mount.go diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go index f0a5581e7..05d6a1bc4 100644 --- a/weed/command/mount_darwin.go +++ b/weed/command/mount_darwin.go @@ -1,13 +1,5 @@ package command -import ( - "github.com/seaweedfs/fuse" -) - -func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{} -} - func checkMountPointAvailable(dir string) bool { return true } diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go index 25c4f72cf..aebb14e61 100644 --- a/weed/command/mount_linux.go +++ b/weed/command/mount_linux.go @@ -6,8 +6,6 @@ import ( "io" "os" "strings" - - "github.com/seaweedfs/fuse" ) const ( @@ -137,10 +135,6 @@ func parseInfoFile(r io.Reader) ([]*Info, error) { return out, nil } -func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{} -} - func checkMountPointAvailable(dir string) bool { mountPoint := dir if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") { diff --git a/weed/command/mount2_notsupported.go b/weed/command/mount_notsupported.go similarity index 79% rename from weed/command/mount2_notsupported.go rename to weed/command/mount_notsupported.go index 075b73436..894c8e313 100644 --- a/weed/command/mount2_notsupported.go +++ b/weed/command/mount_notsupported.go @@ -8,7 +8,7 @@ import ( "runtime" ) -func runMount2(cmd *Command, args []string) bool { +func runMount(cmd *Command, args []string) bool { fmt.Printf("Mount is not supported on %s %s\n", runtime.GOOS, runtime.GOARCH) return true diff --git a/weed/command/mount2_std.go b/weed/command/mount_std.go similarity index 100% rename from weed/command/mount2_std.go rename to weed/command/mount_std.go diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go deleted file mode 100644 index b40addfe7..000000000 --- a/weed/filesys/dir.go +++ /dev/null @@ -1,655 +0,0 @@ -package filesys - -import ( - "bytes" - "context" - "math" - "os" - "strings" - "syscall" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type Dir struct { - name string - wfs *WFS - entry *filer_pb.Entry - parent *Dir - id uint64 -} - -var _ = fs.Node(&Dir{}) - -var _ = fs.NodeIdentifier(&Dir{}) -var _ = fs.NodeCreater(&Dir{}) -var _ = fs.NodeMknoder(&Dir{}) -var _ = fs.NodeMkdirer(&Dir{}) -var _ = fs.NodeFsyncer(&Dir{}) -var _ = fs.NodeRequestLookuper(&Dir{}) -var _ = fs.HandleReadDirAller(&Dir{}) -var _ = fs.NodeRemover(&Dir{}) -var _ = fs.NodeRenamer(&Dir{}) -var _ = fs.NodeSetattrer(&Dir{}) -var _ = fs.NodeGetxattrer(&Dir{}) -var _ = fs.NodeSetxattrer(&Dir{}) -var _ = fs.NodeRemovexattrer(&Dir{}) -var _ = fs.NodeListxattrer(&Dir{}) -var _ = fs.NodeForgetter(&Dir{}) - -func (dir *Dir) Id() uint64 { - if dir.parent == nil { - return 1 - } - return dir.id -} - -func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { - - entry, err := dir.maybeLoadEntry() - if err != nil { - glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err) - return err - } - - // https://github.com/bazil/fuse/issues/196 - attr.Valid = time.Second - attr.Inode = dir.Id() - attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir - attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) - attr.Ctime = time.Unix(entry.Attributes.Mtime, 0) - attr.Atime = time.Unix(entry.Attributes.Mtime, 0) - attr.Gid = entry.Attributes.Gid - attr.Uid = entry.Attributes.Uid - - if dir.FullPath() == dir.wfs.option.FilerMountRootPath { - attr.BlockSize = blockSize - } - - glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) - - return nil -} - -func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - return getxattr(entry, req, resp) -} - -func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - // fsync works at OS level - // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req) - - return nil -} - -func (dir *Dir) newFile(name string, fileMode os.FileMode) fs.Node { - - fileFullPath := util.NewFullPath(dir.FullPath(), name) - fileId := fileFullPath.AsInode(fileMode) - dir.wfs.handlesLock.Lock() - existingHandle, found := dir.wfs.handles[fileId] - dir.wfs.handlesLock.Unlock() - - if found { - glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath) - return existingHandle.f - } - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - id: fileId, - } -} - -func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node { - - return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode(os.ModeDir)} - -} - -func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, - resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { - - if err := checkName(req.Name); err != nil { - return nil, nil, err - } - - exclusive := req.Flags&fuse.OpenExclusive != 0 - isDirectory := req.Mode&os.ModeDir > 0 - - if exclusive || isDirectory { - _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive) - if err != nil { - return nil, nil, err - } - } - var node fs.Node - if isDirectory { - node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name)) - return node, node, nil - } - - node = dir.newFile(req.Name, req.Mode) - file := node.(*File) - file.entry = &filer_pb.Entry{ - Name: req.Name, - IsDirectory: req.Mode&os.ModeDir > 0, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - Collection: dir.wfs.option.Collection, - Replication: dir.wfs.option.Replication, - TtlSec: dir.wfs.option.TtlSec, - }, - } - file.dirtyMetadata = true - fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) - return file, fh, nil - -} - -func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { - - if err := checkName(req.Name); err != nil { - return nil, err - } - - glog.V(3).Infof("dir %s Mknod %+v", dir.FullPath(), req) - - _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false) - - if err != nil { - return nil, err - } - var node fs.Node - node = dir.newFile(req.Name, req.Mode) - return node, nil -} - -func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) { - dirFullPath := dir.FullPath() - request := &filer_pb.CreateEntryRequest{ - Directory: dirFullPath, - Entry: &filer_pb.Entry{ - Name: name, - IsDirectory: mode&os.ModeDir > 0, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(mode &^ dir.wfs.option.Umask), - Uid: uid, - Gid: gid, - Collection: dir.wfs.option.Collection, - Replication: dir.wfs.option.Replication, - TtlSec: dir.wfs.option.TtlSec, - }, - }, - OExcl: exclusive, - Signatures: []int32{dir.wfs.signature}, - } - glog.V(1).Infof("create %s/%s", dirFullPath, name) - - err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - if strings.Contains(err.Error(), "EEXIST") { - return fuse.EEXIST - } - glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err) - return fuse.EIO - } - - if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err) - return fuse.EIO - } - - return nil - }) - return request, err -} - -func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - - if err := checkName(req.Name); err != nil { - return nil, err - } - - glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name) - - newEntry := &filer_pb.Entry{ - Name: req.Name, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - }, - } - - dirFullPath := dir.FullPath() - - err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(newEntry) - defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) - - request := &filer_pb.CreateEntryRequest{ - Directory: dirFullPath, - Entry: newEntry, - Signatures: []int32{dir.wfs.signature}, - } - - glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) - return err - } - - if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err) - return fuse.EIO - } - - return nil - }) - - if err == nil { - node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name)) - - return node, nil - } - - glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) - - return nil, fuse.EIO -} - -func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - - if err := checkName(req.Name); err != nil { - return nil, err - } - - dirPath := util.FullPath(dir.FullPath()) - // glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String()) - - fullFilePath := dirPath.Child(req.Name) - visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) - if visitErr != nil { - glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) - return nil, fuse.EIO - } - localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - - if localEntry == nil { - // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) - entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath) - if err != nil { - glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) - return nil, fuse.ENOENT - } - localEntry = filer.FromPbEntry(string(dirPath), entry) - } else { - glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) - } - - if localEntry != nil { - if localEntry.IsDirectory() { - node = dir.newDirectory(fullFilePath) - } else { - node = dir.newFile(req.Name, localEntry.Attr.Mode) - } - - // resp.EntryValid = time.Second - resp.Attr.Valid = time.Second - resp.Attr.Size = localEntry.FileSize - resp.Attr.Mtime = localEntry.Attr.Mtime - resp.Attr.Crtime = localEntry.Attr.Crtime - resp.Attr.Mode = localEntry.Attr.Mode - resp.Attr.Gid = localEntry.Attr.Gid - resp.Attr.Uid = localEntry.Attr.Uid - if localEntry.HardLinkCounter > 0 { - resp.Attr.Nlink = uint32(localEntry.HardLinkCounter) - } - - return node, nil - } - - glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err) - return nil, fuse.ENOENT -} - -func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - - dirPath := util.FullPath(dir.FullPath()) - glog.V(4).Infof("dir ReadDirAll %s", dirPath) - - processEachEntryFn := func(entry *filer.Entry, isLast bool) { - if entry.IsDirectory() { - dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode(os.ModeDir)} - ret = append(ret, dirent) - } else { - dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode(entry.Attr.Mode)} - ret = append(ret, dirent) - } - } - - if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { - glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) - return nil, fuse.EIO - } - listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { - processEachEntryFn(entry, false) - return true - }) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO - } - - // create proper . and .. directories - ret = append(ret, fuse.Dirent{ - Inode: dirPath.AsInode(os.ModeDir), - Name: ".", - Type: fuse.DT_Dir, - }) - - // return the correct parent inode for the mount root - var inode uint64 - if string(dirPath) == dir.wfs.option.FilerMountRootPath { - inode = dir.wfs.option.MountParentInode - } else { - inode = util.FullPath(dir.parent.FullPath()).AsInode(os.ModeDir) - } - - ret = append(ret, fuse.Dirent{ - Inode: inode, - Name: "..", - Type: fuse.DT_Dir, - }) - - return -} - -func findFileType(mode uint16) fuse.DirentType { - switch mode & (syscall.S_IFMT & 0xffff) { - case syscall.S_IFSOCK: - return fuse.DT_Socket - case syscall.S_IFLNK: - return fuse.DT_Link - case syscall.S_IFREG: - return fuse.DT_File - case syscall.S_IFBLK: - return fuse.DT_Block - case syscall.S_IFDIR: - return fuse.DT_Dir - case syscall.S_IFCHR: - return fuse.DT_Char - case syscall.S_IFIFO: - return fuse.DT_FIFO - } - return fuse.DT_File -} - -func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { - - parentHasPermission := false - parentEntry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - if err := checkPermission(parentEntry, req.Uid, req.Gid, true); err == nil { - parentHasPermission = true - } - - dirFullPath := dir.FullPath() - filePath := util.NewFullPath(dirFullPath, req.Name) - entry, err := filer_pb.GetEntry(dir.wfs, filePath) - if err != nil { - return err - } - if !parentHasPermission { - if err := checkPermission(entry, req.Uid, req.Gid, true); err != nil { - return err - } - } - - if !req.Dir { - return dir.removeOneFile(entry, req) - } - - return dir.removeFolder(entry, req) - -} - -func (dir *Dir) removeOneFile(entry *filer_pb.Entry, req *fuse.RemoveRequest) error { - - dirFullPath := dir.FullPath() - filePath := util.NewFullPath(dirFullPath, req.Name) - - // first, ensure the filer store can correctly delete - glog.V(3).Infof("remove file: %v", req) - isDeleteData := entry != nil && entry.HardLinkCounter <= 1 - err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) - if err != nil { - glog.V(3).Infof("not found remove file %s: %v", filePath, err) - return fuse.ENOENT - } - - // then, delete meta cache and fsNode cache - if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil { - glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err) - return fuse.ESTALE - } - - // remove current file handle if any - dir.wfs.handlesLock.Lock() - defer dir.wfs.handlesLock.Unlock() - inodeId := filePath.AsInode(0) - if fh, ok := dir.wfs.handles[inodeId]; ok { - delete(dir.wfs.handles, inodeId) - fh.isDeleted = true - } - - return nil - -} - -func (dir *Dir) removeFolder(entry *filer_pb.Entry, req *fuse.RemoveRequest) error { - - dirFullPath := dir.FullPath() - - glog.V(3).Infof("remove directory entry: %v", req) - ignoreRecursiveErr := true // ignore recursion error since the OS should manage it - err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) - if err != nil { - glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err) - if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { - return fuse.EEXIST - } - return fuse.ENOENT - } - - t := util.NewFullPath(dirFullPath, req.Name) - dir.wfs.metaCache.DeleteEntry(context.Background(), t) - - return nil - -} - -func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - - glog.V(4).Infof("%v dir setattr %+v mode=%d", dir.FullPath(), req, req.Mode) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if req.Valid.Mode() { - entry.Attributes.FileMode = uint32(req.Mode) - } - - if req.Valid.Uid() { - entry.Attributes.Uid = req.Uid - } - - if req.Valid.Gid() { - entry.Attributes.Gid = req.Gid - } - - if req.Valid.Mtime() { - entry.Attributes.Mtime = req.Mtime.Unix() - } - - entry.Attributes.Mtime = time.Now().Unix() - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - - glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := setxattr(entry, req); err != nil { - return err - } - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - - glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := removexattr(entry, req); err != nil { - return err - } - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := listxattr(entry, req, resp); err != nil { - return err - } - - return nil - -} - -func (dir *Dir) Forget() { - glog.V(4).Infof("Forget dir %s", dir.FullPath()) -} - -func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) { - parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.maybeLoadEntry(parentDirPath, name) -} - -func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { - - parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - - return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(entry) - defer dir.wfs.mapPbIdFromFilerToLocal(entry) - - request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: entry, - Signatures: []int32{dir.wfs.signature}, - } - - glog.V(1).Infof("save dir entry: %v", request) - _, err := client.UpdateEntry(context.Background(), request) - if err != nil { - glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) - return fuse.EIO - } - - if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) - return fuse.ESTALE - } - - return nil - }) -} - -func (dir *Dir) FullPath() string { - var parts []string - for p := dir; p != nil; p = p.parent { - if strings.HasPrefix(p.name, "/") { - if len(p.name) > 1 { - parts = append(parts, p.name[1:]) - } - } else { - parts = append(parts, p.name) - } - } - - if len(parts) == 0 { - return "/" - } - - var buf bytes.Buffer - for i := len(parts) - 1; i >= 0; i-- { - buf.WriteString("/") - buf.WriteString(parts[i]) - } - return buf.String() -} diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go deleted file mode 100644 index 7fc98a5a6..000000000 --- a/weed/filesys/dir_link.go +++ /dev/null @@ -1,169 +0,0 @@ -package filesys - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/util" - "os" - "syscall" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" -) - -var _ = fs.NodeLinker(&Dir{}) -var _ = fs.NodeSymlinker(&Dir{}) -var _ = fs.NodeReadlinker(&File{}) - -const ( - HARD_LINK_MARKER = '\x01' -) - -func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) { - - if err := checkName(req.NewName); err != nil { - return nil, err - } - - oldFile, ok := old.(*File) - if !ok { - glog.Errorf("old node is not a file: %+v", old) - } - - glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName) - - oldEntry, err := oldFile.maybeLoadEntry(ctx) - if err != nil { - return nil, err - } - - if oldEntry == nil { - return nil, fuse.EIO - } - - // update old file to hardlink mode - if len(oldEntry.HardLinkId) == 0 { - oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) - oldEntry.HardLinkCounter = 1 - } - oldEntry.HardLinkCounter++ - updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ - Directory: oldFile.dir.FullPath(), - Entry: oldEntry, - Signatures: []int32{dir.wfs.signature}, - } - - // CreateLink 1.2 : update new file to hardlink mode - oldEntry.Attributes.Mtime = time.Now().Unix() - request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: &filer_pb.Entry{ - Name: req.NewName, - IsDirectory: false, - Attributes: oldEntry.Attributes, - Chunks: oldEntry.Chunks, - Extended: oldEntry.Extended, - HardLinkId: oldEntry.HardLinkId, - HardLinkCounter: oldEntry.HardLinkCounter, - }, - Signatures: []int32{dir.wfs.signature}, - } - - // apply changes to the filer, and also apply to local metaCache - err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil { - glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) - return fuse.EIO - } - dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry)) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) - return fuse.EIO - } - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - if err != nil { - return nil, fuse.EIO - } - - // create new file node - newNode := dir.newFile(req.NewName, 0) - newFile := newNode.(*File) - - return newFile, err - -} - -func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) { - - if err := checkName(req.NewName); err != nil { - return nil, err - } - - glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) - - request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: &filer_pb.Entry{ - Name: req.NewName, - IsDirectory: false, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - SymlinkTarget: req.Target, - }, - }, - Signatures: []int32{dir.wfs.signature}, - } - - err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err) - return fuse.EIO - } - - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - symlink := dir.newFile(req.NewName, os.ModeSymlink) - - return symlink, err - -} - -func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return "", err - } - - if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 { - return "", fuse.Errno(syscall.EINVAL) - } - - glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget) - - return entry.Attributes.SymlinkTarget, nil - -} diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go deleted file mode 100644 index 4cc9959f6..000000000 --- a/weed/filesys/dir_rename.go +++ /dev/null @@ -1,149 +0,0 @@ -package filesys - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - "io" - "os" - "strings" -) - -func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { - - if err := checkName(req.NewName); err != nil { - return err - } - if err := checkName(req.OldName); err != nil { - return err - } - - newDir := newDirectory.(*Dir) - - newPath := util.NewFullPath(newDir.FullPath(), req.NewName) - oldPath := util.NewFullPath(dir.FullPath(), req.OldName) - - glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) - - // update remote filer - err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - request := &filer_pb.StreamRenameEntryRequest{ - OldDirectory: dir.FullPath(), - OldName: req.OldName, - NewDirectory: newDir.FullPath(), - NewName: req.NewName, - Signatures: []int32{dir.wfs.signature}, - } - - stream, err := client.StreamRenameEntry(ctx, request) - if err != nil { - glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - glog.V(0).Infof("dir Rename %s => %s receive: %v", oldPath, newPath, recvErr) - if strings.Contains(recvErr.Error(), "not empty") { - return fuse.EEXIST - } - if strings.Contains(recvErr.Error(), "not directory") { - return fuse.ENOTDIR - } - return recvErr - } - } - - if err = dir.handleRenameResponse(ctx, resp); err != nil { - glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err) - return err - } - - } - - return nil - - }) - - return err -} - -func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error { - // comes from filer StreamRenameEntry, can only be create or delete entry - - if resp.EventNotification.NewEntry != nil { - // with new entry, the old entry name also exists. This is the first step to create new entry - newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry) - if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil { - return err - } - - oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath) - oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name - - entryFileMode := newEntry.Attr.Mode - - oldPath := oldParent.Child(oldName) - newPath := newParent.Child(newName) - oldFsNode := NodeWithId(oldPath.AsInode(entryFileMode)) - newFsNode := NodeWithId(newPath.AsInode(entryFileMode)) - newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode(os.ModeDir))) - var newDir *Dir - if found { - newDir = newDirNode.(*Dir) - } - dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) { - if file, ok := internalNode.(*File); ok { - glog.V(4).Infof("internal file node %s", oldParent.Child(oldName)) - file.Name = newName - file.id = uint64(newFsNode) - if found { - file.dir = newDir - } - } - if dir, ok := internalNode.(*Dir); ok { - glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName)) - dir.name = newName - dir.id = uint64(newFsNode) - if found { - dir.parent = newDir - } - } - }) - - // change file handle - if !newEntry.IsDirectory() { - inodeId := oldPath.AsInode(entryFileMode) - dir.wfs.handlesLock.Lock() - if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil { - glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath) - delete(dir.wfs.handles, inodeId) - existingHandle.handle = newPath.AsInode(entryFileMode) - existingHandle.f.entry.Name = newName - existingHandle.f.id = newPath.AsInode(entryFileMode) - dir.wfs.handles[newPath.AsInode(entryFileMode)] = existingHandle - } - dir.wfs.handlesLock.Unlock() - } - - } else if resp.EventNotification.OldEntry != nil { - // without new entry, only old entry name exists. This is the second step to delete old entry - if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil { - return err - } - } - - return nil - -} diff --git a/weed/filesys/dirty_pages_chunked.go b/weed/filesys/dirty_pages_chunked.go deleted file mode 100644 index 002922958..000000000 --- a/weed/filesys/dirty_pages_chunked.go +++ /dev/null @@ -1,100 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "sync" - "time" -) - -type ChunkedDirtyPages struct { - fh *FileHandle - writeWaitGroup sync.WaitGroup - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - uploadPipeline *page_writer.UploadPipeline - hasWrites bool -} - -var ( - _ = page_writer.DirtyPages(&ChunkedDirtyPages{}) -) - -func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { - - dirtyPages := &ChunkedDirtyPages{ - fh: fh, - } - - dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.f.wfs.option.ConcurrentWriters) - - return dirtyPages -} - -func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) { - pages.hasWrites = true - - glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) - pages.uploadPipeline.SaveDataAt(data, offset) - - return -} - -func (pages *ChunkedDirtyPages) FlushData() error { - if !pages.hasWrites { - return nil - } - pages.uploadPipeline.FlushAll() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - return nil -} - -func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - if !pages.hasWrites { - return - } - return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) -} - -func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { - - mtime := time.Now().UnixNano() - defer cleanupFn() - - chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.fh.entryViewCache = nil - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() - -} - -func (pages ChunkedDirtyPages) Destroy() { - pages.uploadPipeline.Shutdown() -} - -func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) { - pages.uploadPipeline.LockForRead(startOffset, stopOffset) -} -func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) { - pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) -} diff --git a/weed/filesys/file.go b/weed/filesys/file.go deleted file mode 100644 index 8028d3912..000000000 --- a/weed/filesys/file.go +++ /dev/null @@ -1,406 +0,0 @@ -package filesys - -import ( - "context" - "os" - "sort" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const blockSize = 512 - -var _ = fs.Node(&File{}) -var _ = fs.NodeIdentifier(&File{}) -var _ = fs.NodeOpener(&File{}) -var _ = fs.NodeFsyncer(&File{}) -var _ = fs.NodeSetattrer(&File{}) -var _ = fs.NodeGetxattrer(&File{}) -var _ = fs.NodeSetxattrer(&File{}) -var _ = fs.NodeRemovexattrer(&File{}) -var _ = fs.NodeListxattrer(&File{}) -var _ = fs.NodeForgetter(&File{}) - -type File struct { - Name string - dir *Dir - wfs *WFS - entry *filer_pb.Entry - isOpen int - dirtyMetadata bool - id uint64 -} - -func (file *File) fullpath() util.FullPath { - return util.NewFullPath(file.dir.FullPath(), file.Name) -} - -func (file *File) Id() uint64 { - return file.id -} - -func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { - - glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if entry == nil { - return fuse.ENOENT - } - - attr.Inode = file.Id() - attr.Valid = time.Second - attr.Mode = os.FileMode(entry.Attributes.FileMode) - attr.Size = filer.FileSize(entry) - if file.isOpen > 0 { - attr.Size = entry.Attributes.FileSize - glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) - } - attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) - attr.Ctime = time.Unix(entry.Attributes.Mtime, 0) - attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - attr.Gid = entry.Attributes.Gid - attr.Uid = entry.Attributes.Uid - attr.BlockSize = blockSize - attr.Blocks = (attr.Size + blockSize - 1) / blockSize - if entry.HardLinkCounter > 0 { - attr.Nlink = uint32(entry.HardLinkCounter) - } - - return nil - -} - -func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - // glog.V(4).Infof("file Getxattr %s", file.fullpath()) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - return getxattr(entry, req, resp) -} - -func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - - glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - // resp.Flags |= fuse.OpenDirectIO - - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) - - resp.Handle = fuse.HandleID(handle.handle) - - glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) - - return handle, nil - -} - -func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - - glog.V(4).Infof("%v file setattr %+v mode=%d", file.fullpath(), req, req.Mode) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if req.Valid.Size() { - - glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) - if req.Size < filer.FileSize(entry) { - // fmt.Printf("truncate %v \n", fullPath) - var chunks []*filer_pb.FileChunk - var truncatedChunks []*filer_pb.FileChunk - for _, chunk := range entry.Chunks { - int64Size := int64(chunk.Size) - if chunk.Offset+int64Size > int64(req.Size) { - // this chunk is truncated - int64Size = int64(req.Size) - chunk.Offset - if int64Size > 0 { - chunks = append(chunks, chunk) - glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size) - chunk.Size = uint64(int64Size) - } else { - glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString()) - truncatedChunks = append(truncatedChunks, chunk) - } - } - } - // set the new chunks and reset entry cache - entry.Chunks = chunks - file.wfs.handlesLock.Lock() - existingHandle, found := file.wfs.handles[file.Id()] - file.wfs.handlesLock.Unlock() - if found { - existingHandle.entryViewCache = nil - } - - } - entry.Attributes.Mtime = time.Now().Unix() - entry.Attributes.FileSize = req.Size - file.dirtyMetadata = true - } - - if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) { - entry.Attributes.FileMode = uint32(req.Mode) - entry.Attributes.Mtime = time.Now().Unix() - file.dirtyMetadata = true - } - - if req.Valid.Uid() && entry.Attributes.Uid != req.Uid { - entry.Attributes.Uid = req.Uid - entry.Attributes.Mtime = time.Now().Unix() - file.dirtyMetadata = true - } - - if req.Valid.Gid() && entry.Attributes.Gid != req.Gid { - entry.Attributes.Gid = req.Gid - entry.Attributes.Mtime = time.Now().Unix() - file.dirtyMetadata = true - } - - if req.Valid.Crtime() { - entry.Attributes.Crtime = req.Crtime.Unix() - entry.Attributes.Mtime = time.Now().Unix() - file.dirtyMetadata = true - } - - if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() { - entry.Attributes.Mtime = req.Mtime.Unix() - file.dirtyMetadata = true - } - - if req.Valid.Handle() { - // fmt.Printf("file handle => %d\n", req.Handle) - } - - if file.isOpen > 0 { - return nil - } - - if !file.dirtyMetadata { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - - glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := setxattr(entry, req); err != nil { - return err - } - file.dirtyMetadata = true - - if file.isOpen > 0 { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - - glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := removexattr(entry, req); err != nil { - return err - } - file.dirtyMetadata = true - - if file.isOpen > 0 { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - glog.V(4).Infof("file Listxattr %s", file.fullpath()) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := listxattr(entry, req, resp); err != nil { - return err - } - - return nil - -} - -func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - - // write the file chunks to the filerGrpcAddress - glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) - - return file.wfs.Fsync(file, req.Header) - -} - -func (file *File) Forget() { - t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(4).Infof("Forget file %s", t) - file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode(file.entry.FileMode()))) - -} - -func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { - - file.wfs.handlesLock.Lock() - handle, found := file.wfs.handles[file.Id()] - file.wfs.handlesLock.Unlock() - entry = file.entry - if found { - // glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name) - entry = handle.f.entry - } - - if entry != nil { - if len(entry.HardLinkId) == 0 { - // only always reload hard link - return entry, nil - } - } - entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) - if err != nil { - glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return entry, err - } - if entry != nil { - // file.entry = entry - } else { - glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err) - } - return entry, nil -} - -func lessThan(a, b *filer_pb.FileChunk) bool { - if a.Mtime == b.Mtime { - return a.Fid.FileKey < b.Fid.FileKey - } - return a.Mtime < b.Mtime -} - -func (file *File) addChunks(chunks []*filer_pb.FileChunk) { - - // find the earliest incoming chunk - newChunks := chunks - earliestChunk := newChunks[0] - for i := 1; i < len(newChunks); i++ { - if lessThan(earliestChunk, newChunks[i]) { - earliestChunk = newChunks[i] - } - } - - entry := file.getEntry() - if entry == nil { - return - } - - // pick out-of-order chunks from existing chunks - for _, chunk := range entry.Chunks { - if lessThan(earliestChunk, chunk) { - chunks = append(chunks, chunk) - } - } - - // sort incoming chunks - sort.Slice(chunks, func(i, j int) bool { - return lessThan(chunks[i], chunks[j]) - }) - - glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks)) - - entry.Chunks = append(entry.Chunks, newChunks...) -} - -func (file *File) saveEntry(entry *filer_pb.Entry) error { - return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - file.wfs.mapPbIdFromLocalToFiler(entry) - defer file.wfs.mapPbIdFromFilerToLocal(entry) - - request := &filer_pb.CreateEntryRequest{ - Directory: file.dir.FullPath(), - Entry: entry, - Signatures: []int32{file.wfs.signature}, - } - - glog.V(4).Infof("save file entry: %v", request) - _, err := client.CreateEntry(context.Background(), request) - if err != nil { - glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return fuse.EIO - } - - file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - file.dirtyMetadata = false - - return nil - }) -} - -func (file *File) getEntry() *filer_pb.Entry { - return file.entry -} - -func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { - err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{ - Directory: file.dir.FullPath(), - Name: entry.Name, - } - - glog.V(4).Infof("download entry: %v", request) - resp, err := client.CacheRemoteObjectToLocalCluster(context.Background(), request) - if err != nil { - glog.Errorf("CacheRemoteObjectToLocalCluster file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return fuse.EIO - } - - entry = resp.Entry - - file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry)) - - file.dirtyMetadata = false - - return nil - }) - - return entry, err -} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go deleted file mode 100644 index 35f87373e..000000000 --- a/weed/filesys/filehandle.go +++ /dev/null @@ -1,346 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "io" - "net/http" - "os" - "sync" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type FileHandle struct { - // cache file has been written to - dirtyPages *PageWriter - entryViewCache []filer.VisibleInterval - reader io.ReaderAt - contentType string - handle uint64 - sync.Mutex - - f *File - NodeId fuse.NodeID // file or directory the request is about - Uid uint32 // user ID of process making request - Gid uint32 // group ID of process making request - isDeleted bool -} - -func newFileHandle(file *File, uid, gid uint32) *FileHandle { - fh := &FileHandle{ - f: file, - Uid: uid, - Gid: gid, - } - // dirtyPages: newContinuousDirtyPages(file, writeOnly), - fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit) - entry := fh.f.getEntry() - if entry != nil { - entry.Attributes.FileSize = filer.FileSize(entry) - } - - return fh -} - -var _ = fs.Handle(&FileHandle{}) - -// var _ = fs.HandleReadAller(&FileHandle{}) -var _ = fs.HandleReader(&FileHandle{}) -var _ = fs.HandleFlusher(&FileHandle{}) -var _ = fs.HandleWriter(&FileHandle{}) -var _ = fs.HandleReleaser(&FileHandle{}) - -func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - - fh.Lock() - defer fh.Unlock() - - glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) - - if req.Size <= 0 { - return nil - } - - buff := resp.Data[:cap(resp.Data)] - if req.Size > cap(resp.Data) { - // should not happen - buff = make([]byte, req.Size) - } - - fh.lockForRead(req.Offset, len(buff)) - defer fh.unlockForRead(req.Offset, len(buff)) - totalRead, err := fh.readFromChunks(buff, req.Offset) - if err == nil || err == io.EOF { - maxStop := fh.readFromDirtyPages(buff, req.Offset) - totalRead = max(maxStop-req.Offset, totalRead) - } - - if err == io.EOF { - err = nil - } - - if err != nil { - glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err) - return fuse.EIO - } - - if totalRead > int64(len(buff)) { - glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead) - totalRead = min(int64(len(buff)), totalRead) - } - if err == nil { - resp.Data = buff[:totalRead] - } - - return err -} - -func (fh *FileHandle) lockForRead(startOffset int64, size int) { - fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size)) -} -func (fh *FileHandle) unlockForRead(startOffset int64, size int) { - fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) -} - -func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { - maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) - return -} - -func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - - entry := fh.f.getEntry() - if entry == nil { - return 0, io.EOF - } - - if entry.IsInRemoteOnly() { - glog.V(4).Infof("download remote entry %s", fh.f.fullpath()) - newEntry, err := fh.f.downloadRemoteEntry(entry) - if err != nil { - glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err) - return 0, err - } - entry = newEntry - } - - fileSize := int64(filer.FileSize(entry)) - fileFullPath := fh.f.fullpath() - - if fileSize == 0 { - glog.V(1).Infof("empty fh %v", fileFullPath) - return 0, io.EOF - } - - if offset+int64(len(buff)) <= int64(len(entry.Content)) { - totalRead := copy(buff, entry.Content[offset:]) - glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) - return int64(totalRead), nil - } - - var chunkResolveErr error - if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, fileSize) - if chunkResolveErr != nil { - return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) - } - fh.reader = nil - } - - reader := fh.reader - if reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize) - glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews)) - for _, chunkView := range chunkViews { - glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) - } - reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) - } - fh.reader = reader - - totalRead, err := reader.ReadAt(buff, offset) - - if err != nil && err != io.EOF { - glog.Errorf("file handle read %s: %v", fileFullPath, err) - } - - glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) - - return int64(totalRead), err -} - -// Write to the file handle -func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - - fh.dirtyPages.writerPattern.MonitorWriteAt(req.Offset, len(req.Data)) - - fh.Lock() - defer fh.Unlock() - - // write the request to volume servers - data := req.Data - if len(data) <= 512 && req.Offset == 0 { - // fuse message cacheable size - data = make([]byte, len(req.Data)) - copy(data, req.Data) - } - - entry := fh.f.getEntry() - if entry == nil { - return fuse.EIO - } - - entry.Content = nil - entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize))) - // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - - fh.dirtyPages.AddPage(req.Offset, data) - - resp.Size = len(data) - - if req.Offset == 0 { - // detect mime type - fh.contentType = http.DetectContentType(data) - fh.f.dirtyMetadata = true - } - - fh.f.dirtyMetadata = true - - return nil -} - -func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - - glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen) - - fh.f.wfs.handlesLock.Lock() - fh.f.isOpen-- - fh.f.wfs.handlesLock.Unlock() - - if fh.f.isOpen <= 0 { - fh.f.entry = nil - fh.entryViewCache = nil - fh.reader = nil - - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - fh.dirtyPages.Destroy() - } - - if fh.f.isOpen < 0 { - glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) - fh.f.isOpen = 0 - return nil - } - - return nil -} - -func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { - - glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle) - - if fh.isDeleted { - glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle) - return nil - } - - fh.Lock() - defer fh.Unlock() - - if err := fh.doFlush(ctx, req.Header); err != nil { - glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err) - return err - } - - return nil -} - -func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { - // flush works at fh level - // send the data to the OS - glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle) - - if err := fh.dirtyPages.FlushData(); err != nil { - glog.Errorf("%v doFlush: %v", fh.f.fullpath(), err) - return fuse.EIO - } - - if !fh.f.dirtyMetadata { - return nil - } - - err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - entry := fh.f.getEntry() - if entry == nil { - return nil - } - - if entry.Attributes != nil { - entry.Attributes.Mime = fh.contentType - if entry.Attributes.Uid == 0 { - entry.Attributes.Uid = header.Uid - } - if entry.Attributes.Gid == 0 { - entry.Attributes.Gid = header.Gid - } - if entry.Attributes.Crtime == 0 { - entry.Attributes.Crtime = time.Now().Unix() - } - entry.Attributes.Mtime = time.Now().Unix() - entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) - entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions() - } - - request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.FullPath(), - Entry: entry, - Signatures: []int32{fh.f.wfs.signature}, - } - - glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks)) - for i, chunk := range entry.Chunks { - glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) - } - - manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) - - chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) - chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) - if manifestErr != nil { - // not good, but should be ok - glog.V(0).Infof("MaybeManifestize: %v", manifestErr) - } - entry.Chunks = append(chunks, manifestChunks...) - - fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) - return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) - } - - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - if err == nil { - fh.f.dirtyMetadata = false - } - - if err != nil { - glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err) - return fuse.EIO - } - - return nil -} diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go deleted file mode 100644 index 6b1012090..000000000 --- a/weed/filesys/fscache.go +++ /dev/null @@ -1,213 +0,0 @@ -package filesys - -import ( - "sync" - - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FsCache struct { - root *FsNode - sync.RWMutex -} -type FsNode struct { - parent *FsNode - node fs.Node - name string - childrenLock sync.RWMutex - children map[string]*FsNode -} - -func newFsCache(root fs.Node) *FsCache { - return &FsCache{ - root: &FsNode{ - node: root, - }, - } -} - -func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { - - c.RLock() - defer c.RUnlock() - - return c.doGetFsNode(path) -} - -func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return nil - } - } - return t.node -} - -func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { - - c.Lock() - defer c.Unlock() - - c.doSetFsNode(path, node) -} - -func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { - t := c.root - for _, p := range path.Split() { - t = t.ensureChild(p) - } - t.node = node -} - -func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { - - c.Lock() - defer c.Unlock() - - t := c.doGetFsNode(path) - if t != nil { - return t - } - t = genNodeFn() - c.doSetFsNode(path, t) - return t -} - -func (c *FsCache) DeleteFsNode(path util.FullPath) { - - c.Lock() - defer c.Unlock() - - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return - } - } - if t.parent != nil { - t.parent.disconnectChild(t) - } - t.deleteSelf() -} - -// oldPath and newPath are full path including the new name -func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { - - c.Lock() - defer c.Unlock() - - // find old node - src := c.root - for _, p := range oldPath.Split() { - src = src.findChild(p) - if src == nil { - return src - } - } - if src.parent != nil { - src.parent.disconnectChild(src) - } - - // find new node - target := c.root - for _, p := range newPath.Split() { - target = target.ensureChild(p) - } - parent := target.parent - if dir, ok := src.node.(*Dir); ok { - dir.name = target.name // target is not Dir, but a shortcut - } - if f, ok := src.node.(*File); ok { - f.Name = target.name - entry := f.getEntry() - if entry != nil { - entry.Name = f.Name - } - } - parent.disconnectChild(target) - - target.deleteSelf() - - src.name = target.name - src.connectToParent(parent) - - return src -} - -func (n *FsNode) connectToParent(parent *FsNode) { - n.parent = parent - oldNode := parent.findChild(n.name) - if oldNode != nil { - oldNode.deleteSelf() - } - if dir, ok := n.node.(*Dir); ok { - if parent.node != nil { - dir.parent = parent.node.(*Dir) - } - } - if f, ok := n.node.(*File); ok { - if parent.node != nil { - f.dir = parent.node.(*Dir) - } - } - n.childrenLock.Lock() - parent.children[n.name] = n - n.childrenLock.Unlock() -} - -func (n *FsNode) findChild(name string) *FsNode { - n.childrenLock.RLock() - defer n.childrenLock.RUnlock() - - child, found := n.children[name] - if found { - return child - } - return nil -} - -func (n *FsNode) ensureChild(name string) *FsNode { - n.childrenLock.Lock() - defer n.childrenLock.Unlock() - - if n.children == nil { - n.children = make(map[string]*FsNode) - } - child, found := n.children[name] - if found { - return child - } - t := &FsNode{ - parent: n, - node: nil, - name: name, - children: nil, - } - n.children[name] = t - return t -} - -func (n *FsNode) disconnectChild(child *FsNode) { - n.childrenLock.Lock() - delete(n.children, child.name) - n.childrenLock.Unlock() - child.parent = nil -} - -func (n *FsNode) deleteSelf() { - n.childrenLock.Lock() - for _, child := range n.children { - child.deleteSelf() - } - n.children = nil - n.childrenLock.Unlock() - - n.node = nil - n.parent = nil - -} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go deleted file mode 100644 index 1152eb32e..000000000 --- a/weed/filesys/fscache_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package filesys - -import ( - "testing" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -func TestPathSplit(t *testing.T) { - parts := util.FullPath("/").Split() - if len(parts) != 0 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - - parts = util.FullPath("/readme.md").Split() - if len(parts) != 1 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - -} - -func TestFsCache(t *testing.T) { - - cache := newFsCache(nil) - - x := cache.GetFsNode(util.FullPath("/y/x")) - if x != nil { - t.Errorf("wrong node!") - } - - p := util.FullPath("/a/b/c") - cache.SetFsNode(p, &File{Name: "cc"}) - tNode := cache.GetFsNode(p) - tFile := tNode.(*File) - if tFile.Name != "cc" { - t.Errorf("expecting a FsNode") - } - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - b := cache.GetFsNode(util.FullPath("/a/b")) - if b != nil { - t.Errorf("unexpected node!") - } - - a := cache.GetFsNode(util.FullPath("/a")) - if a == nil { - t.Errorf("missing node!") - } - - cache.DeleteFsNode(util.FullPath("/a")) - if b != nil { - t.Errorf("unexpected node!") - } - - a = cache.GetFsNode(util.FullPath("/a")) - if a != nil { - t.Errorf("wrong DeleteFsNode!") - } - - z := cache.GetFsNode(util.FullPath("/z")) - if z == nil { - t.Errorf("missing node!") - } - - y := cache.GetFsNode(util.FullPath("/x/y")) - if y != nil { - t.Errorf("wrong node!") - } - -} - -func TestFsCacheMove(t *testing.T) { - - cache := newFsCache(nil) - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) - - d := cache.GetFsNode(util.FullPath("/z/x/d")) - if d == nil { - t.Errorf("unexpected nil node!") - } - if d.(*File).Name != "dd" { - t.Errorf("unexpected non dd node!") - } - -} - -func TestFsCacheMove2(t *testing.T) { - - cache := newFsCache(nil) - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - - cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e")) - - d := cache.GetFsNode(util.FullPath("/a/b/e")) - if d == nil { - t.Errorf("unexpected nil node!") - } - if d.(*File).Name != "e" { - t.Errorf("unexpected node!") - } - -} diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go deleted file mode 100644 index e6593ebde..000000000 --- a/weed/filesys/meta_cache/cache_config.go +++ /dev/null @@ -1,32 +0,0 @@ -package meta_cache - -import "github.com/chrislusf/seaweedfs/weed/util" - -var ( - _ = util.Configuration(&cacheConfig{}) -) - -// implementing util.Configuraion -type cacheConfig struct { - dir string -} - -func (c cacheConfig) GetString(key string) string { - return c.dir -} - -func (c cacheConfig) GetBool(key string) bool { - panic("implement me") -} - -func (c cacheConfig) GetInt(key string) int { - panic("implement me") -} - -func (c cacheConfig) GetStringSlice(key string) []string { - panic("implement me") -} - -func (c cacheConfig) SetDefault(key string, value interface{}) { - panic("implement me") -} diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go deleted file mode 100644 index 4a2179f31..000000000 --- a/weed/filesys/meta_cache/id_mapper.go +++ /dev/null @@ -1,101 +0,0 @@ -package meta_cache - -import ( - "fmt" - "strconv" - "strings" -) - -type UidGidMapper struct { - uidMapper *IdMapper - gidMapper *IdMapper -} - -type IdMapper struct { - localToFiler map[uint32]uint32 - filerToLocal map[uint32]uint32 -} - -// UidGidMapper translates local uid/gid to filer uid/gid -// The local storage always persists the same as the filer. -// The local->filer translation happens when updating the filer first and later saving to meta_cache. -// And filer->local happens when reading from the meta_cache. -func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { - uidMapper, err := newIdMapper(uidPairsStr) - if err != nil { - return nil, err - } - gidMapper, err := newIdMapper(gidPairStr) - if err != nil { - return nil, err - } - - return &UidGidMapper{ - uidMapper: uidMapper, - gidMapper: gidMapper, - }, nil -} - -func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) -} -func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) -} - -func (m *IdMapper) LocalToFiler(id uint32) uint32 { - value, found := m.localToFiler[id] - if found { - return value - } - return id -} -func (m *IdMapper) FilerToLocal(id uint32) uint32 { - value, found := m.filerToLocal[id] - if found { - return value - } - return id -} - -func newIdMapper(pairsStr string) (*IdMapper, error) { - - localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) - if err != nil { - return nil, err - } - - return &IdMapper{ - localToFiler: localToFiler, - filerToLocal: filerToLocal, - }, nil - -} - -func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { - - if pairsStr == "" { - return - } - - localToFiler = make(map[uint32]uint32) - filerToLocal = make(map[uint32]uint32) - for _, pairStr := range strings.Split(pairsStr, ",") { - pair := strings.Split(pairStr, ":") - localUidStr, filerUidStr := pair[0], pair[1] - localUid, localUidErr := strconv.Atoi(localUidStr) - if localUidErr != nil { - err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) - return - } - filerUid, filerUidErr := strconv.Atoi(filerUidStr) - if filerUidErr != nil { - err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) - return - } - localToFiler[uint32(localUid)] = uint32(filerUid) - filerToLocal[uint32(filerUid)] = uint32(localUid) - } - - return -} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go deleted file mode 100644 index dc8e6838f..000000000 --- a/weed/filesys/meta_cache/meta_cache.go +++ /dev/null @@ -1,154 +0,0 @@ -package meta_cache - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filer/leveldb" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" - "os" -) - -// need to have logic similar to FilerStoreWrapper -// e.g. fill fileId field for chunks - -type MetaCache struct { - localStore filer.VirtualFilerStore - // sync.RWMutex - visitedBoundary *bounded_tree.BoundedTree - uidGidMapper *UidGidMapper - invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) -} - -func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { - return &MetaCache{ - localStore: openMetaStore(dbFolder), - visitedBoundary: bounded_tree.NewBoundedTree(baseDir), - uidGidMapper: uidGidMapper, - invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { - invalidateFunc(fullpath, entry) - }, - } -} - -func openMetaStore(dbFolder string) filer.VirtualFilerStore { - - os.RemoveAll(dbFolder) - os.MkdirAll(dbFolder, 0755) - - store := &leveldb.LevelDBStore{} - config := &cacheConfig{ - dir: dbFolder, - } - - if err := store.Initialize(config, ""); err != nil { - glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) - } - - return filer.NewFilerStoreWrapper(store) - -} - -func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.doInsertEntry(ctx, entry) -} - -func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { - return mc.localStore.InsertEntry(ctx, entry) -} - -func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - - oldDir, _ := oldPath.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { - if oldPath != "" { - if newEntry != nil && oldPath == newEntry.FullPath { - // skip the unnecessary deletion - // leave the update to the following InsertEntry operation - } else { - glog.V(3).Infof("DeleteEntry %s", oldPath) - if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { - return err - } - } - } - } else { - // println("unknown old directory:", oldDir) - } - - if newEntry != nil { - newDir, _ := newEntry.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { - glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) - if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { - return err - } - } - } - return nil -} - -func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.UpdateEntry(ctx, entry) -} - -func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { - //mc.RLock() - //defer mc.RUnlock() - entry, err = mc.localStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - mc.mapIdFromFilerToLocal(entry) - return -} - -func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.DeleteEntry(ctx, fp) -} - -func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { - //mc.RLock() - //defer mc.RUnlock() - - if !mc.visitedBoundary.HasVisited(dirPath) { - // if this request comes after renaming, it should be fine - glog.Warningf("unsynchronized dir: %v", dirPath) - } - - _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { - mc.mapIdFromFilerToLocal(entry) - return eachEntryFunc(entry) - }) - if err != nil { - return err - } - return err -} - -func (mc *MetaCache) Shutdown() { - //mc.Lock() - //defer mc.Unlock() - mc.localStore.Shutdown() -} - -func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { - entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) -} - -func (mc *MetaCache) Debug() { - if debuggable, ok := mc.localStore.(filer.Debuggable); ok { - println("start debugging") - debuggable.Debug(os.Stderr) - } -} diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go deleted file mode 100644 index 07098bf6b..000000000 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ /dev/null @@ -1,47 +0,0 @@ -package meta_cache - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { - - return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { - - glog.V(4).Infof("ReadDirAllEntries %s ...", path) - - util.Retry("ReadDirAllEntries", func() error { - err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { - entry := filer.FromPbEntry(string(path), pbEntry) - if IsHiddenSystemEntry(string(path), entry.Name()) { - return nil - } - if err := mc.doInsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) - return err - } - if entry.IsDirectory() { - childDirectories = append(childDirectories, entry.Name()) - } - return nil - }) - return err - }) - - if err != nil { - err = fmt.Errorf("list %s: %v", path, err) - } - - return - }) -} - -func IsHiddenSystemEntry(dir, name string) bool { - return dir == "/" && (name == "topics" || name == "etc") -} diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go deleted file mode 100644 index 881fee08f..000000000 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ /dev/null @@ -1,68 +0,0 @@ -package meta_cache - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { - - processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - - for _, sig := range message.Signatures { - if sig == selfSignature && selfSignature != 0 { - return nil - } - } - - dir := resp.Directory - var oldPath util.FullPath - var newEntry *filer.Entry - if message.OldEntry != nil { - oldPath = util.NewFullPath(dir, message.OldEntry.Name) - glog.V(4).Infof("deleting %v", oldPath) - } - - if message.NewEntry != nil { - if message.NewParentPath != "" { - dir = message.NewParentPath - } - key := util.NewFullPath(dir, message.NewEntry.Name) - glog.V(4).Infof("creating %v", key) - newEntry = filer.FromPbEntry(dir, message.NewEntry) - } - err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) - if err == nil { - if message.OldEntry != nil && message.NewEntry != nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey, message.OldEntry) - if message.OldEntry.Name != message.NewEntry.Name { - newKey := util.NewFullPath(dir, message.NewEntry.Name) - mc.invalidateFunc(newKey, message.NewEntry) - } - } else if message.OldEntry == nil && message.NewEntry != nil { - // no need to invaalidate - } else if message.OldEntry != nil && message.NewEntry == nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey, message.OldEntry) - } - } - - return err - - } - - util.RetryForever("followMetaUpdates", func() error { - return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true) - }, func(err error) bool { - glog.Errorf("follow metadata updates: %v", err) - return true - }) - - return nil -} diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go deleted file mode 100644 index b8e884f58..000000000 --- a/weed/filesys/page_writer.go +++ /dev/null @@ -1,98 +0,0 @@ -package filesys - -import ( - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" -) - -type PageWriter struct { - fh *FileHandle - collection string - replication string - chunkSize int64 - writerPattern *WriterPattern - - randomWriter page_writer.DirtyPages -} - -var ( - _ = page_writer.DirtyPages(&PageWriter{}) -) - -func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { - pw := &PageWriter{ - fh: fh, - chunkSize: chunkSize, - writerPattern: NewWriterPattern(chunkSize), - randomWriter: newMemoryChunkPages(fh, chunkSize), - // randomWriter: newTempFileDirtyPages(fh.f, chunkSize), - } - return pw -} - -func (pw *PageWriter) AddPage(offset int64, data []byte) { - - glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) - - chunkIndex := offset / pw.chunkSize - for i := chunkIndex; len(data) > 0; i++ { - writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - pw.addToOneChunk(i, offset, data[:writeSize]) - offset += writeSize - data = data[writeSize:] - } -} - -func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - pw.randomWriter.AddPage(offset, data) -} - -func (pw *PageWriter) FlushData() error { - pw.writerPattern.Reset() - return pw.randomWriter.FlushData() -} - -func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data))) - - chunkIndex := offset / pw.chunkSize - for i := chunkIndex; len(data) > 0; i++ { - readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - - maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - - offset += readSize - data = data[readSize:] - } - - return -} - -func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - return pw.randomWriter.GetStorageOptions() -} - -func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { - pw.randomWriter.LockForRead(startOffset, stopOffset) -} - -func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { - pw.randomWriter.UnlockForRead(startOffset, stopOffset) -} - -func (pw *PageWriter) Destroy() { - pw.randomWriter.Destroy() -} - -func max(x, y int64) int64 { - if x > y { - return x - } - return y -} -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go deleted file mode 100644 index e6dc5d1f5..000000000 --- a/weed/filesys/page_writer/chunk_interval_list.go +++ /dev/null @@ -1,115 +0,0 @@ -package page_writer - -import "math" - -// ChunkWrittenInterval mark one written interval within one page chunk -type ChunkWrittenInterval struct { - StartOffset int64 - stopOffset int64 - prev *ChunkWrittenInterval - next *ChunkWrittenInterval -} - -func (interval *ChunkWrittenInterval) Size() int64 { - return interval.stopOffset - interval.StartOffset -} - -func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool { - return interval.stopOffset-interval.StartOffset == chunkSize -} - -// ChunkWrittenIntervalList mark written intervals within one page chunk -type ChunkWrittenIntervalList struct { - head *ChunkWrittenInterval - tail *ChunkWrittenInterval -} - -func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { - list := &ChunkWrittenIntervalList{ - head: &ChunkWrittenInterval{ - StartOffset: -1, - stopOffset: -1, - }, - tail: &ChunkWrittenInterval{ - StartOffset: math.MaxInt64, - stopOffset: math.MaxInt64, - }, - } - list.head.next = list.tail - list.tail.prev = list.head - return list -} - -func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { - interval := &ChunkWrittenInterval{ - StartOffset: startOffset, - stopOffset: stopOffset, - } - list.addInterval(interval) -} - -func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool { - return list.size() == 1 && list.head.next.isComplete(chunkSize) -} -func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { - for t := list.head; t != nil; t = t.next { - writtenByteCount += t.Size() - } - return -} - -func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { - - p := list.head - for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { - } - q := list.tail - for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { - } - - if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { - // merge p and q together - p.stopOffset = q.stopOffset - unlinkNodesBetween(p, q.next) - return - } - if interval.StartOffset <= p.stopOffset { - // merge new interval into p - p.stopOffset = interval.stopOffset - unlinkNodesBetween(p, q) - return - } - if q.StartOffset <= interval.stopOffset { - // merge new interval into q - q.StartOffset = interval.StartOffset - unlinkNodesBetween(p, q) - return - } - - // add the new interval between p and q - unlinkNodesBetween(p, q) - p.next = interval - interval.prev = p - q.prev = interval - interval.next = q - -} - -// unlinkNodesBetween remove all nodes after start and before stop, exclusive -func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) { - if start.next == stop { - return - } - start.next.prev = nil - start.next = stop - stop.prev.next = nil - stop.prev = start -} - -func (list *ChunkWrittenIntervalList) size() int { - var count int - for t := list.head; t != nil; t = t.next { - count++ - } - return count - 2 -} diff --git a/weed/filesys/page_writer/chunk_interval_list_test.go b/weed/filesys/page_writer/chunk_interval_list_test.go deleted file mode 100644 index b22f5eb5d..000000000 --- a/weed/filesys/page_writer/chunk_interval_list_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_PageChunkWrittenIntervalList(t *testing.T) { - list := newChunkWrittenIntervalList() - - assert.Equal(t, 0, list.size(), "empty list") - - list.MarkWritten(0, 5) - assert.Equal(t, 1, list.size(), "one interval") - - list.MarkWritten(0, 5) - assert.Equal(t, 1, list.size(), "duplicated interval2") - - list.MarkWritten(95, 100) - assert.Equal(t, 2, list.size(), "two intervals") - - list.MarkWritten(50, 60) - assert.Equal(t, 3, list.size(), "three intervals") - - list.MarkWritten(50, 55) - assert.Equal(t, 3, list.size(), "three intervals merge") - - list.MarkWritten(40, 50) - assert.Equal(t, 3, list.size(), "three intervals grow forward") - - list.MarkWritten(50, 65) - assert.Equal(t, 3, list.size(), "three intervals grow backward") - - list.MarkWritten(70, 80) - assert.Equal(t, 4, list.size(), "four intervals") - - list.MarkWritten(60, 70) - assert.Equal(t, 3, list.size(), "three intervals merged") - - list.MarkWritten(59, 71) - assert.Equal(t, 3, list.size(), "covered three intervals") - - list.MarkWritten(5, 59) - assert.Equal(t, 2, list.size(), "covered two intervals") - - list.MarkWritten(70, 99) - assert.Equal(t, 1, list.size(), "covered one intervals") - -} diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go deleted file mode 100644 index 25b747fad..000000000 --- a/weed/filesys/page_writer/dirty_pages.go +++ /dev/null @@ -1,30 +0,0 @@ -package page_writer - -type DirtyPages interface { - AddPage(offset int64, data []byte) - FlushData() error - ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) - GetStorageOptions() (collection, replication string) - Destroy() - LockForRead(startOffset, stopOffset int64) - UnlockForRead(startOffset, stopOffset int64) -} - -func max(x, y int64) int64 { - if x > y { - return x - } - return y -} -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} -func minInt(x, y int) int { - if x < y { - return x - } - return y -} diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go deleted file mode 100644 index 4e8f31425..000000000 --- a/weed/filesys/page_writer/page_chunk.go +++ /dev/null @@ -1,16 +0,0 @@ -package page_writer - -import ( - "io" -) - -type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) - -type PageChunk interface { - FreeResource() - WriteDataAt(src []byte, offset int64) (n int) - ReadDataAt(p []byte, off int64) (maxStop int64) - IsComplete() bool - WrittenSize() int64 - SaveContent(saveFn SaveToStorageFunc) -} diff --git a/weed/filesys/page_writer/page_chunk_mem.go b/weed/filesys/page_writer/page_chunk_mem.go deleted file mode 100644 index dfd54c19e..000000000 --- a/weed/filesys/page_writer/page_chunk_mem.go +++ /dev/null @@ -1,69 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" -) - -var ( - _ = PageChunk(&MemChunk{}) -) - -type MemChunk struct { - buf []byte - usage *ChunkWrittenIntervalList - chunkSize int64 - logicChunkIndex LogicChunkIndex -} - -func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { - return &MemChunk{ - logicChunkIndex: logicChunkIndex, - chunkSize: chunkSize, - buf: mem.Allocate(int(chunkSize)), - usage: newChunkWrittenIntervalList(), - } -} - -func (mc *MemChunk) FreeResource() { - mem.Free(mc.buf) -} - -func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { - innerOffset := offset % mc.chunkSize - n = copy(mc.buf[innerOffset:], src) - mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - return -} - -func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { - memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize - for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { - logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (mc *MemChunk) IsComplete() bool { - return mc.usage.IsComplete(mc.chunkSize) -} - -func (mc *MemChunk) WrittenSize() int64 { - return mc.usage.WrittenSize() -} - -func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { - if saveFn == nil { - return - } - for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { - reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) - saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() { - }) - } -} diff --git a/weed/filesys/page_writer/page_chunk_swapfile.go b/weed/filesys/page_writer/page_chunk_swapfile.go deleted file mode 100644 index 486557629..000000000 --- a/weed/filesys/page_writer/page_chunk_swapfile.go +++ /dev/null @@ -1,121 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" - "os" -) - -var ( - _ = PageChunk(&SwapFileChunk{}) -) - -type ActualChunkIndex int - -type SwapFile struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - chunkSize int64 -} - -type SwapFileChunk struct { - swapfile *SwapFile - usage *ChunkWrittenIntervalList - logicChunkIndex LogicChunkIndex - actualChunkIndex ActualChunkIndex -} - -func NewSwapFile(dir string, chunkSize int64) *SwapFile { - return &SwapFile{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - chunkSize: chunkSize, - } -} -func (sf *SwapFile) FreeResource() { - if sf.file != nil { - sf.file.Close() - os.Remove(sf.file.Name()) - } -} - -func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { - if sf.file == nil { - var err error - sf.file, err = os.CreateTemp(sf.dir, "") - if err != nil { - glog.Errorf("create swap file: %v", err) - return nil - } - } - actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] - if !found { - actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) - sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex - } - - return &SwapFileChunk{ - swapfile: sf, - usage: newChunkWrittenIntervalList(), - logicChunkIndex: logicChunkIndex, - actualChunkIndex: actualChunkIndex, - } -} - -func (sc *SwapFileChunk) FreeResource() { -} - -func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { - innerOffset := offset % sc.swapfile.chunkSize - var err error - n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) - if err == nil { - sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - } else { - glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) - } - return -} - -func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { - chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize - for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { - logicStart := max(off, chunkStartOffset+t.StartOffset) - logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) - if logicStart < logicStop { - actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { - glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) - break - } - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (sc *SwapFileChunk) IsComplete() bool { - return sc.usage.IsComplete(sc.swapfile.chunkSize) -} - -func (sc *SwapFileChunk) WrittenSize() int64 { - return sc.usage.WrittenSize() -} - -func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { - if saveFn == nil { - return - } - for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { - data := mem.Allocate(int(t.Size())) - sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) - reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { - }) - mem.Free(data) - } - sc.usage = newChunkWrittenIntervalList() -} diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go deleted file mode 100644 index 53641e66d..000000000 --- a/weed/filesys/page_writer/upload_pipeline.go +++ /dev/null @@ -1,182 +0,0 @@ -package page_writer - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "sync" - "sync/atomic" - "time" -) - -type LogicChunkIndex int - -type UploadPipeline struct { - filepath util.FullPath - ChunkSize int64 - writableChunks map[LogicChunkIndex]PageChunk - writableChunksLock sync.Mutex - sealedChunks map[LogicChunkIndex]*SealedChunk - sealedChunksLock sync.Mutex - uploaders *util.LimitedConcurrentExecutor - uploaderCount int32 - uploaderCountCond *sync.Cond - saveToStorageFn SaveToStorageFunc - activeReadChunks map[LogicChunkIndex]int - activeReadChunksLock sync.Mutex - bufferChunkLimit int -} - -type SealedChunk struct { - chunk PageChunk - referenceCounter int // track uploading or reading processes -} - -func (sc *SealedChunk) FreeReference(messageOnFree string) { - sc.referenceCounter-- - if sc.referenceCounter == 0 { - glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) - sc.chunk.FreeResource() - } -} - -func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { - return &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]PageChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - uploaders: writers, - uploaderCountCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - activeReadChunks: make(map[LogicChunkIndex]int), - bufferChunkLimit: bufferChunkLimit, - } -} - -func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - - logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) - - memChunk, found := up.writableChunks[logicChunkIndex] - if !found { - if len(up.writableChunks) < up.bufferChunkLimit { - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) - } else { - fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) - for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { - fullestChunkIndex = lci - fullness = chunkFullness - } - } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) - delete(up.writableChunks, fullestChunkIndex) - fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness) - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) - } - up.writableChunks[logicChunkIndex] = memChunk - } - n = memChunk.WriteDataAt(p, off) - up.maybeMoveToSealed(memChunk, logicChunkIndex) - - return -} - -func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { - logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) - - // read from sealed chunks first - up.sealedChunksLock.Lock() - sealedChunk, found := up.sealedChunks[logicChunkIndex] - if found { - sealedChunk.referenceCounter++ - } - up.sealedChunksLock.Unlock() - if found { - maxStop = sealedChunk.chunk.ReadDataAt(p, off) - glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) - sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) - } - - // read from writable chunks last - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - writableChunk, found := up.writableChunks[logicChunkIndex] - if !found { - return - } - writableMaxStop := writableChunk.ReadDataAt(p, off) - glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) - maxStop = max(maxStop, writableMaxStop) - - return -} - -func (up *UploadPipeline) FlushAll() { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - - for logicChunkIndex, memChunk := range up.writableChunks { - up.moveToSealed(memChunk, logicChunkIndex) - } - - up.waitForCurrentWritersToComplete() -} - -func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { - if memChunk.IsComplete() { - up.moveToSealed(memChunk, logicChunkIndex) - } -} - -func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { - atomic.AddInt32(&up.uploaderCount, 1) - glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) - - up.sealedChunksLock.Lock() - - if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { - oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) - } - sealedChunk := &SealedChunk{ - chunk: memChunk, - referenceCounter: 1, // default 1 is for uploading process - } - up.sealedChunks[logicChunkIndex] = sealedChunk - delete(up.writableChunks, logicChunkIndex) - - up.sealedChunksLock.Unlock() - - up.uploaders.Execute(func() { - // first add to the file chunks - sealedChunk.chunk.SaveContent(up.saveToStorageFn) - - // notify waiting process - atomic.AddInt32(&up.uploaderCount, -1) - glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) - // Lock and Unlock are not required, - // but it may signal multiple times during one wakeup, - // and the waiting goroutine may miss some of them! - up.uploaderCountCond.L.Lock() - up.uploaderCountCond.Broadcast() - up.uploaderCountCond.L.Unlock() - - // wait for readers - for up.IsLocked(logicChunkIndex) { - time.Sleep(59 * time.Millisecond) - } - - // then remove from sealed chunks - up.sealedChunksLock.Lock() - defer up.sealedChunksLock.Unlock() - delete(up.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) - - }) -} - -func (up *UploadPipeline) Shutdown() { -} diff --git a/weed/filesys/page_writer/upload_pipeline_lock.go b/weed/filesys/page_writer/upload_pipeline_lock.go deleted file mode 100644 index 47a40ba37..000000000 --- a/weed/filesys/page_writer/upload_pipeline_lock.go +++ /dev/null @@ -1,63 +0,0 @@ -package page_writer - -import ( - "sync/atomic" -) - -func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - up.activeReadChunks[i] = count + 1 - } else { - up.activeReadChunks[i] = 1 - } - } -} - -func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - if count == 1 { - delete(up.activeReadChunks, i) - } else { - up.activeReadChunks[i] = count - 1 - } - } - } -} - -func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - if count, found := up.activeReadChunks[logicChunkIndex]; found { - return count > 0 - } - return false -} - -func (up *UploadPipeline) waitForCurrentWritersToComplete() { - up.uploaderCountCond.L.Lock() - t := int32(100) - for { - t = atomic.LoadInt32(&up.uploaderCount) - if t <= 0 { - break - } - up.uploaderCountCond.Wait() - } - up.uploaderCountCond.L.Unlock() -} diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go deleted file mode 100644 index 816fb228b..000000000 --- a/weed/filesys/page_writer/upload_pipeline_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/util" - "testing" -) - -func TestUploadPipeline(t *testing.T) { - - uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16) - - writeRange(uploadPipeline, 0, 131072) - writeRange(uploadPipeline, 131072, 262144) - writeRange(uploadPipeline, 262144, 1025536) - - confirmRange(t, uploadPipeline, 0, 1025536) - - writeRange(uploadPipeline, 1025536, 1296896) - - confirmRange(t, uploadPipeline, 1025536, 1296896) - - writeRange(uploadPipeline, 1296896, 2162688) - - confirmRange(t, uploadPipeline, 1296896, 2162688) - - confirmRange(t, uploadPipeline, 1296896, 2162688) -} - -// startOff and stopOff must be divided by 4 -func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { - p := make([]byte, 4) - for i := startOff / 4; i < stopOff/4; i += 4 { - util.Uint32toBytes(p, uint32(i)) - uploadPipeline.SaveDataAt(p, i) - } -} - -func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { - p := make([]byte, 4) - for i := startOff; i < stopOff/4; i += 4 { - uploadPipeline.MaybeReadDataAt(p, i) - x := util.BytesToUint32(p) - if x != uint32(i) { - t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) - } - } -} diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go deleted file mode 100644 index 51c63d472..000000000 --- a/weed/filesys/page_writer_pattern.go +++ /dev/null @@ -1,44 +0,0 @@ -package filesys - -type WriterPattern struct { - isStreaming bool - lastWriteOffset int64 - chunkSize int64 -} - -// For streaming write: only cache the first chunk -// For random write: fall back to temp file approach -// writes can only change from streaming mode to non-streaming mode - -func NewWriterPattern(chunkSize int64) *WriterPattern { - return &WriterPattern{ - isStreaming: true, - lastWriteOffset: -1, - chunkSize: chunkSize, - } -} - -func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { - if rp.lastWriteOffset > offset { - rp.isStreaming = false - } - if rp.lastWriteOffset == -1 { - if offset != 0 { - rp.isStreaming = false - } - } - rp.lastWriteOffset = offset -} - -func (rp *WriterPattern) IsStreamingMode() bool { - return rp.isStreaming -} - -func (rp *WriterPattern) IsRandomMode() bool { - return !rp.isStreaming -} - -func (rp *WriterPattern) Reset() { - rp.isStreaming = true - rp.lastWriteOffset = -1 -} diff --git a/weed/filesys/permission.go b/weed/filesys/permission.go deleted file mode 100644 index 2edfd49dd..000000000 --- a/weed/filesys/permission.go +++ /dev/null @@ -1,63 +0,0 @@ -package filesys - -import ( - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" -) - -func checkPermission(entry *filer_pb.Entry, uid, gid uint32, isWrite bool) error { - if uid == 0 || gid == 0 { - return nil - } - if entry == nil { - return nil - } - if entry.Attributes == nil { - return nil - } - attr := entry.Attributes - if attr.Uid == uid { - if isWrite { - if attr.FileMode&0200 > 0 { - return nil - } else { - return fuse.EPERM - } - } else { - if attr.FileMode&0400 > 0 { - return nil - } else { - return fuse.EPERM - } - } - } else if attr.Gid == gid { - if isWrite { - if attr.FileMode&0020 > 0 { - return nil - } else { - return fuse.EPERM - } - } else { - if attr.FileMode&0040 > 0 { - return nil - } else { - return fuse.EPERM - } - } - } else { - if isWrite { - if attr.FileMode&0002 > 0 { - return nil - } else { - return fuse.EPERM - } - } else { - if attr.FileMode&0004 > 0 { - return nil - } else { - return fuse.EPERM - } - } - } - -} diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go deleted file mode 100644 index 5c2dcf0e1..000000000 --- a/weed/filesys/unimplemented.go +++ /dev/null @@ -1,22 +0,0 @@ -package filesys - -import ( - "context" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" -) - -// https://github.com/bazil/fuse/issues/130 - -var _ = fs.NodeAccesser(&Dir{}) - -func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error { - return fuse.ENOSYS -} - -var _ = fs.NodeAccesser(&File{}) - -func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error { - return fuse.ENOSYS -} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go deleted file mode 100644 index 6c91246c1..000000000 --- a/weed/filesys/wfs.go +++ /dev/null @@ -1,319 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" - "math" - "math/rand" - "os" - "path" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/wdclient" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/util/grace" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" -) - -type Option struct { - MountDirectory string - FilerAddresses []pb.ServerAddress - filerIndex int - GrpcDialOption grpc.DialOption - FilerMountRootPath string - Collection string - Replication string - TtlSec int32 - DiskType types.DiskType - ChunkSizeLimit int64 - ConcurrentWriters int - CacheDir string - CacheSizeMB int64 - DataCenter string - Umask os.FileMode - - MountUid uint32 - MountGid uint32 - MountMode os.FileMode - MountCtime time.Time - MountMtime time.Time - MountParentInode uint64 - - VolumeServerAccess string // how to access volume servers - Cipher bool // whether encrypt data on volume server - UidGidMapper *meta_cache.UidGidMapper - - uniqueCacheDir string -} - -var _ = fs.FS(&WFS{}) -var _ = fs.FSStatfser(&WFS{}) - -type WFS struct { - option *Option - - // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles map[uint64]*FileHandle - - bufPool sync.Pool - - stats statsCache - - root fs.Node - fsNodeCache *FsCache - - chunkCache *chunk_cache.TieredChunkCache - metaCache *meta_cache.MetaCache - signature int32 - - // throttle writers - concurrentWriters *util.LimitedConcurrentExecutor - Server *fs.Server -} -type statsCache struct { - filer_pb.StatisticsResponse - lastChecked int64 // unix time in seconds -} - -func NewSeaweedFileSystem(option *Option) *WFS { - wfs := &WFS{ - option: option, - handles: make(map[uint64]*FileHandle), - bufPool: sync.Pool{ - New: func() interface{} { - return make([]byte, option.ChunkSizeLimit) - }, - }, - signature: util.RandomInt32(), - } - wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses)) - wfs.option.setupUniqueCacheDirectory() - if option.CacheSizeMB > 0 { - wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) - } - - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) { - - fsNode := NodeWithId(filePath.AsInode(entry.FileMode())) - if err := wfs.Server.InvalidateNodeData(fsNode); err != nil { - glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err) - } - - dir, name := filePath.DirAndName() - parent := NodeWithId(util.FullPath(dir).AsInode(os.ModeDir)) - if dir == option.FilerMountRootPath { - parent = NodeWithId(1) - } - if err := wfs.Server.InvalidateEntry(parent, name); err != nil { - glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) - } - }) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - os.RemoveAll(option.getUniqueCacheDir()) - }) - - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1} - wfs.fsNodeCache = newFsCache(wfs.root) - - if wfs.option.ConcurrentWriters > 0 { - wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) - } - - return wfs -} - -func (wfs *WFS) StartBackgroundTasks() { - startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) -} - -func (wfs *WFS) Root() (fs.Node, error) { - return wfs.root, nil -} - -func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { - - fullpath := file.fullpath() - glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) - - inodeId := file.Id() - - wfs.handlesLock.Lock() - existingHandle, found := wfs.handles[inodeId] - if found && existingHandle != nil && existingHandle.f.isOpen > 0 { - existingHandle.f.isOpen++ - wfs.handlesLock.Unlock() - glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen) - return existingHandle - } - wfs.handlesLock.Unlock() - - entry, _ := file.maybeLoadEntry(context.Background()) - file.entry = entry - fileHandle = newFileHandle(file, uid, gid) - - wfs.handlesLock.Lock() - file.isOpen++ - wfs.handles[inodeId] = fileHandle - wfs.handlesLock.Unlock() - fileHandle.handle = inodeId - - glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen) - return -} - -func (wfs *WFS) Fsync(file *File, header fuse.Header) error { - - inodeId := file.Id() - - wfs.handlesLock.Lock() - existingHandle, found := wfs.handles[inodeId] - wfs.handlesLock.Unlock() - - if found && existingHandle != nil { - - existingHandle.Lock() - defer existingHandle.Unlock() - - if existingHandle.f.isOpen > 0 { - return existingHandle.doFlush(context.Background(), header) - } - - } - - return nil -} - -func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { - wfs.handlesLock.Lock() - defer wfs.handlesLock.Unlock() - - glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - - delete(wfs.handles, uint64(handleId)) - - return -} - -// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser -func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error { - - glog.V(4).Infof("reading fs stats: %+v", req) - - if wfs.stats.lastChecked < time.Now().Unix()-20 { - - err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.StatisticsRequest{ - Collection: wfs.option.Collection, - Replication: wfs.option.Replication, - Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), - DiskType: string(wfs.option.DiskType), - } - - glog.V(4).Infof("reading filer stats: %+v", request) - resp, err := client.Statistics(context.Background(), request) - if err != nil { - glog.V(0).Infof("reading filer stats %v: %v", request, err) - return err - } - glog.V(4).Infof("read filer stats: %+v", resp) - - wfs.stats.TotalSize = resp.TotalSize - wfs.stats.UsedSize = resp.UsedSize - wfs.stats.FileCount = resp.FileCount - wfs.stats.lastChecked = time.Now().Unix() - - return nil - }) - if err != nil { - glog.V(0).Infof("filer Statistics: %v", err) - return err - } - } - - totalDiskSize := wfs.stats.TotalSize - usedDiskSize := wfs.stats.UsedSize - actualFileCount := wfs.stats.FileCount - - // Compute the total number of available blocks - resp.Blocks = totalDiskSize / blockSize - - // Compute the number of used blocks - numBlocks := uint64(usedDiskSize / blockSize) - - // Report the number of free and available blocks for the block size - resp.Bfree = resp.Blocks - numBlocks - resp.Bavail = resp.Blocks - numBlocks - resp.Bsize = uint32(blockSize) - - // Report the total number of possible files in the file system (and those free) - resp.Files = math.MaxInt64 - resp.Ffree = math.MaxInt64 - actualFileCount - - // Report the maximum length of a name and the minimum fragment size - resp.Namelen = 1024 - resp.Frsize = uint32(blockSize) - - return nil -} - -func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) { - if entry.Attributes == nil { - return - } - entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) -} -func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { - if entry.Attributes == nil { - return - } - entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) -} - -func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { - if wfs.option.VolumeServerAccess == "filerProxy" { - return func(fileId string) (targetUrls []string, err error) { - return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil - } - } - return filer.LookupFn(wfs) -} -func (wfs *WFS) getCurrentFiler() pb.ServerAddress { - return wfs.option.FilerAddresses[wfs.option.filerIndex] -} - -func (option *Option) setupUniqueCacheDirectory() { - cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8] - option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) - os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask) -} - -func (option *Option) getUniqueCacheDir() string { - return option.uniqueCacheDir -} - -type NodeWithId uint64 - -func (n NodeWithId) Id() uint64 { - return uint64(n) -} -func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error { - return nil -} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go deleted file mode 100644 index 4feef867e..000000000 --- a/weed/filesys/wfs_filer_client.go +++ /dev/null @@ -1,51 +0,0 @@ -package filesys - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -var _ = filer_pb.FilerClient(&WFS{}) - -func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { - - return util.Retry("filer grpc", func() error { - - i := wfs.option.filerIndex - n := len(wfs.option.FilerAddresses) - for x := 0; x < n; x++ { - - filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() - err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, wfs.option.GrpcDialOption) - - if err != nil { - glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) - } else { - wfs.option.filerIndex = i - return nil - } - - i++ - if i >= n { - i = 0 - } - - } - return err - }) - -} - -func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if wfs.option.VolumeServerAccess == "publicUrl" { - return location.PublicUrl - } - return location.Url -} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go deleted file mode 100644 index 17489547c..000000000 --- a/weed/filesys/wfs_write.go +++ /dev/null @@ -1,84 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "io" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { - - return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { - var fileId, host string - var auth security.EncodedJwt - - if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: wfs.option.Replication, - Collection: wfs.option.Collection, - TtlSec: wfs.option.TtlSec, - DiskType: string(wfs.option.DiskType), - DataCenter: wfs.option.DataCenter, - Path: string(fullPath), - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) - loc := resp.Location - host = wfs.AdjustedUrl(loc) - collection, replication = resp.Collection, resp.Replication - - return nil - }) - }); err != nil { - return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) - } - uploadOption := &operation.UploadOption{ - UploadUrl: fileUrl, - Filename: filename, - Cipher: wfs.option.Cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: auth, - } - uploadResult, err, data := operation.Upload(reader, uploadOption) - if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) - return nil, "", "", fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) - return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) - } - - if offset == 0 { - wfs.chunkCache.SetChunk(fileId, data) - } - - chunk = uploadResult.ToPbFileChunk(fileId, offset) - return chunk, collection, replication, nil - } -} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go deleted file mode 100644 index 818652f64..000000000 --- a/weed/filesys/xattr.go +++ /dev/null @@ -1,153 +0,0 @@ -package filesys - -import ( - "context" - "strings" - "syscall" - - "github.com/seaweedfs/fuse" - - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - XATTR_PREFIX = "xattr-" // same as filer -) - -func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - if entry == nil { - return fuse.ErrNoXattr - } - if entry.Extended == nil { - return fuse.ErrNoXattr - } - data, found := entry.Extended[XATTR_PREFIX+req.Name] - if !found { - return fuse.ErrNoXattr - } - if req.Position < uint32(len(data)) { - size := req.Size - if req.Position+size >= uint32(len(data)) { - size = uint32(len(data)) - req.Position - } - if size == 0 { - resp.Xattr = data[req.Position:] - } else { - resp.Xattr = data[req.Position : req.Position+size] - } - } - - return nil - -} - -func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error { - - if entry == nil { - return fuse.EIO - } - - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - data, _ := entry.Extended[XATTR_PREFIX+req.Name] - - newData := make([]byte, int(req.Position)+len(req.Xattr)) - - copy(newData, data) - - copy(newData[int(req.Position):], req.Xattr) - - entry.Extended[XATTR_PREFIX+req.Name] = newData - - return nil - -} - -func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error { - - if entry == nil { - return fuse.ErrNoXattr - } - - if entry.Extended == nil { - return fuse.ErrNoXattr - } - - _, found := entry.Extended[XATTR_PREFIX+req.Name] - - if !found { - return fuse.ErrNoXattr - } - - delete(entry.Extended, XATTR_PREFIX+req.Name) - - return nil - -} - -func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - if entry == nil { - return fuse.EIO - } - - for k := range entry.Extended { - if strings.HasPrefix(k, XATTR_PREFIX) { - resp.Append(k[len(XATTR_PREFIX):]) - } - } - - size := req.Size - if req.Position+size >= uint32(len(resp.Xattr)) { - size = uint32(len(resp.Xattr)) - req.Position - } - - if size == 0 { - resp.Xattr = resp.Xattr[req.Position:] - } else { - resp.Xattr = resp.Xattr[req.Position : req.Position+size] - } - - return nil - -} - -func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { - - fullpath := util.NewFullPath(dir, name) - // glog.V(3).Infof("read entry cache miss %s", fullpath) - - // return a valid entry for the mount root - if string(fullpath) == wfs.option.FilerMountRootPath { - return &filer_pb.Entry{ - Name: name, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: wfs.option.MountMtime.Unix(), - FileMode: uint32(wfs.option.MountMode), - Uid: wfs.option.MountUid, - Gid: wfs.option.MountGid, - Crtime: wfs.option.MountCtime.Unix(), - }, - }, nil - } - - // read from async meta cache - meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - return cachedEntry.ToProtoEntry(), cacheErr -} - -func checkName(name string) error { - if len(name) >= 256 { - return syscall.ENAMETOOLONG - } - return nil -} diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 7a9c0afd6..ca44df227 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -2,8 +2,8 @@ package mount import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount/page_writer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" "sync" diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index 8685b3d15..9da892f00 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -1,8 +1,8 @@ package mount import ( - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount/page_writer" ) type PageWriter struct {