seaweedfs/weed/filesys/page_writer/chunked_stream_writer.go

135 lines
3.6 KiB
Go

package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"sync"
"sync/atomic"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode
type ChunkedStreamWriter struct {
activeChunks map[LogicChunkIndex]*MemChunk
activeChunksLock sync.Mutex
ChunkSize int64
saveToStorageFn SaveToStorageFunc
sync.Mutex
}
type MemChunk struct {
buf []byte
usage *ChunkWrittenIntervalList
}
var _ = io.WriterAt(&ChunkedStreamWriter{})
func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
return &ChunkedStreamWriter{
ChunkSize: chunkSize,
activeChunks: make(map[LogicChunkIndex]*MemChunk),
}
}
func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
cw.saveToStorageFn = saveToStorageFn
}
func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
cw.Lock()
defer cw.Unlock()
logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
offsetRemainder := off % cw.ChunkSize
memChunk, found := cw.activeChunks[logicChunkIndex]
if !found {
memChunk = &MemChunk{
buf: mem.Allocate(int(cw.ChunkSize)),
usage: newChunkWrittenIntervalList(),
}
cw.activeChunks[logicChunkIndex] = memChunk
}
n = copy(memChunk.buf[offsetRemainder:], p)
memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
if memChunk.usage.IsComplete(cw.ChunkSize) {
if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex)
}
}
return
}
func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
cw.Lock()
defer cw.Unlock()
logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
memChunk, found := cw.activeChunks[logicChunkIndex]
if !found {
return
}
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
isAllZero := true
for i := logicStart - off; i < logicStop-off; i++ {
if p[i] != 0 {
isAllZero = false
break
}
}
if isAllZero {
glog.Errorf("Copied content is all Zero [%d,%d)", logicStart-off, logicStop-off)
}
}
}
return
}
func (cw *ChunkedStreamWriter) FlushAll() {
cw.Lock()
defer cw.Unlock()
for logicChunkIndex, memChunk := range cw.activeChunks {
if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex)
}
}
}
func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
var referenceCounter = int32(memChunk.usage.size())
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
atomic.AddInt32(&referenceCounter, -1)
if atomic.LoadInt32(&referenceCounter) == 0 {
mem.Free(memChunk.buf)
delete(cw.activeChunks, logicChunkIndex)
}
})
}
}
// Destroy releases used resources
func (cw *ChunkedStreamWriter) Destroy() {
cw.Lock()
defer cw.Unlock()
for t, memChunk := range cw.activeChunks {
mem.Free(memChunk.buf)
delete(cw.activeChunks, t)
}
}