diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 9e955e344..a95ecd567 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -169,6 +169,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ + FilerAddress: filer, FilerGrpcAddress: filerGrpcAddress, GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 307224f35..41d177210 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -71,7 +71,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } for _, loc := range locations.Locations { - volumeServerAddress := filerClient.AdjustedUrl(loc) + volumeServerAddress := loc.Url targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) targetUrls = append(targetUrls, targetUrl) } @@ -85,11 +85,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, - lookupFileId: LookupFn(filerClient), + lookupFileId: lookupFn, chunkCache: chunkCache, fileSize: fileSize, } diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index d86d92ac9..3d0a00a8b 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -404,11 +404,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode() delete(dir.wfs.handles, inodeId) - // delete the chunks last - if isDeleteData { - dir.wfs.deleteFileChunks(entry.Chunks) - } - return nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a2b6660d8..a8d6dac29 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -147,9 +147,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } file.entry.Chunks = chunks - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.reader = nil - file.wfs.deleteFileChunks(truncatedChunks) } file.entry.Attributes.FileSize = req.Size file.dirtyMetadata = true @@ -329,7 +328,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 6225ab968..da42ae562 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } @@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if fh.f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) - fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize) + fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } totalRead, err := fh.f.reader.ReadAt(buff, offset) @@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) - chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) + chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index cd14e8032..236ecdacb 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -3,6 +3,8 @@ package filesys import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/wdclient" "math" "os" "path" @@ -24,6 +26,7 @@ import ( ) type Option struct { + FilerAddress string FilerGrpcAddress string GrpcDialOption grpc.DialOption FilerMountRootPath string @@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { } entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) } + +func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { + if wfs.option.OutsideContainerClusterMode { + return func(fileId string) (targetUrls []string, err error) { + return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil + } + } + return filer.LookupFn(wfs) + +} diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go deleted file mode 100644 index a245b6795..000000000 --- a/weed/filesys/wfs_deletion.go +++ /dev/null @@ -1,84 +0,0 @@ -package filesys - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { - if len(chunks) == 0 { - return - } - - var fileIds []string - for _, chunk := range chunks { - if !chunk.IsChunkManifest { - fileIds = append(fileIds, chunk.GetFileIdString()) - continue - } - dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk) - if manifestResolveErr != nil { - glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) - } - for _, dChunk := range dataChunks { - fileIds = append(fileIds, dChunk.GetFileIdString()) - } - fileIds = append(fileIds, chunk.GetFileIdString()) - } - - wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) - return nil - }) -} - -func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { - - var vids []string - for _, fileId := range fileIds { - vids = append(vids, filer.VolumeId(fileId)) - } - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { - - m := make(map[string]operation.LookupResult) - - glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return m, err - } - - for _, vid := range vids { - lr := operation.LookupResult{ - VolumeId: vid, - Locations: nil, - } - locations, found := resp.LocationsMap[vid] - if !found { - continue - } - for _, loc := range locations.Locations { - lr.Locations = append(lr.Locations, operation.Location{ - Url: wfs.AdjustedUrl(loc), - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = lr - } - - return m, err - } - - _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) - - return err -} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index ef4213af1..e0d352a7b 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro return err } - -func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if wfs.option.OutsideContainerClusterMode { - return location.PublicUrl - } - return location.Url -} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 83e40e7f5..dfe6e57a6 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun Url: resp.Url, PublicUrl: resp.PublicUrl, } - host = wfs.AdjustedUrl(loc) + host = loc.Url collection, replication = resp.Collection, resp.Replication return nil @@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.OutsideContainerClusterMode { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) + } uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 8e5b56fd0..67c9bcb79 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -107,7 +107,3 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient return } - -func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 9868a411d..65baaddf2 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,12 +4,11 @@ import ( "context" "errors" "fmt" + "google.golang.org/grpc" "net/http" "strings" "sync" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 079fbd671..7198de95c 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -20,7 +20,6 @@ var ( type FilerClient interface { WithFilerClient(fn func(SeaweedFilerClient) error) error - AdjustedUrl(location *Location) string } func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index b062adcfe..544b84995 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -128,6 +128,3 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) }, fs.grpcAddress, fs.grpcDialOption) } -func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 3982360b0..eff1da8dc 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -124,10 +124,6 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro } -func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} - func volumeId(fileId string) string { lastCommaIndex := strings.LastIndex(fileId, ",") if lastCommaIndex > 0 { diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 6935c75bd..57b26f3dd 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -50,9 +50,6 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) } -func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} // If none of the http routes match respond with MethodNotAllowed func notFoundHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2b238e534..5bd92a136 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -123,9 +123,6 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } -func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") @@ -523,7 +520,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) - f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize) + f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize) } readSize, err = f.reader.ReadAt(p, f.off) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 0e285214b..6e1348ca5 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -102,10 +102,6 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error } -func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} - func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(entryPath, "http") { var u *url.URL