From 8e80f3cd65112ee3970b74880f2b022356fccdad Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 22 Jan 2022 08:09:55 -0800 Subject: [PATCH] move upload pipeline locking to a different file --- weed/filesys/page_writer/upload_pipeline.go | 58 ----------------- .../page_writer/upload_pipeline_lock.go | 63 +++++++++++++++++++ 2 files changed, 63 insertions(+), 58 deletions(-) create mode 100644 weed/filesys/page_writer/upload_pipeline_lock.go diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index ee85cf6c8..a46c13c4b 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -119,64 +119,6 @@ func (up *UploadPipeline) FlushAll() { up.waitForCurrentWritersToComplete() } -func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - up.activeReadChunks[i] = count + 1 - } else { - up.activeReadChunks[i] = 1 - } - } -} - -func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - if count == 1 { - delete(up.activeReadChunks, i) - } else { - up.activeReadChunks[i] = count - 1 - } - } - } -} - -func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - if count, found := up.activeReadChunks[logicChunkIndex]; found { - return count > 0 - } - return false -} - -func (up *UploadPipeline) waitForCurrentWritersToComplete() { - up.uploaderCountCond.L.Lock() - t := int32(100) - for { - t = atomic.LoadInt32(&up.uploaderCount) - if t <= 0 { - break - } - up.uploaderCountCond.Wait() - } - up.uploaderCountCond.L.Unlock() -} - func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { if memChunk.IsComplete() { up.moveToSealed(memChunk, logicChunkIndex) diff --git a/weed/filesys/page_writer/upload_pipeline_lock.go b/weed/filesys/page_writer/upload_pipeline_lock.go new file mode 100644 index 000000000..47a40ba37 --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline_lock.go @@ -0,0 +1,63 @@ +package page_writer + +import ( + "sync/atomic" +) + +func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) + if stopOffset%up.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := up.activeReadChunks[i]; found { + up.activeReadChunks[i] = count + 1 + } else { + up.activeReadChunks[i] = 1 + } + } +} + +func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) + if stopOffset%up.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := up.activeReadChunks[i]; found { + if count == 1 { + delete(up.activeReadChunks, i) + } else { + up.activeReadChunks[i] = count - 1 + } + } + } +} + +func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + if count, found := up.activeReadChunks[logicChunkIndex]; found { + return count > 0 + } + return false +} + +func (up *UploadPipeline) waitForCurrentWritersToComplete() { + up.uploaderCountCond.L.Lock() + t := int32(100) + for { + t = atomic.LoadInt32(&up.uploaderCount) + if t <= 0 { + break + } + up.uploaderCountCond.Wait() + } + up.uploaderCountCond.L.Unlock() +}