This commit is contained in:
chrislu 2022-03-13 18:34:57 -07:00
parent f2f68f675e
commit 5cba8e51c5

View file

@ -24,7 +24,7 @@ type UploadPipeline struct {
saveToStorageFn SaveToStorageFunc saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex activeReadChunksLock sync.Mutex
bufferChunkLimit int writableChunkLimit int
swapFile *SwapFile swapFile *SwapFile
} }
@ -43,15 +43,15 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
return &UploadPipeline{ return &UploadPipeline{
ChunkSize: chunkSize, ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk), writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk), sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers, uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}), uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn, saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int), activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit, writableChunkLimit: bufferChunkLimit,
swapFile: NewSwapFile(swapFileDir, chunkSize), swapFile: NewSwapFile(swapFileDir, chunkSize),
} }
} }
@ -63,7 +63,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
pageChunk, found := up.writableChunks[logicChunkIndex] pageChunk, found := up.writableChunks[logicChunkIndex]
if !found { if !found {
if len(up.writableChunks) > up.bufferChunkLimit { if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit // if current file chunks is over the per file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks { for lci, mc := range up.writableChunks {
@ -78,8 +78,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
} }
if isSequential && if isSequential &&
len(up.writableChunks) < up.bufferChunkLimit && len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) { atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else { } else {
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)