From b30c14b6314c96e0cb0c110e2aa1fc206857a066 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 3 May 2019 00:24:35 -0700 Subject: [PATCH] webdav: can read now --- weed/command/webdav.go | 18 ++ weed/filer2/filer_client_util.go | 163 ++++++++++++++++ weed/filesys/dir.go | 92 ++------- weed/filesys/dir_link.go | 2 +- weed/filesys/dir_rename.go | 2 +- weed/filesys/dirty_page.go | 2 +- weed/filesys/file.go | 4 +- weed/filesys/filehandle.go | 89 +-------- weed/filesys/wfs.go | 4 +- weed/filesys/wfs_deletion.go | 2 +- weed/server/webdav_server.go | 314 ++++++++++++++++++++++++------- 11 files changed, 459 insertions(+), 233 deletions(-) create mode 100644 weed/filer2/filer_client_util.go diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 71933d885..9ab2ce9de 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -3,6 +3,8 @@ package command import ( "fmt" "net/http" + "os/user" + "strconv" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -19,6 +21,7 @@ var ( type WebDavOption struct { filer *string port *int + collection *string tlsPrivateKey *string tlsCertificate *string } @@ -27,6 +30,7 @@ func init() { cmdWebDav.Run = runWebDav // break init cycle webDavStandaloneOptions.filer = cmdWebDav.Flag.String("filer", "localhost:8888", "filer server address") webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port") + webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") } @@ -57,10 +61,24 @@ func (wo *WebDavOption) startWebDav() bool { return false } + // detect current user + uid, gid := uint32(0), uint32(0) + if u, err := user.Current(); err == nil { + if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { + uid = uint32(parsedId) + } + if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { + gid = uint32(parsedId) + } + } + ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ Filer: *wo.filer, FilerGrpcAddress: filerGrpcAddress, GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + Collection: *wo.collection, + Uid: uid, + Gid: gid, }) if webdavServer_err != nil { glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go new file mode 100644 index 000000000..71daa9ee1 --- /dev/null +++ b/weed/filer2/filer_client_util.go @@ -0,0 +1,163 @@ +package filer2 + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func VolumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} + +type FilerClient interface { + WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error +} + +func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { + var vids []string + for _, chunkView := range chunkViews { + vids = append(vids, VolumeId(chunkView.FileId)) + } + + vid2Locations := make(map[string]*filer_pb.Locations) + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + glog.V(4).Infof("read fh lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return err + } + + vid2Locations = resp.LocationsMap + + return nil + }) + + if err != nil { + return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + } + + var wg sync.WaitGroup + for _, chunkView := range chunkViews { + wg.Add(1) + go func(chunkView *ChunkView) { + defer wg.Done() + + glog.V(4).Infof("read fh reading chunk: %+v", chunkView) + + locations := vid2Locations[VolumeId(chunkView.FileId)] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", chunkView.FileId) + err = fmt.Errorf("failed to locate %s", chunkView.FileId) + return + } + + var n int64 + n, err = util.ReadUrl( + fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), + chunkView.Offset, + int(chunkView.Size), + buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], + !chunkView.IsFullChunk) + + if err != nil { + + glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) + + err = fmt.Errorf("failed to read http://%s/%s: %v", + locations.Locations[0].Url, chunkView.FileId, err) + return + } + + glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) + totalRead += n + + }(chunkView) + } + wg.Wait() + return +} + +func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { + + dir, name := FullPath(fullFilePath).DirAndName() + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } + + glog.V(1).Infof("read %s request: %v", fullFilePath, request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + if err == ErrNotFound { + return nil + } + glog.V(0).Infof("read %s attr %v: %v", fullFilePath, request, err) + return err + } + + if resp.Entry != nil { + entry = resp.Entry + } + + return nil + }) + + return +} + +func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + paginationLimit := 1024 + + lastEntryName := "" + + for { + + request := &filer_pb.ListEntriesRequest{ + Directory: fullDirPath, + StartFromFileName: lastEntryName, + Limit: uint32(paginationLimit), + } + + glog.V(4).Infof("read directory: %v", request) + resp, err := client.ListEntries(ctx, request) + if err != nil { + return fmt.Errorf("list %s: %v", fullDirPath, err) + } + + for _, entry := range resp.Entries { + fn(entry) + lastEntryName = entry.Name + } + + if len(resp.Entries) < paginationLimit { + break + } + + } + + return nil + + }) + + return +} diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 767212103..7b6cf2000 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -4,7 +4,6 @@ import ( "context" "os" "path" - "path/filepath" "time" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -54,39 +53,12 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } - parent, name := filepath.Split(dir.Path) - - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: parent, - Name: name, - } - - glog.V(1).Infof("read dir %s request: %v", dir.Path, request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - if err == filer2.ErrNotFound { - return nil - } - glog.V(0).Infof("read dir %s attr %v: %v", dir.Path, request, err) - return err - } - - if resp.Entry != nil { - dir.attributes = resp.Entry.Attributes - } - glog.V(2).Infof("read dir %s attr: %v", dir.Path, dir.attributes) - - // dir.wfs.listDirectoryEntriesCache.Set(dir.Path, resp.Entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path) if err != nil { glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err) return err } + dir.attributes = entry.Attributes glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode)) @@ -133,7 +105,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, glog.V(1).Infof("create: %v", request) if request.Entry.IsDirectory { - if err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) return fuse.EIO @@ -156,7 +128,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: dir.Path, @@ -193,33 +165,18 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { var entry *filer_pb.Entry + fullFilePath := path.Join(dir.Path, req.Name) - item := dir.wfs.listDirectoryEntriesCache.Get(path.Join(dir.Path, req.Name)) + item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath) if item != nil && !item.Expired() { entry = item.Value().(*filer_pb.Entry) } if entry == nil { - err = dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup directory entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - // dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) + entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) + if err != nil { + return nil, err + } } if entry != nil { @@ -244,7 +201,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - err = dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { paginationLimit := 1024 remaining := dir.wfs.option.DirListingLimit @@ -306,33 +263,14 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { - var entry *filer_pb.Entry - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup to-be-removed entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name)) if err != nil { return err } dir.wfs.deleteFileChunks(ctx, entry.Chunks) - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -356,7 +294,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -402,7 +340,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus } parentDir, name := filer2.FullPath(dir.Path).DirAndName() - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 4f631bc88..92cf04d58 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, }, } - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) return fuse.EIO diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 7a415ff82..e72a15758 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -12,7 +12,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector newDir := newDirectory.(*Dir) - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.Path, diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 0044cfd87..5a7d51a91 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -167,7 +167,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte var fileId, host string var auth security.EncodedJwt - if err := pages.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/filesys/file.go b/weed/filesys/file.go index eb4b03f64..3015354a6 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -109,7 +109,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - return file.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.Path, @@ -144,7 +144,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error { file.setEntry(entry) // glog.V(1).Infof("file attr read cached %v attributes", file.Name) } else { - err := file.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: file.Name, diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index e87e0608e..feb19f525 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,20 +3,18 @@ package filesys import ( "context" "fmt" + "mime" + "path" + "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/gabriel-vasile/mimetype" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" "google.golang.org/grpc" - "mime" - "path" - "strings" - "sync" - "time" ) type FileHandle struct { @@ -68,72 +66,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size) - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, volumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err := fh.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err - } - - vid2Locations = resp.LocationsMap - - return nil - }) - - if err != nil { - glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err) - return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) - } - - var totalRead int64 - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *filer2.ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) - - locations := vid2Locations[volumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } - - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)], - !chunkView.IsFullChunk) - - if err != nil { - - glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err) - - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } - - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n - - }(chunkView) - } - wg.Wait() + totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset) resp.Data = buff[:totalRead] @@ -205,7 +138,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - return fh.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -243,7 +176,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f var vids []string for _, fileId := range fileIds { - vids = append(vids, volumeId(fileId)) + vids = append(vids, filer2.VolumeId(fileId)) } lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { @@ -280,11 +213,3 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f return err } - -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index f8be24e5e..b4d1b0608 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -73,7 +73,7 @@ func (wfs *WFS) Root() (fs.Node, error) { return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil } -func (wfs *WFS) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) @@ -133,7 +133,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 16f8af594..dd7992816 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -15,7 +15,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu fileIds = append(fileIds, chunk.FileId) } - wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds) return nil }) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index b06c055e2..5930e99be 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "fmt" "io" "net/http" "net/url" @@ -10,6 +11,8 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "golang.org/x/net/webdav" "google.golang.org/grpc" @@ -25,6 +28,9 @@ type WebDavOption struct { DomainName string BucketsPath string GrpcDialOption grpc.DialOption + Collection string + Uid uint32 + Gid uint32 } type WebDavServer struct { @@ -37,7 +43,7 @@ type WebDavServer struct { func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { - fs, _ := NewWebDavFileSystem() + fs, _ := NewWebDavFileSystem(option) ws = &WebDavServer{ option: option, @@ -78,31 +84,49 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go type WebDavFileSystem struct { + option *WebDavOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption } type FileInfo struct { - name string - size int64 - mode os.FileMode - mod_time time.Time + name string + size int64 + mode os.FileMode + modifiledTime time.Time + isDirectory bool } func (fi *FileInfo) Name() string { return fi.name } func (fi *FileInfo) Size() int64 { return fi.size } func (fi *FileInfo) Mode() os.FileMode { return fi.mode } -func (fi *FileInfo) ModTime() time.Time { return fi.mod_time } -func (fi *FileInfo) IsDir() bool { return fi.mode.IsDir() } +func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime } +func (fi *FileInfo) IsDir() bool { return fi.isDirectory } func (fi *FileInfo) Sys() interface{} { return nil } type WebDavFile struct { - fs *WebDavFileSystem - name string - isDirectory bool - off int64 + fs *WebDavFileSystem + name string + isDirectory bool + off int64 + entry *filer_pb.Entry + entryViewCache []filer2.VisibleInterval } -func NewWebDavFileSystem() (webdav.FileSystem, error) { - return &WebDavFileSystem{}, nil +func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + return &WebDavFileSystem{ + option: option, + }, nil +} + +func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { + + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) + } func clearName(name string) (string, error) { @@ -117,32 +141,55 @@ func clearName(name string) (string, error) { return name, nil } -func (fs *WebDavFileSystem) Mkdir(ctx context.Context, name string, perm os.FileMode) error { +func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error { - glog.V(2).Infof("WebDavFileSystem.Mkdir %v", name) + glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath) - if !strings.HasSuffix(name, "/") { - name += "/" + if !strings.HasSuffix(fullDirPath, "/") { + fullDirPath += "/" } var err error - if name, err = clearName(name); err != nil { + if fullDirPath, err = clearName(fullDirPath); err != nil { return err } - _, err = fs.stat(name) + _, err = fs.stat(ctx, fullDirPath) if err == nil { return os.ErrExist } base := "/" - for _, elem := range strings.Split(strings.Trim(name, "/"), "/") { + for _, elem := range strings.Split(strings.Trim(fullDirPath, "/"), "/") { base += elem + "/" - _, err = fs.stat(base) + _, err = fs.stat(ctx, base) if err != os.ErrNotExist { return err } - // _, err = fs.db.Exec(`insert into filesystem(name, content, mode, mod_time) values(?, '', ?, now())`, base, perm.Perm()|os.ModeDir) + err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + dir, name := filer2.FullPath(base).DirAndName() + request := &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(perm), + Uid: fs.option.Uid, + Gid: fs.option.Gid, + }, + }, + } + + glog.V(1).Infof("mkdir: %v", request) + if _, err := client.CreateEntry(ctx, request); err != nil { + return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) + } + + return nil + }) if err != nil { return err } @@ -150,68 +197,116 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, name string, perm os.File return nil } -func (fs *WebDavFileSystem) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) { +func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) { - glog.V(2).Infof("WebDavFileSystem.OpenFile %v", name) + glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath) var err error - if name, err = clearName(name); err != nil { + if fullFilePath, err = clearName(fullFilePath); err != nil { return nil, err } if flag&os.O_CREATE != 0 { // file should not have / suffix. - if strings.HasSuffix(name, "/") { + if strings.HasSuffix(fullFilePath, "/") { return nil, os.ErrInvalid } // based directory should be exists. - dir, _ := path.Split(name) - _, err := fs.stat(dir) + dir, _ := path.Split(fullFilePath) + _, err := fs.stat(ctx, dir) if err != nil { return nil, os.ErrInvalid } - _, err = fs.stat(name) + _, err = fs.stat(ctx, fullFilePath) if err == nil { if flag&os.O_EXCL != 0 { return nil, os.ErrExist } - fs.removeAll(name) + fs.removeAll(ctx, fullFilePath) } - // _, err = fs.db.Exec(`insert into filesystem(name, content, mode, mod_time) values(?, '', ?, now())`, name, perm.Perm()) + + dir, name := filer2.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: perm&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(perm), + Uid: fs.option.Uid, + Gid: fs.option.Gid, + Collection: fs.option.Collection, + Replication: "000", + TtlSec: 0, + }, + }, + }); err != nil { + return fmt.Errorf("create %s: %v", fullFilePath, err) + } + return nil + }) if err != nil { return nil, err } - return &WebDavFile{fs, name, false, 0}, nil + return &WebDavFile{ + fs: fs, + name: fullFilePath, + isDirectory: false, + }, nil } - fi, err := fs.stat(name) + fi, err := fs.stat(ctx, fullFilePath) if err != nil { return nil, os.ErrNotExist } - if !strings.HasSuffix(name, "/") && fi.IsDir() { - name += "/" + if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() { + fullFilePath += "/" } - return &WebDavFile{fs, name, true, 0}, nil + return &WebDavFile{ + fs: fs, + name: fullFilePath, + isDirectory: false, + }, nil } -func (fs *WebDavFileSystem) removeAll(name string) error { +func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error { var err error - if name, err = clearName(name); err != nil { + if fullFilePath, err = clearName(fullFilePath); err != nil { return err } - fi, err := fs.stat(name) + fi, err := fs.stat(ctx, fullFilePath) if err != nil { return err } if fi.IsDir() { - //_, err = fs.db.Exec(`delete from filesystem where name like $1 escape '\'`, strings.Replace(name, `%`, `\%`, -1)+`%`) + //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`) } else { - //_, err = fs.db.Exec(`delete from filesystem where name = ?`, name) + //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) } + dir, name := filer2.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DeleteEntryRequest{ + Directory: dir, + Name: name, + IsDeleteData: true, + } + + glog.V(3).Infof("removing entry: %v", request) + _, err := client.DeleteEntry(ctx, request) + if err != nil { + return fmt.Errorf("remove %s: %v", fullFilePath, err) + } + + return nil + }) return err } @@ -219,7 +314,7 @@ func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name) - return fs.removeAll(name) + return fs.removeAll(ctx, name) } func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error { @@ -234,7 +329,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) return err } - of, err := fs.stat(oldName) + of, err := fs.stat(ctx, oldName) if err != nil { return os.ErrExist } @@ -243,34 +338,55 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) newName += "/" } - _, err = fs.stat(newName) + _, err = fs.stat(ctx, newName) if err == nil { return os.ErrExist } - //_, err = fs.db.Exec(`update filesystem set name = ? where name = ?`, newName, oldName) - return err + oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() + newDir, newBaseName := filer2.FullPath(newName).DirAndName() + + return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, + OldName: oldBaseName, + NewDirectory: newDir, + NewName: newBaseName, + } + + _, err := client.AtomicRenameEntry(ctx, request) + if err != nil { + return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err) + } + + return nil + + }) } -func (fs *WebDavFileSystem) stat(name string) (os.FileInfo, error) { +func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) { var err error - if name, err = clearName(name); err != nil { + if fullFilePath, err = clearName(fullFilePath); err != nil { return nil, err } - //rows, err := fs.db.Query(`select name, format(length(content)/2, 0), mode, mod_time from filesystem where name = ?`, name) - if err != nil { - return nil, err - } var fi FileInfo - // err = rows.Scan(&fi.name, &fi.size, &fi.mode, &fi.mod_time) + entry, err := filer2.GetEntry(ctx, fs, fullFilePath) if err != nil { return nil, err } + fi.size = int64(filer2.TotalSize(entry.GetChunks())) + fi.name = fullFilePath + fi.mode = os.FileMode(entry.Attributes.FileMode) + fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0) + fi.isDirectory = entry.IsDirectory + _, fi.name = path.Split(path.Clean(fi.name)) if fi.name == "" { fi.name = "/" - fi.mod_time = time.Now() + fi.modifiledTime = time.Now() + fi.isDirectory = true } return &fi, nil } @@ -279,7 +395,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, glog.V(2).Infof("WebDavFileSystem.Stat %v", name) - return fs.stat(name) + return fs.stat(ctx, name) } func (f *WebDavFile) Write(p []byte) (int, error) { @@ -299,41 +415,105 @@ func (f *WebDavFile) Close() error { glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) + if f.entry != nil { + f.entry = nil + f.entryViewCache = nil + } + return nil } -func (f *WebDavFile) Read(p []byte) (int, error) { +func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(2).Infof("WebDavFileSystem.Read %v", f.name) + ctx := context.Background() - var err error - //rows, err := f.fs.db.Query(`select mode, substr(content, ?, ?) from filesystem where name = ?`, 1+f.off*2, len(p)*2, f.name) + if f.entry == nil { + f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) + } if err != nil { return 0, err } - //defer rows.Close() + if len(f.entry.Chunks) == 0 { + return 0, io.EOF + } + if f.entryViewCache == nil { + f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + } + chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) - return 0, io.EOF + totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off) + if err != nil { + return 0, err + } + readSize = int(totalRead) + + f.off += totalRead + if readSize == 0 { + return 0, io.EOF + } + return } -func (f *WebDavFile) Readdir(count int) ([]os.FileInfo, error) { +func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { - glog.V(2).Infof("WebDavFileSystem.Readdir %v", f.name) + glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) + ctx := context.Background() - // return f.children[old:f.off], nil - return nil, nil + dir := f.name + if dir != "/" && strings.HasSuffix(dir, "/") { + dir = dir[:len(dir)-1] + } + + err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) { + fi := FileInfo{ + size: int64(filer2.TotalSize(entry.GetChunks())), + name: entry.Name, + mode: os.FileMode(entry.Attributes.FileMode), + modifiledTime: time.Unix(entry.Attributes.Mtime, 0), + isDirectory: entry.IsDirectory, + } + + if !strings.HasSuffix(fi.name, "/") && fi.IsDir() { + fi.name += "/" + } + glog.V(4).Infof("entry: %v", fi.name) + ret = append(ret, &fi) + }) + + + old := f.off + if old >= int64(len(ret)) { + if count > 0 { + return nil, io.EOF + } + return nil, nil + } + if count > 0 { + f.off += int64(count) + if f.off > int64(len(ret)) { + f.off = int64(len(ret)) + } + } else { + f.off = int64(len(ret)) + old = 0 + } + + return ret[old:f.off], nil } func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence) + ctx := context.Background() + var err error switch whence { case 0: f.off = 0 case 2: - if fi, err := f.fs.stat(f.name); err != nil { + if fi, err := f.fs.stat(ctx, f.name); err != nil { return 0, err } else { f.off = fi.Size() @@ -347,5 +527,7 @@ func (f *WebDavFile) Stat() (os.FileInfo, error) { glog.V(2).Infof("WebDavFile.Stat %v", f.name) - return f.fs.stat(f.name) + ctx := context.Background() + + return f.fs.stat(ctx, f.name) }