This commit is contained in:
chrislu 2022-01-17 13:40:41 -08:00
parent da7f13e73e
commit 1734017ba1
2 changed files with 52 additions and 1 deletions

View file

@ -142,9 +142,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
cw.sealedChunksLock.Unlock()
cw.writers.Execute(func() {
// first add to the file chunks
cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)
// remove from sealed chunks
// then remove from sealed chunks
sealedChunk.FreeReference()
cw.sealedChunksLock.Lock()
defer cw.sealedChunksLock.Unlock()
@ -162,6 +163,9 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
}
func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
if cw.saveToStorageFn == nil {
return
}
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {

View file

@ -0,0 +1,47 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"testing"
)
func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)
writeRange(uploadPipeline, 262144, 1025536)
confirmRange(t, uploadPipeline, 0, 1025536)
writeRange(uploadPipeline, 1025536, 1296896)
confirmRange(t, uploadPipeline, 1025536, 1296896)
writeRange(uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
}
// startOff and stopOff must be divided by 4
func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
uploadPipeline.MaybeReadDataAt(p, i)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
}
}
}