Chris Lu 00707ec00f mount: outsideContainerClusterMode proxy through filer
Running mount outside of the cluster would not need to expose all the volume servers to outside of the cluster. The chunk read and write will go through the filer.
2021-01-24 19:01:58 -08:00

611 lines
15 KiB

package weed_server
import (
type WebDavOption struct {
Filer string
FilerGrpcAddress string
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
Uid uint32
Gid uint32
Cipher bool
CacheDir string
CacheSizeMB int64
type WebDavServer struct {
option *WebDavOption
secret security.SigningKey
filer *filer.Filer
grpcDialOption grpc.DialOption
Handler *webdav.Handler
func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
fs, _ := NewWebDavFileSystem(option)
ws = &WebDavServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
Handler: &webdav.Handler{
FileSystem: fs,
LockSystem: webdav.NewMemLS(),
return ws, nil
// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
type WebDavFileSystem struct {
option *WebDavOption
secret security.SigningKey
filer *filer.Filer
grpcDialOption grpc.DialOption
chunkCache *chunk_cache.TieredChunkCache
signature int32
// FileInfo is the os.FileInfo implementation returned by stat and Readdir.
type FileInfo struct {
	name string
	size int64
	mode os.FileMode
	// modifiledTime keeps its original (misspelled) name because other code
	// in this file sets it by that name.
	modifiledTime time.Time
	isDirectory   bool
}

// Name returns the stored name.
func (fi *FileInfo) Name() string { return fi.name }

// Size returns the stored size in bytes.
func (fi *FileInfo) Size() int64 { return fi.size }

// Mode returns the stored file mode bits.
func (fi *FileInfo) Mode() os.FileMode { return fi.mode }

// ModTime returns the stored modification time.
func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }

// IsDir reports whether the entry is a directory.
func (fi *FileInfo) IsDir() bool { return fi.isDirectory }

// Sys always returns nil; no underlying data source is exposed.
func (fi *FileInfo) Sys() interface{} { return nil }
type WebDavFile struct {
fs *WebDavFileSystem
name string
isDirectory bool
off int64
entry *filer_pb.Entry
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
bufWriter *buffered_writer.BufferedWriteCloser
collection string
replication string
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
option: option,
chunkCache: chunkCache,
signature: util.RandomInt32(),
}, nil
// Compile-time check that *WebDavFileSystem satisfies filer_pb.FilerClient.
var _ filer_pb.FilerClient = (*WebDavFileSystem)(nil)
func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
// clearName normalizes a WebDAV path with path.Clean, keeping a trailing
// slash if the input had one. It returns os.ErrInvalid when the cleaned path
// is not absolute (does not start with "/").
func clearName(name string) (string, error) {
	slashed := strings.HasSuffix(name, "/")
	name = path.Clean(name)
	// path.Clean strips the trailing slash (except for "/"); put it back so
	// callers can still tell directory-style paths apart.
	if !strings.HasSuffix(name, "/") && slashed {
		name += "/"
	}
	if !strings.HasPrefix(name, "/") {
		return "", os.ErrInvalid
	}
	return name, nil
}
func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
if !strings.HasSuffix(fullDirPath, "/") {
fullDirPath += "/"
var err error
if fullDirPath, err = clearName(fullDirPath); err != nil {
return err
_, err = fs.stat(ctx, fullDirPath)
if err == nil {
return os.ErrExist
return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(perm | os.ModeDir),
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Signatures: []int32{fs.signature},
glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
return nil
func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
var err error
if fullFilePath, err = clearName(fullFilePath); err != nil {
return nil, err
if flag&os.O_CREATE != 0 {
// file should not have / suffix.
if strings.HasSuffix(fullFilePath, "/") {
return nil, os.ErrInvalid
_, err = fs.stat(ctx, fullFilePath)
if err == nil {
if flag&os.O_EXCL != 0 {
return nil, os.ErrExist
fs.removeAll(ctx, fullFilePath)
dir, name := util.FullPath(fullFilePath).DirAndName()
err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: perm&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(perm),
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Collection: fs.option.Collection,
Replication: "000",
TtlSec: 0,
Signatures: []int32{fs.signature},
}); err != nil {
return fmt.Errorf("create %s: %v", fullFilePath, err)
return nil
if err != nil {
return nil, err
return &WebDavFile{
fs: fs,
name: fullFilePath,
isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
fi, err := fs.stat(ctx, fullFilePath)
if err != nil {
return nil, os.ErrNotExist
if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
fullFilePath += "/"
return &WebDavFile{
fs: fs,
name: fullFilePath,
isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
var err error
if fullFilePath, err = clearName(fullFilePath); err != nil {
return err
dir, name := util.FullPath(fullFilePath).DirAndName()
return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
return fs.removeAll(ctx, name)
func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
var err error
if oldName, err = clearName(oldName); err != nil {
return err
if newName, err = clearName(newName); err != nil {
return err
of, err := fs.stat(ctx, oldName)
if err != nil {
return os.ErrExist
if of.IsDir() {
if strings.HasSuffix(oldName, "/") {
oldName = strings.TrimRight(oldName, "/")
if strings.HasSuffix(newName, "/") {
newName = strings.TrimRight(newName, "/")
_, err = fs.stat(ctx, newName)
if err == nil {
return os.ErrExist
oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
newDir, newBaseName := util.FullPath(newName).DirAndName()
return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
OldName: oldBaseName,
NewDirectory: newDir,
NewName: newBaseName,
_, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
return nil
func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
var err error
if fullFilePath, err = clearName(fullFilePath); err != nil {
return nil, err
fullpath := util.FullPath(fullFilePath)
var fi FileInfo
entry, err := filer_pb.GetEntry(fs, fullpath)
if entry == nil {
return nil, os.ErrNotExist
if err != nil {
return nil, err
fi.size = int64(filer.FileSize(entry)) = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
fi.isDirectory = entry.IsDirectory
if == "/" {
fi.modifiledTime = time.Now()
fi.isDirectory = true
return &fi, nil
func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
return fs.stat(ctx, name)
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
Path: name,
resp, err := client.AssignVolume(ctx, request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
f.collection, f.replication = resp.Collection, resp.Replication
return nil
}); flushErr != nil {
return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
uploadResult, flushErr, _ := operation.Upload(fileUrl,, f.fs.option.Cipher, reader, false, "", nil, auth)
if flushErr != nil {
glog.V(0).Infof("upload data %v to %s: %v",, fileUrl, flushErr)
return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v",, fileUrl, flushErr)
return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(2).Infof("WebDavFileSystem.Write %v",
dir, _ := util.FullPath(
var getErr error
ctx := context.Background()
if f.entry == nil {
f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(
if f.entry == nil {
return 0, getErr
if getErr != nil {
return 0, getErr
if f.bufWriter.FlushFunc == nil {
f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
var chunk *filer_pb.FileChunk
chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data),, offset)
if flushErr != nil {
return fmt.Errorf("%s upload result: %v",, flushErr)
f.entry.Content = nil
f.entry.Chunks = append(f.entry.Chunks, chunk)
return flushErr
f.bufWriter.CloseFunc = func() error {
manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("file %s close MaybeManifestize: %v",, manifestErr)
} else {
f.entry.Chunks = manifestedChunks
flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
f.entry.Attributes.Collection = f.collection
f.entry.Attributes.Replication = f.replication
request := &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: f.entry,
Signatures: []int32{f.fs.signature},
if _, err := client.UpdateEntry(ctx, request); err != nil {
return fmt.Errorf("update %s: %v",, err)
return nil
return flushErr
written, err := f.bufWriter.Write(buf)
if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)",,, += int64(written)
return written, err
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v",
err := f.bufWriter.Close()
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
return err
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(2).Infof("WebDavFileSystem.Read %v",
if f.entry == nil {
f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(
if f.entry == nil {
return 0, err
if err != nil {
return 0, err
fileSize := int64(filer.FileSize(f.entry))
if fileSize == 0 {
return 0, io.EOF
if f.entryViewCache == nil {
f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
f.reader = nil
if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
readSize, err = f.reader.ReadAt(p,
glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)",,, += int64(readSize)
if err != nil && err != io.EOF {
glog.Errorf("file read %s: %v",, err)
func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d",, count)
dir, _ := util.FullPath(
err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
size: int64(filer.FileSize(entry)),
name: entry.Name,
mode: os.FileMode(entry.Attributes.FileMode),
modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
isDirectory: entry.IsDirectory,
if !strings.HasSuffix(, "/") && fi.IsDir() { += "/"
glog.V(4).Infof("entry: %v",
ret = append(ret, &fi)
return nil
old :=
if old >= int64(len(ret)) {
if count > 0 {
return nil, io.EOF
return nil, nil
if count > 0 { += int64(count)
if > int64(len(ret)) { = int64(len(ret))
} else { = int64(len(ret))
old = 0
return ret[], nil
func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
glog.V(2).Infof("WebDavFile.Seek %v %v %v",, offset, whence)
ctx := context.Background()
var err error
switch whence {
case io.SeekStart: = 0
case io.SeekEnd:
if fi, err := f.fs.stat(ctx,; err != nil {
return 0, err
} else { = fi.Size()
} += offset
return, err
func (f *WebDavFile) Stat() (os.FileInfo, error) {
glog.V(2).Infof("WebDavFile.Stat %v",
ctx := context.Background()
return f.fs.stat(ctx,