delay deleting from memory unless the metadata chunks is updated

This commit is contained in:
chrislu 2022-01-15 07:40:29 -08:00
parent 2c95008a1a
commit 1dc25218cd
3 changed files with 5 additions and 7 deletions

View file

@ -52,7 +52,6 @@ func (pages *StreamDirtyPages) FlushData() error {
if pages.lastErr != nil { if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr) return fmt.Errorf("flush data: %v", pages.lastErr)
} }
pages.chunkedStream.Reset()
return nil return nil
} }
@ -102,5 +101,5 @@ func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader,
} }
func (pages StreamDirtyPages) Destroy() { func (pages StreamDirtyPages) Destroy() {
pages.chunkedStream.Reset() pages.chunkedStream.Destroy()
} }

View file

@ -57,7 +57,6 @@ func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
if memChunk.usage.IsComplete(cw.ChunkSize) { if memChunk.usage.IsComplete(cw.ChunkSize) {
if cw.saveToStorageFn != nil { if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex) cw.saveOneChunk(memChunk, logicChunkIndex)
delete(cw.activeChunks, logicChunkIndex)
} }
} }
@ -92,7 +91,6 @@ func (cw *ChunkedStreamWriter) FlushAll() {
for logicChunkIndex, memChunk := range cw.activeChunks { for logicChunkIndex, memChunk := range cw.activeChunks {
if cw.saveToStorageFn != nil { if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex) cw.saveOneChunk(memChunk, logicChunkIndex)
delete(cw.activeChunks, logicChunkIndex)
} }
} }
} }
@ -102,6 +100,7 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
delete(cw.activeChunks, logicChunkIndex)
atomic.AddInt32(&referenceCounter, -1) atomic.AddInt32(&referenceCounter, -1)
if atomic.LoadInt32(&referenceCounter) == 0 { if atomic.LoadInt32(&referenceCounter) == 0 {
mem.Free(memChunk.buf) mem.Free(memChunk.buf)
@ -110,8 +109,8 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex
} }
} }
// Reset releases used resources // Destroy releases used resources
func (cw *ChunkedStreamWriter) Reset() { func (cw *ChunkedStreamWriter) Destroy() {
for t, memChunk := range cw.activeChunks { for t, memChunk := range cw.activeChunks {
mem.Free(memChunk.buf) mem.Free(memChunk.buf)
delete(cw.activeChunks, t) delete(cw.activeChunks, t)

View file

@ -8,7 +8,7 @@ import (
func TestWriteChunkedStream(t *testing.T) { func TestWriteChunkedStream(t *testing.T) {
x := NewChunkedStreamWriter(20) x := NewChunkedStreamWriter(20)
defer x.Reset() defer x.Destroy()
y := NewChunkedFileWriter(os.TempDir(), 12) y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Destroy() defer y.Destroy()