limits concurrent uploads for one file

This commit is contained in:
Chris Lu 2021-06-06 23:05:17 -07:00
parent 8295e2feb6
commit 452c6ef183

View file

@ -10,6 +10,7 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
@ -38,11 +39,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var uploadErr error var uploadErr error
var wg sync.WaitGroup var wg sync.WaitGroup
var bytesBufferCounter int64
bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
for { for {
// need to throttle this for large files // need to throttle used byte buffer
bytesBufferLimitCond.L.Lock()
for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter)
bytesBufferLimitCond.Wait()
}
atomic.AddInt64(&bytesBufferCounter, 1)
bytesBufferLimitCond.L.Unlock()
bytesBuffer := bufPool.Get().(*bytes.Buffer) bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer) glog.V(4).Infof("received byte buffer %d", bytesBufferCounter)
limitedReader := io.LimitReader(partReader, int64(chunkSize)) limitedReader := io.LimitReader(partReader, int64(chunkSize))
@ -52,6 +63,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
// data, err := ioutil.ReadAll(limitedReader) // data, err := ioutil.ReadAll(limitedReader)
if err != nil || dataSize == 0 { if err != nil || dataSize == 0 {
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
return nil, md5Hash, 0, err, nil return nil, md5Hash, 0, err, nil
} }
if chunkOffset == 0 && !isAppend(r) { if chunkOffset == 0 && !isAppend(r) {
@ -59,13 +73,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset += dataSize chunkOffset += dataSize
smallContent = make([]byte, dataSize) smallContent = make([]byte, dataSize)
bytesBuffer.Read(smallContent) bytesBuffer.Read(smallContent)
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
break break
} }
} }
wg.Add(1) wg.Add(1)
go func(offset int64) { go func(offset int64) {
defer wg.Done() defer func() {
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
wg.Done()
}()
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so) chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if toChunkErr != nil { if toChunkErr != nil {