diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index d093676bc..f2b996ec5 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "sort" "strings" "sync" "time" @@ -34,10 +35,15 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset := int64(0) var smallContent []byte + var uploadErr error - bytesBuffer := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bytesBuffer) + var wg sync.WaitGroup for { + + // need to throttle this for large files + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + limitedReader := io.LimitReader(partReader, int64(chunkSize)) bytesBuffer.Reset() @@ -45,8 +51,8 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque dataSize, err := bytesBuffer.ReadFrom(limitedReader) // data, err := ioutil.ReadAll(limitedReader) - if err != nil { - return nil, nil, 0, err, nil + if err != nil || dataSize == 0 { + return nil, md5Hash, 0, err, nil } if chunkOffset == 0 && !isAppend(r) { if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 { @@ -57,30 +63,39 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque } } - chunk, uploadErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), chunkOffset, so, md5Hash) + wg.Add(1) + go func(offset int64) { + defer wg.Done() - if uploadErr != nil { - return nil, nil, 0, uploadErr, nil - } - - // if last chunk exhausted the reader exactly at the border - if chunk == nil { - break - } - // Save to chunk manifest structure - fileChunks = append(fileChunks, chunk) - - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, chunkOffset, chunkOffset+int64(chunk.Size)) + chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so, md5Hash) + if toChunkErr != nil { + uploadErr = toChunkErr + } + if chunk != nil { + fileChunks = append(fileChunks, chunk) + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) + } + }(chunkOffset) // reset variables for the next chunk - chunkOffset = chunkOffset + int64(chunk.Size) + chunkOffset = chunkOffset + dataSize // if last chunk was not at full chunk size, but already exhausted the reader - if int64(chunk.Size) < int64(chunkSize) { + if dataSize < int64(chunkSize) { break } } + wg.Wait() + + if uploadErr != nil { + return nil, md5Hash, 0, uploadErr, nil + } + + sort.Slice(fileChunks, func(i, j int) bool { + return fileChunks[i].Offset < fileChunks[j].Offset + }) + return fileChunks, md5Hash, chunkOffset, nil, smallContent }