package page_writer import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" "sync" "sync/atomic" ) type LogicChunkIndex int type UploadPipeline struct { uploaderCount int32 uploaderCountCond *sync.Cond filepath util.FullPath ChunkSize int64 uploaders *util.LimitedConcurrentExecutor saveToStorageFn SaveToStorageFunc writableChunkLimit int swapFile *SwapFile chunksLock sync.Mutex writableChunks map[LogicChunkIndex]PageChunk sealedChunks map[LogicChunkIndex]*SealedChunk activeReadChunks map[LogicChunkIndex]int readerCountCond *sync.Cond } type SealedChunk struct { chunk PageChunk referenceCounter int // track uploading or reading processes } func (sc *SealedChunk) FreeReference(messageOnFree string) { sc.referenceCounter-- if sc.referenceCounter == 0 { glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) sc.chunk.FreeResource() } } func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { t := &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]PageChunk), sealedChunks: make(map[LogicChunkIndex]*SealedChunk), uploaders: writers, uploaderCountCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, activeReadChunks: make(map[LogicChunkIndex]int), writableChunkLimit: bufferChunkLimit, swapFile: NewSwapFile(swapFileDir, chunkSize), } t.readerCountCond = sync.NewCond(&t.chunksLock) return t } func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { up.chunksLock.Lock() defer up.chunksLock.Unlock() logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) pageChunk, found := up.writableChunks[logicChunkIndex] if !found { if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { chunkFullness := mc.WrittenSize() if fullness < chunkFullness { fullestChunkIndex = lci fullness = chunkFullness } } up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } if isSequential && len(up.writableChunks) < up.writableChunkLimit && atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) } up.writableChunks[logicChunkIndex] = pageChunk } n = pageChunk.WriteDataAt(p, off) up.maybeMoveToSealed(pageChunk, logicChunkIndex) return } func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) up.chunksLock.Lock() defer func() { up.readerCountCond.Signal() up.chunksLock.Unlock() }() // read from sealed chunks first sealedChunk, found := up.sealedChunks[logicChunkIndex] if found { sealedChunk.referenceCounter++ } if found { maxStop = sealedChunk.chunk.ReadDataAt(p, off) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) } // read from writable chunks last writableChunk, found := up.writableChunks[logicChunkIndex] if !found { return } writableMaxStop := writableChunk.ReadDataAt(p, off) glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) maxStop = max(maxStop, writableMaxStop) return } func (up *UploadPipeline) FlushAll() { up.chunksLock.Lock() defer up.chunksLock.Unlock() for logicChunkIndex, memChunk := range up.writableChunks { up.moveToSealed(memChunk, logicChunkIndex) } up.waitForCurrentWritersToComplete() } func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { if memChunk.IsComplete() { up.moveToSealed(memChunk, logicChunkIndex) } } func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { atomic.AddInt32(&up.uploaderCount, 1) glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) } sealedChunk := &SealedChunk{ chunk: memChunk, referenceCounter: 1, // default 1 is for uploading process } up.sealedChunks[logicChunkIndex] = sealedChunk delete(up.writableChunks, logicChunkIndex) // unlock before submitting the uploading jobs up.chunksLock.Unlock() up.uploaders.Execute(func() { // first add to the file chunks sealedChunk.chunk.SaveContent(up.saveToStorageFn) // notify waiting process atomic.AddInt32(&up.uploaderCount, -1) glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) // Lock and Unlock are not required, // but it may signal multiple times during one wakeup, // and the waiting goroutine may miss some of them! up.uploaderCountCond.L.Lock() up.uploaderCountCond.Broadcast() up.uploaderCountCond.L.Unlock() // wait for readers up.chunksLock.Lock() defer up.chunksLock.Unlock() for up.IsLocked(logicChunkIndex) { up.readerCountCond.Wait() } // then remove from sealed chunks delete(up.sealedChunks, logicChunkIndex) sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) }) up.chunksLock.Lock() } func (up *UploadPipeline) Shutdown() { up.swapFile.FreeResource() up.chunksLock.Lock() defer up.chunksLock.Unlock() for logicChunkIndex, sealedChunk := range up.sealedChunks { sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex)) } }