webdav: improve webdav upload speed

This commit is contained in:
Chris Lu 2020-12-01 15:32:27 -08:00
parent 005a6123e9
commit 04062c56c7
2 changed files with 140 additions and 47 deletions

View file

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -10,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav" "golang.org/x/net/webdav"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -96,6 +98,9 @@ type WebDavFile struct {
entry *filer_pb.Entry entry *filer_pb.Entry
entryViewCache []filer.VisibleInterval entryViewCache []filer.VisibleInterval
reader io.ReaderAt reader io.ReaderAt
bufWriter *buffered_writer.BufferedWriteCloser
collection string
replication string
} }
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@ -232,6 +237,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs, fs: fs,
name: fullFilePath, name: fullFilePath,
isDirectory: false, isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil }, nil
} }
@ -247,6 +253,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs, fs: fs,
name: fullFilePath, name: fullFilePath,
isDirectory: false, isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil }, nil
} }
@ -358,36 +365,20 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name) return fs.stat(ctx, name)
} }
func (f *WebDavFile) Write(buf []byte) (int, error) { func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
dir, _ := util.FullPath(f.name).DirAndName()
var err error
ctx := context.Background()
if f.entry == nil {
f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
if f.entry == nil {
return 0, err
}
if err != nil {
return 0, err
}
var fileId, host string var fileId, host string
var auth security.EncodedJwt var auth security.EncodedJwt
var collection, replication string
if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{ request := &filer_pb.AssignVolumeRequest{
Count: 1, Count: 1,
Replication: "", Replication: "",
Collection: f.fs.option.Collection, Collection: f.fs.option.Collection,
Path: f.name, Path: name,
} }
resp, err := client.AssignVolume(ctx, request) resp, err := client.AssignVolume(ctx, request)
@ -400,31 +391,74 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
} }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
collection, replication = resp.Collection, resp.Replication f.collection, f.replication = resp.Collection, resp.Replication
return nil return nil
}); err != nil { }); flushErr != nil {
return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err) return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
} }
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth) uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
if err != nil { if flushErr != nil {
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
return 0, fmt.Errorf("upload data: %v", err) return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
} }
if uploadResult.Error != "" { if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err) glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
return 0, fmt.Errorf("upload result: %v", uploadResult.Error) return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
}
return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
}
func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
dir, _ := util.FullPath(f.name).DirAndName()
var getErr error
ctx := context.Background()
if f.entry == nil {
f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
if f.entry == nil {
return 0, getErr
}
if getErr != nil {
return 0, getErr
}
if f.bufWriter.FlushFunc == nil {
f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
var chunk *filer_pb.FileChunk
chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
if flushErr != nil {
return fmt.Errorf("%s upload result: %v", f.name, flushErr)
} }
f.entry.Content = nil f.entry.Content = nil
f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off)) f.entry.Chunks = append(f.entry.Chunks, chunk)
err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { return flushErr
}
f.bufWriter.CloseFunc = func() error {
manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
} else {
f.entry.Chunks = manifestedChunks
}
flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix() f.entry.Attributes.Mtime = time.Now().Unix()
f.entry.Attributes.Collection = collection f.entry.Attributes.Collection = f.collection
f.entry.Attributes.Replication = replication f.entry.Attributes.Replication = f.replication
request := &filer_pb.UpdateEntryRequest{ request := &filer_pb.UpdateEntryRequest{
Directory: dir, Directory: dir,
@ -438,25 +472,32 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
return nil return nil
}) })
return flushErr
}
}
written, err := f.bufWriter.Write(buf)
if err == nil { if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf))) glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
f.off += int64(len(buf)) f.off += int64(written)
} }
return len(buf), err return written, err
} }
func (f *WebDavFile) Close() error { func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
err := f.bufWriter.Close()
if f.entry != nil { if f.entry != nil {
f.entry = nil f.entry = nil
f.entryViewCache = nil f.entryViewCache = nil
} }
return nil return err
} }
func (f *WebDavFile) Read(p []byte) (readSize int, err error) { func (f *WebDavFile) Read(p []byte) (readSize int, err error) {

View file

@ -0,0 +1,52 @@
package buffered_writer
import (
"bytes"
"io"
)
// Compile-time assertion that *BufferedWriteCloser implements io.WriteCloser.
var _ io.WriteCloser = (*BufferedWriteCloser)(nil)

// BufferedWriteCloser accumulates written bytes in memory and hands them to
// FlushFunc in chunks of roughly bufferLimit bytes, tracking the logical byte
// offset of each flushed chunk. Close flushes whatever remains and then runs
// CloseFunc. FlushFunc must be assigned before the first Write/Close; it is
// invoked synchronously, so any error it returns surfaces to the caller.
type BufferedWriteCloser struct {
	buffer          bytes.Buffer // pending, not-yet-flushed bytes
	bufferLimit     int          // flush threshold in bytes
	nextFlushOffset int64        // stream offset of the first byte currently in buffer

	// FlushFunc receives one full (or final partial) chunk plus its offset.
	FlushFunc func([]byte, int64) error
	// CloseFunc, if non-nil, runs once in Close after the last flush.
	CloseFunc func() error
}

// NewBufferedWriteCloser returns a writer that flushes whenever the buffered
// data would reach bufferLimit bytes. FlushFunc/CloseFunc start out nil and
// must be set by the caller before use.
func NewBufferedWriteCloser(bufferLimit int) *BufferedWriteCloser {
	return &BufferedWriteCloser{
		bufferLimit: bufferLimit,
	}
}

// Write buffers p, first flushing the pending data when adding p would reach
// the limit. It never invokes FlushFunc with an empty chunk.
func (b *BufferedWriteCloser) Write(p []byte) (n int, err error) {
	// Only flush when something is buffered: without the Len() > 0 guard a
	// single write of len(p) >= bufferLimit would call FlushFunc with zero
	// bytes (producing a bogus empty chunk upload in the webdav caller).
	if b.buffer.Len() > 0 && b.buffer.Len()+len(p) >= b.bufferLimit {
		if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
			return 0, err
		}
		b.nextFlushOffset += int64(b.buffer.Len())
		b.buffer.Reset()
	}
	return b.buffer.Write(p)
}

// Close flushes any remaining buffered bytes, then runs CloseFunc (if set).
// The first error encountered is returned.
func (b *BufferedWriteCloser) Close() error {
	if b.buffer.Len() > 0 {
		if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
			return err
		}
	}
	if b.CloseFunc != nil {
		if err := b.CloseFunc(); err != nil {
			return err
		}
	}
	return nil
}