From 5cba8e51c5bb8925b3676b03a368f7816215cc68 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 13 Mar 2022 18:34:57 -0700 Subject: [PATCH] refactor --- weed/mount/page_writer/upload_pipeline.go | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 2286cdf00..e084ca58f 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -24,7 +24,7 @@ type UploadPipeline struct { saveToStorageFn SaveToStorageFunc activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex - bufferChunkLimit int + writableChunkLimit int swapFile *SwapFile } @@ -43,15 +43,15 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { return &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]PageChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - uploaders: writers, - uploaderCountCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - activeReadChunks: make(map[LogicChunkIndex]int), - bufferChunkLimit: bufferChunkLimit, - swapFile: NewSwapFile(swapFileDir, chunkSize), + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]PageChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + activeReadChunks: make(map[LogicChunkIndex]int), + writableChunkLimit: bufferChunkLimit, + swapFile: NewSwapFile(swapFileDir, chunkSize), } } @@ -63,7 +63,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n pageChunk, found := up.writableChunks[logicChunkIndex] if !found { - if len(up.writableChunks) > up.bufferChunkLimit { + if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { @@ -78,8 +78,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } if isSequential && - len(up.writableChunks) < up.bufferChunkLimit && - atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) { + len(up.writableChunks) < up.writableChunkLimit && + atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)