Merge pull request #2694 from banjiaojuhao/filer_http-write_with_offset

filer_http: support uploading file with offset
This commit is contained in:
Chris Lu 2022-02-22 01:36:12 -08:00 committed by GitHub
commit 5f6f9e9a53
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 9 deletions

View file

@ -126,10 +126,6 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return return
} }
func isAppend(r *http.Request) bool {
return r.URL.Query().Get("op") == "append"
}
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode // detect file mode
@ -161,8 +157,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
var entry *filer.Entry var entry *filer.Entry
var mergedChunks []*filer_pb.FileChunk var mergedChunks []*filer_pb.FileChunk
isAppend := r.URL.Query().Get("op") == "append"
isOffsetWrite := fileChunks[0].Offset > 0
// when it is an append // when it is an append
if isAppend(r) { if isAppend || isOffsetWrite {
existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path)) existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
if findErr != nil && findErr != filer_pb.ErrNotFound { if findErr != nil && findErr != filer_pb.ErrNotFound {
glog.V(0).Infof("failing to find %s: %v", path, findErr) glog.V(0).Infof("failing to find %s: %v", path, findErr)
@ -173,11 +172,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Mtime = time.Now() entry.Mtime = time.Now()
entry.Md5 = nil entry.Md5 = nil
// adjust chunk offsets // adjust chunk offsets
for _, chunk := range fileChunks { if isAppend {
chunk.Offset += int64(entry.FileSize) for _, chunk := range fileChunks {
chunk.Offset += int64(entry.FileSize)
}
entry.FileSize += uint64(chunkOffset)
} }
mergedChunks = append(entry.Chunks, fileChunks...) mergedChunks = append(entry.Chunks, fileChunks...)
entry.FileSize += uint64(chunkOffset)
// TODO // TODO
if len(entry.Content) > 0 { if len(entry.Content) > 0 {
@ -215,6 +216,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return return
} }
entry.Chunks = mergedChunks entry.Chunks = mergedChunks
if isOffsetWrite {
entry.Md5 = nil
entry.FileSize = entry.Size()
}
filerResult = &FilerPostResult{ filerResult = &FilerPostResult{
Name: fileName, Name: fileName,

View file

@ -3,10 +3,12 @@ package weed_server
import ( import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"fmt"
"hash" "hash"
"io" "io"
"net/http" "net/http"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -28,6 +30,22 @@ var bufPool = sync.Pool{
} }
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query()
isAppend := query.Get("op") == "append"
if query.Has("offset") {
offset := query.Get("offset")
offsetInt, err := strconv.ParseInt(offset, 10, 64)
if err != nil || offsetInt < 0 {
err = fmt.Errorf("invalid 'offset': '%s'", offset)
return nil, nil, 0, err, nil
}
if isAppend && offsetInt > 0 {
err = fmt.Errorf("cannot set offset when op=append")
return nil, nil, 0, err, nil
}
chunkOffset = offsetInt
}
md5Hash = md5.New() md5Hash = md5.New()
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
@ -63,7 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bytesBufferLimitCond.Signal() bytesBufferLimitCond.Signal()
break break
} }
if chunkOffset == 0 && !isAppend(r) { if chunkOffset == 0 && !isAppend {
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
chunkOffset += dataSize chunkOffset += dataSize
smallContent = make([]byte, dataSize) smallContent = make([]byte, dataSize)