diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 0c9e13649..b87061adb 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -142,9 +142,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.sealedChunksLock.Unlock() cw.writers.Execute(func() { + // first add to the file chunks cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) - // remove from sealed chunks + // then remove from sealed chunks sealedChunk.FreeReference() cw.sealedChunksLock.Lock() defer cw.sealedChunksLock.Unlock() @@ -162,6 +163,9 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic } func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + if cw.saveToStorageFn == nil { + return + } for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go new file mode 100644 index 000000000..81191868f --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -0,0 +1,47 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "testing" +) + +func TestUploadPipeline(t *testing.T) { + + uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil) + + writeRange(uploadPipeline, 0, 131072) + writeRange(uploadPipeline, 131072, 262144) + writeRange(uploadPipeline, 262144, 1025536) + + confirmRange(t, uploadPipeline, 0, 1025536) + + writeRange(uploadPipeline, 1025536, 1296896) + + confirmRange(t, uploadPipeline, 1025536, 1296896) + + writeRange(uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) +} + +// startOff and stopOff must be divided by 4 +func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff / 4; i < stopOff/4; i += 4 { + util.Uint32toBytes(p, uint32(i)) + uploadPipeline.SaveDataAt(p, i) + } +} + +func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff; i < stopOff/4; i += 4 { + uploadPipeline.MaybeReadDataAt(p, i) + x := util.BytesToUint32(p) + if x != uint32(i) { + t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) + } + } +}