mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge branch 'mount_from_outside_cluster'
This commit is contained in:
commit
17f7c1c43f
|
@ -169,6 +169,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
|
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
|
||||||
|
FilerAddress: filer,
|
||||||
FilerGrpcAddress: filerGrpcAddress,
|
FilerGrpcAddress: filerGrpcAddress,
|
||||||
GrpcDialOption: grpcDialOption,
|
GrpcDialOption: grpcDialOption,
|
||||||
FilerMountRootPath: mountRoot,
|
FilerMountRootPath: mountRoot,
|
||||||
|
|
|
@ -71,7 +71,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, loc := range locations.Locations {
|
for _, loc := range locations.Locations {
|
||||||
volumeServerAddress := filerClient.AdjustedUrl(loc)
|
volumeServerAddress := loc.Url
|
||||||
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
|
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
|
||||||
targetUrls = append(targetUrls, targetUrl)
|
targetUrls = append(targetUrls, targetUrl)
|
||||||
}
|
}
|
||||||
|
@ -85,11 +85,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
|
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
|
||||||
|
|
||||||
return &ChunkReadAt{
|
return &ChunkReadAt{
|
||||||
chunkViews: chunkViews,
|
chunkViews: chunkViews,
|
||||||
lookupFileId: LookupFn(filerClient),
|
lookupFileId: lookupFn,
|
||||||
chunkCache: chunkCache,
|
chunkCache: chunkCache,
|
||||||
fileSize: fileSize,
|
fileSize: fileSize,
|
||||||
}
|
}
|
||||||
|
|
|
@ -404,11 +404,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
|
inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
|
||||||
delete(dir.wfs.handles, inodeId)
|
delete(dir.wfs.handles, inodeId)
|
||||||
|
|
||||||
// delete the chunks last
|
|
||||||
if isDeleteData {
|
|
||||||
dir.wfs.deleteFileChunks(entry.Chunks)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,9 +147,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
file.entry.Chunks = chunks
|
file.entry.Chunks = chunks
|
||||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
|
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
|
||||||
file.reader = nil
|
file.reader = nil
|
||||||
file.wfs.deleteFileChunks(truncatedChunks)
|
|
||||||
}
|
}
|
||||||
file.entry.Attributes.FileSize = req.Size
|
file.entry.Attributes.FileSize = req.Size
|
||||||
file.dirtyMetadata = true
|
file.dirtyMetadata = true
|
||||||
|
@ -329,7 +328,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
|
||||||
|
|
||||||
func (file *File) setEntry(entry *filer_pb.Entry) {
|
func (file *File) setEntry(entry *filer_pb.Entry) {
|
||||||
file.entry = entry
|
file.entry = entry
|
||||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
|
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
|
||||||
file.reader = nil
|
file.reader = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||||
|
|
||||||
var chunkResolveErr error
|
var chunkResolveErr error
|
||||||
if fh.f.entryViewCache == nil {
|
if fh.f.entryViewCache == nil {
|
||||||
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
|
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
|
||||||
if chunkResolveErr != nil {
|
if chunkResolveErr != nil {
|
||||||
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
|
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
|
||||||
}
|
}
|
||||||
|
@ -129,7 +129,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||||
reader := fh.f.reader
|
reader := fh.f.reader
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
|
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
|
||||||
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
|
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
||||||
}
|
}
|
||||||
fh.f.reader = reader
|
fh.f.reader = reader
|
||||||
|
|
||||||
|
@ -271,7 +271,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
|
||||||
|
|
||||||
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
|
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
|
||||||
|
|
||||||
chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
|
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
|
||||||
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
|
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
|
||||||
if manifestErr != nil {
|
if manifestErr != nil {
|
||||||
// not good, but should be ok
|
// not good, but should be ok
|
||||||
|
|
|
@ -3,6 +3,8 @@ package filesys
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
@ -24,6 +26,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option struct {
|
type Option struct {
|
||||||
|
FilerAddress string
|
||||||
FilerGrpcAddress string
|
FilerGrpcAddress string
|
||||||
GrpcDialOption grpc.DialOption
|
GrpcDialOption grpc.DialOption
|
||||||
FilerMountRootPath string
|
FilerMountRootPath string
|
||||||
|
@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
|
||||||
}
|
}
|
||||||
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
|
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
|
||||||
|
if wfs.option.OutsideContainerClusterMode {
|
||||||
|
return func(fileId string) (targetUrls []string, err error) {
|
||||||
|
return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filer.LookupFn(wfs)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
package filesys
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
|
|
||||||
if len(chunks) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var fileIds []string
|
|
||||||
for _, chunk := range chunks {
|
|
||||||
if !chunk.IsChunkManifest {
|
|
||||||
fileIds = append(fileIds, chunk.GetFileIdString())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
|
|
||||||
if manifestResolveErr != nil {
|
|
||||||
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
|
|
||||||
}
|
|
||||||
for _, dChunk := range dataChunks {
|
|
||||||
fileIds = append(fileIds, dChunk.GetFileIdString())
|
|
||||||
}
|
|
||||||
fileIds = append(fileIds, chunk.GetFileIdString())
|
|
||||||
}
|
|
||||||
|
|
||||||
wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
||||||
wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
|
|
||||||
|
|
||||||
var vids []string
|
|
||||||
for _, fileId := range fileIds {
|
|
||||||
vids = append(vids, filer.VolumeId(fileId))
|
|
||||||
}
|
|
||||||
|
|
||||||
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
|
|
||||||
|
|
||||||
m := make(map[string]operation.LookupResult)
|
|
||||||
|
|
||||||
glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids)
|
|
||||||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
|
||||||
VolumeIds: vids,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return m, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, vid := range vids {
|
|
||||||
lr := operation.LookupResult{
|
|
||||||
VolumeId: vid,
|
|
||||||
Locations: nil,
|
|
||||||
}
|
|
||||||
locations, found := resp.LocationsMap[vid]
|
|
||||||
if !found {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, loc := range locations.Locations {
|
|
||||||
lr.Locations = append(lr.Locations, operation.Location{
|
|
||||||
Url: wfs.AdjustedUrl(loc),
|
|
||||||
PublicUrl: loc.PublicUrl,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
m[vid] = lr
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
|
@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
|
||||||
return err
|
return err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
if wfs.option.OutsideContainerClusterMode {
|
|
||||||
return location.PublicUrl
|
|
||||||
}
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
|
||||||
Url: resp.Url,
|
Url: resp.Url,
|
||||||
PublicUrl: resp.PublicUrl,
|
PublicUrl: resp.PublicUrl,
|
||||||
}
|
}
|
||||||
host = wfs.AdjustedUrl(loc)
|
host = loc.Url
|
||||||
collection, replication = resp.Collection, resp.Replication
|
collection, replication = resp.Collection, resp.Replication
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
|
||||||
}
|
}
|
||||||
|
|
||||||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
||||||
|
if wfs.option.OutsideContainerClusterMode {
|
||||||
|
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
|
||||||
|
}
|
||||||
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
|
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
|
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
|
||||||
|
|
|
@ -107,7 +107,3 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
|
@ -4,12 +4,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"google.golang.org/grpc"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ var (
|
||||||
|
|
||||||
type FilerClient interface {
|
type FilerClient interface {
|
||||||
WithFilerClient(fn func(SeaweedFilerClient) error) error
|
WithFilerClient(fn func(SeaweedFilerClient) error) error
|
||||||
AdjustedUrl(location *Location) string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
|
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
|
||||||
|
|
|
@ -128,6 +128,3 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
|
||||||
}, fs.grpcAddress, fs.grpcDialOption)
|
}, fs.grpcAddress, fs.grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
|
@ -124,10 +124,6 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
||||||
func volumeId(fileId string) string {
|
func volumeId(fileId string) string {
|
||||||
lastCommaIndex := strings.LastIndex(fileId, ",")
|
lastCommaIndex := strings.LastIndex(fileId, ",")
|
||||||
if lastCommaIndex > 0 {
|
if lastCommaIndex > 0 {
|
||||||
|
|
|
@ -50,9 +50,6 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err
|
||||||
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
|
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
||||||
// If none of the http routes match respond with MethodNotAllowed
|
// If none of the http routes match respond with MethodNotAllowed
|
||||||
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
|
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
|
@ -123,9 +123,6 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
|
||||||
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
|
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
||||||
func clearName(name string) (string, error) {
|
func clearName(name string) (string, error) {
|
||||||
slashed := strings.HasSuffix(name, "/")
|
slashed := strings.HasSuffix(name, "/")
|
||||||
|
@ -523,7 +520,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
|
||||||
}
|
}
|
||||||
if f.reader == nil {
|
if f.reader == nil {
|
||||||
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
|
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
|
||||||
f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
|
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
readSize, err = f.reader.ReadAt(p, f.off)
|
readSize, err = f.reader.ReadAt(p, f.off)
|
||||||
|
|
|
@ -102,10 +102,6 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
|
|
||||||
return location.Url
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
|
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
|
||||||
if strings.HasPrefix(entryPath, "http") {
|
if strings.HasPrefix(entryPath, "http") {
|
||||||
var u *url.URL
|
var u *url.URL
|
||||||
|
|
Loading…
Reference in a new issue