This commit is contained in:
chrislu 2022-01-22 01:46:10 -08:00
parent 4acfc098e9
commit 3b4a9addaf

View file

@ -19,9 +19,9 @@ type UploadPipeline struct {
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
writers *util.LimitedConcurrentExecutor
activeWriterCond *sync.Cond
activeWriterCount int32
uploaders *util.LimitedConcurrentExecutor
uploaderCount int32
uploaderCountCond *sync.Cond
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
@ -42,14 +42,14 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]*MemChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
writers: writers,
activeWriterCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int),
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]*MemChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int),
}
}
@ -162,17 +162,16 @@ func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
}
func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
cw.activeWriterCond.L.Lock()
cw.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&cw.activeWriterCount)
t = atomic.LoadInt32(&cw.uploaderCount)
if t <= 0 {
break
}
glog.V(4).Infof("activeWriterCond is %d", t)
cw.activeWriterCond.Wait()
cw.uploaderCountCond.Wait()
}
cw.activeWriterCond.L.Unlock()
cw.uploaderCountCond.L.Unlock()
}
func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
@ -182,8 +181,8 @@ func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex
}
func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&cw.activeWriterCount, 1)
glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount)
atomic.AddInt32(&cw.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount)
cw.sealedChunksLock.Lock()
@ -199,19 +198,19 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
cw.sealedChunksLock.Unlock()
cw.writers.Execute(func() {
cw.uploaders.Execute(func() {
// first add to the file chunks
cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)
// notify waiting process
atomic.AddInt32(&cw.activeWriterCount, -1)
glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount)
atomic.AddInt32(&cw.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
cw.activeWriterCond.L.Lock()
cw.activeWriterCond.Broadcast()
cw.activeWriterCond.L.Unlock()
cw.uploaderCountCond.L.Lock()
cw.uploaderCountCond.Broadcast()
cw.uploaderCountCond.L.Unlock()
// wait for readers
for cw.IsLocked(logicChunkIndex) {