diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 47664ed55..05300ef1c 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -19,9 +19,9 @@ type UploadPipeline struct { writableChunksLock sync.Mutex sealedChunks map[LogicChunkIndex]*SealedChunk sealedChunksLock sync.Mutex - writers *util.LimitedConcurrentExecutor - activeWriterCond *sync.Cond - activeWriterCount int32 + uploaders *util.LimitedConcurrentExecutor + uploaderCount int32 + uploaderCountCond *sync.Cond saveToStorageFn SaveToStorageFunc activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex @@ -42,14 +42,14 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { return &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]*MemChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - writers: writers, - activeWriterCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - filepath: filepath, - activeReadChunks: make(map[LogicChunkIndex]int), + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]*MemChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + filepath: filepath, + activeReadChunks: make(map[LogicChunkIndex]int), } } @@ -162,17 +162,16 @@ func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { } func (cw *UploadPipeline) waitForCurrentWritersToComplete() { - cw.activeWriterCond.L.Lock() + cw.uploaderCountCond.L.Lock() t := int32(100) for { - t = atomic.LoadInt32(&cw.activeWriterCount) + t = atomic.LoadInt32(&cw.uploaderCount) if t <= 0 { break } - glog.V(4).Infof("activeWriterCond is %d", t) - cw.activeWriterCond.Wait() + cw.uploaderCountCond.Wait() } - cw.activeWriterCond.L.Unlock() + cw.uploaderCountCond.L.Unlock() } func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { @@ -182,8 +181,8 @@ func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex } func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - atomic.AddInt32(&cw.activeWriterCount, 1) - glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount) + atomic.AddInt32(&cw.uploaderCount, 1) + glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount) cw.sealedChunksLock.Lock() @@ -199,19 +198,19 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.sealedChunksLock.Unlock() - cw.writers.Execute(func() { + cw.uploaders.Execute(func() { // first add to the file chunks cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) // notify waiting process - atomic.AddInt32(&cw.activeWriterCount, -1) - glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) + atomic.AddInt32(&cw.uploaderCount, -1) + glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount) // Lock and Unlock are not required, // but it may signal multiple times during one wakeup, // and the waiting goroutine may miss some of them! - cw.activeWriterCond.L.Lock() - cw.activeWriterCond.Broadcast() - cw.activeWriterCond.L.Unlock() + cw.uploaderCountCond.L.Lock() + cw.uploaderCountCond.Broadcast() + cw.uploaderCountCond.L.Unlock() // wait for readers for cw.IsLocked(logicChunkIndex) {