filer: parallel data upload

This commit is contained in:
Chris Lu 2021-06-06 20:23:36 -07:00
parent e00443a940
commit bb45dea15a

View file

@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"net/http"
"sort"
"strings"
"sync"
"time"
@ -34,10 +35,15 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset := int64(0)
var smallContent []byte
var uploadErr error
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
var wg sync.WaitGroup
for {
// need to throttle this for large files
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
limitedReader := io.LimitReader(partReader, int64(chunkSize))
bytesBuffer.Reset()
@ -45,8 +51,8 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
dataSize, err := bytesBuffer.ReadFrom(limitedReader)
// data, err := ioutil.ReadAll(limitedReader)
if err != nil {
return nil, nil, 0, err, nil
if err != nil || dataSize == 0 {
return nil, md5Hash, 0, err, nil
}
if chunkOffset == 0 && !isAppend(r) {
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 {
@ -57,30 +63,39 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
}
chunk, uploadErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), chunkOffset, so, md5Hash)
wg.Add(1)
go func(offset int64) {
defer wg.Done()
if uploadErr != nil {
return nil, nil, 0, uploadErr, nil
}
// if last chunk exhausted the reader exactly at the border
if chunk == nil {
break
}
// Save to chunk manifest structure
fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, chunkOffset, chunkOffset+int64(chunk.Size))
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so, md5Hash)
if toChunkErr != nil {
uploadErr = toChunkErr
}
if chunk != nil {
fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
}
}(chunkOffset)
// reset variables for the next chunk
chunkOffset = chunkOffset + int64(chunk.Size)
chunkOffset = chunkOffset + dataSize
// if last chunk was not at full chunk size, but already exhausted the reader
if int64(chunk.Size) < int64(chunkSize) {
if dataSize < int64(chunkSize) {
break
}
}
wg.Wait()
if uploadErr != nil {
return nil, md5Hash, 0, uploadErr, nil
}
sort.Slice(fileChunks, func(i, j int) bool {
return fileChunks[i].Offset < fileChunks[j].Offset
})
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}