From 452c6ef18313bbf9655c133e5b5ed9456b983006 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 6 Jun 2021 23:05:17 -0700 Subject: [PATCH] limits concurrent uploads for one file --- .../filer_server_handlers_write_upload.go | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cc9bb0dc0..32a722507 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -10,6 +10,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -38,11 +39,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque var uploadErr error var wg sync.WaitGroup + var bytesBufferCounter int64 + bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) for { - // need to throttle this for large files + // need to throttle used byte buffer + bytesBufferLimitCond.L.Lock() + for atomic.LoadInt64(&bytesBufferCounter) >= 4 { + glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter) + bytesBufferLimitCond.Wait() + } + atomic.AddInt64(&bytesBufferCounter, 1) + bytesBufferLimitCond.L.Unlock() + bytesBuffer := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bytesBuffer) + glog.V(4).Infof("received byte buffer %d", bytesBufferCounter) limitedReader := io.LimitReader(partReader, int64(chunkSize)) @@ -52,6 +63,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // data, err := ioutil.ReadAll(limitedReader) if err != nil || dataSize == 0 { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() return nil, md5Hash, 0, err, nil } if chunkOffset == 0 && !isAppend(r) { @@ -59,13 +73,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset += dataSize smallContent = make([]byte, dataSize) bytesBuffer.Read(smallContent) + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() break } } wg.Add(1) go func(offset int64) { - defer wg.Done() + defer func() { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() + wg.Done() + }() chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so) if toChunkErr != nil {