diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index e0c3a91de..4297ec62d 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -51,7 +51,6 @@ func (pages *TempFileDirtyPages) FlushData() error { if pages.lastErr != nil { return fmt.Errorf("flush data: %v", pages.lastErr) } - pages.chunkedFile.Reset() return nil } @@ -68,6 +67,7 @@ func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) + interval.MarkFlushed() }) } @@ -102,5 +102,5 @@ func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reade } func (pages TempFileDirtyPages) Destroy() { - pages.chunkedFile.Reset() + pages.chunkedFile.Destroy() } diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go index dca9a1740..838b1cdfe 100644 --- a/weed/filesys/page_writer/chunk_interval_list.go +++ b/weed/filesys/page_writer/chunk_interval_list.go @@ -6,6 +6,7 @@ import "math" type ChunkWrittenInterval struct { StartOffset int64 stopOffset int64 + flushed bool prev *ChunkWrittenInterval next *ChunkWrittenInterval } @@ -18,6 +19,10 @@ func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool { return interval.stopOffset-interval.StartOffset == chunkSize } +func (interval *ChunkWrittenInterval) MarkFlushed() { + interval.flushed = true +} + // ChunkWrittenIntervalList mark written intervals within one page chunk type ChunkWrittenIntervalList struct { head *ChunkWrittenInterval @@ -64,18 +69,21 @@ func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { // merge p and q together p.stopOffset = q.stopOffset + p.flushed = false unlinkNodesBetween(p, q.next) return } if interval.StartOffset <= p.stopOffset { // merge new interval into p p.stopOffset = interval.stopOffset + p.flushed = false unlinkNodesBetween(p, q) return } if q.StartOffset <= interval.stopOffset { // merge new interval into q q.StartOffset = interval.StartOffset + q.flushed = false unlinkNodesBetween(p, q) return } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go index b0e1c2844..d2f94dafd 100644 --- a/weed/filesys/page_writer/chunked_file_writer.go +++ b/weed/filesys/page_writer/chunked_file_writer.go @@ -106,13 +106,15 @@ func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, log for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { chunkUsage := cw.chunkUsages[actualChunkIndex] for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - process(cw.file, logicChunkIndex, t) + if !t.flushed { + process(cw.file, logicChunkIndex, t) + } } } } -// Reset releases used resources -func (cw *ChunkedFileWriter) Reset() { +// Destroy releases used resources +func (cw *ChunkedFileWriter) Destroy() { if cw.file != nil { cw.file.Close() os.Remove(cw.file.Name()) diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go index 244ed62c3..1c72c77d4 100644 --- a/weed/filesys/page_writer/chunked_file_writer_test.go +++ b/weed/filesys/page_writer/chunked_file_writer_test.go @@ -35,9 +35,9 @@ func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { func TestWriteChunkedFile(t *testing.T) { x := NewChunkedFileWriter(os.TempDir(), 20) - defer x.Reset() + defer x.Destroy() y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() + defer y.Destroy() batchSize := 4 buf := make([]byte, batchSize) diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go index 3c55a91ad..364148c14 100644 --- a/weed/filesys/page_writer/chunked_stream_writer_test.go +++ b/weed/filesys/page_writer/chunked_stream_writer_test.go @@ -10,7 +10,7 @@ func TestWriteChunkedStream(t *testing.T) { x := NewChunkedStreamWriter(20) defer x.Reset() y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() + defer y.Destroy() batchSize := 4 buf := make([]byte, batchSize)