Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-01-19 02:48:24 +00:00
keep dirty pages based on temp file

parent c87b8f4c30
commit 9274557552
weed/filesys/dirty_pages_temp_file.go (new file, 112 lines)

@@ -0,0 +1,112 @@
package filesys

import (
    "fmt"
    "io"
    "os"
    "sync"
    "time"

    "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// TempFileDirtyPages keeps dirty pages in a local temp file and uploads
// the written intervals as chunks when the file is flushed.
type TempFileDirtyPages struct {
    f              *File
    writeWaitGroup sync.WaitGroup
    pageAddLock    sync.Mutex
    chunkAddLock   sync.Mutex
    lastErr        error
    collection     string
    replication    string
    chunkedFile    *page_writer.ChunkedFileWriter
}

func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {

    tempFile := &TempFileDirtyPages{
        f:           file,
        chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
    }

    return tempFile
}

// AddPage buffers one write by spilling it into the chunked temp file.
func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {

    pages.pageAddLock.Lock()
    defer pages.pageAddLock.Unlock()

    glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
    if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
        pages.lastErr = err
    }
}

// FlushData uploads all written intervals, waits for the uploads to finish,
// and reports the last error seen, if any.
func (pages *TempFileDirtyPages) FlushData() error {
    pages.saveChunkedFileToStorage()
    pages.writeWaitGroup.Wait()
    if pages.lastErr != nil {
        return fmt.Errorf("flush data: %v", pages.lastErr)
    }
    pages.chunkedFile.Reset()
    return nil
}

func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
    return pages.chunkedFile.ReadDataAt(data, startOffset)
}

func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
    return pages.collection, pages.replication
}

func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {

    pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
        reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
        pages.saveChunkedFileIntervalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
    })

}

func (pages *TempFileDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64) {

    mtime := time.Now().UnixNano()
    pages.writeWaitGroup.Add(1)
    writer := func() {
        defer pages.writeWaitGroup.Done()

        chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
        if err != nil {
            glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
            pages.lastErr = err
            return
        }
        chunk.Mtime = mtime
        pages.collection, pages.replication = collection, replication
        pages.chunkAddLock.Lock()
        defer pages.chunkAddLock.Unlock()
        pages.f.addChunks([]*filer_pb.FileChunk{chunk})
        glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
    }

    if pages.f.wfs.concurrentWriters != nil {
        pages.f.wfs.concurrentWriters.Execute(writer)
    } else {
        go writer()
    }

}

func (pages *TempFileDirtyPages) Destroy() {
    pages.chunkedFile.Reset()
}

func (pages *TempFileDirtyPages) LockForRead(startOffset, stopOffset int64) {
}

func (pages *TempFileDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
}
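To make the flush path above concrete: saveChunkedFileIntervalToStorage fans each written interval out to a background writer, and FlushData joins on the WaitGroup before checking lastErr. Below is a minimal, self-contained sketch of that same pattern. The interval struct and uploadChunk are hypothetical stand-ins for the page_writer interval types and wfs.saveDataAsChunk, and the sketch adds a mutex around lastErr, which the goroutines share.

package main

import (
    "bytes"
    "fmt"
    "io"
    "sync"
)

// interval is a hypothetical stand-in for one written interval.
type interval struct {
    offset int64
    data   []byte
}

// uploadChunk is a hypothetical stand-in for wfs.saveDataAsChunk.
func uploadChunk(r io.Reader, offset int64) error {
    n, err := io.Copy(io.Discard, r)
    fmt.Printf("uploaded %d bytes at offset %d\n", n, offset)
    return err
}

func main() {
    intervals := []interval{{0, []byte("hello")}, {64, []byte("world")}}

    var wg sync.WaitGroup
    var mu sync.Mutex
    var lastErr error

    for _, iv := range intervals {
        wg.Add(1)
        go func(iv interval) {
            defer wg.Done()
            if err := uploadChunk(bytes.NewReader(iv.data), iv.offset); err != nil {
                mu.Lock()
                lastErr = err // keep the most recent failure, as FlushData does
                mu.Unlock()
            }
        }(iv)
    }

    wg.Wait() // FlushData waits for all in-flight writers before checking lastErr
    if lastErr != nil {
        fmt.Println("flush data:", lastErr)
    }
}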
@@ -25,6 +25,7 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
 		chunkSize:     chunkSize,
 		writerPattern: NewWriterPattern(chunkSize),
 		randomWriter:  newMemoryChunkPages(fh, chunkSize),
+		// randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
 	}
 	return pw
 }
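The temp-file writer is wired into newPageWriter behind a comment only: the memory-backed newMemoryChunkPages remains the default random writer, and the temp-file-backed newTempFileDirtyPages can be tried by swapping the two constructor lines.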
weed/filesys/page_writer/chunked_file_writer.go (new file, 159 lines)

@@ -0,0 +1,159 @@
package page_writer

import (
    "io"
    "os"
    "sync"

    "github.com/chrislusf/seaweedfs/weed/glog"
)

type ActualChunkIndex int

// ChunkedFileWriter assumes the write requests will come in within chunks.
// Logical chunks are mapped to actual chunk slots in the temp file in the
// order they are first written, so a sparse file stays compact on disk.
type ChunkedFileWriter struct {
    dir                     string
    file                    *os.File
    logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
    chunkUsages             []*ChunkWrittenIntervalList
    ChunkSize               int64
    sync.Mutex
}

var _ = io.WriterAt(&ChunkedFileWriter{})

func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
    return &ChunkedFileWriter{
        dir:                     dir,
        file:                    nil,
        logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
        ChunkSize:               chunkSize,
    }
}

// WriteAt lazily creates the temp file, writes p at the mapped actual offset,
// and records the written interval for the logical chunk.
func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
    cw.Lock()
    defer cw.Unlock()

    if cw.file == nil {
        cw.file, err = os.CreateTemp(cw.dir, "")
        if err != nil {
            glog.Errorf("create temp file: %v", err)
            return
        }
    }

    actualOffset, chunkUsage := cw.toActualWriteOffset(off)
    n, err = cw.file.WriteAt(p, actualOffset)
    if err == nil {
        startOffset := off % cw.ChunkSize
        chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
    }
    return
}

// ReadDataAt fills p from the written intervals overlapping [off, off+len(p))
// and returns the highest logical offset actually read.
func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
    cw.Lock()
    defer cw.Unlock()

    if cw.file == nil {
        return
    }

    logicChunkIndex := off / cw.ChunkSize
    actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
    if chunkUsage != nil {
        for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
            logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
            logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
            if logicStart < logicStop {
                actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
                _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
                if err != nil {
                    glog.Errorf("reading temp file: %v", err)
                    break
                }
                maxStop = max(maxStop, logicStop)
            }
        }
    }
    return
}

func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
    logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
    offsetRemainder := logicOffset % cw.ChunkSize
    existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
    if found {
        return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
    }
    // First write to this logical chunk: append a new actual chunk slot.
    cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
    chunkUsage = newChunkWrittenIntervalList()
    cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
    return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
}

func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
    logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
    existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
    if found {
        return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
    }
    return 0, nil
}

// ProcessEachInterval calls process once for every written interval.
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
    for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
        chunkUsage := cw.chunkUsages[actualChunkIndex]
        for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
            process(cw.file, logicChunkIndex, t)
        }
    }
}

// Reset releases used resources
func (cw *ChunkedFileWriter) Reset() {
    if cw.file != nil {
        cw.file.Close()
        os.Remove(cw.file.Name())
        cw.file = nil
    }
    cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
    cw.chunkUsages = cw.chunkUsages[:0]
}

// FileIntervalReader reads one written interval of the temp file sequentially.
type FileIntervalReader struct {
    f           *os.File
    startOffset int64
    stopOffset  int64
    position    int64
}

var _ = io.Reader(&FileIntervalReader{})

func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
    actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
    if !found {
        // this should never happen
        return nil
    }
    return &FileIntervalReader{
        f:           cw.file,
        startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
        stopOffset:  int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
        position:    0,
    }
}

func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
    readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
    n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
    if err == nil || err == io.EOF {
        fr.position += int64(n)
        if fr.stopOffset-fr.startOffset-fr.position == 0 {
            // report io.EOF a tiny bit sooner, once the interval is exhausted
            err = io.EOF
            return
        }
    }
    return
}
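A minimal usage sketch of the writer above, assuming the rest of the page_writer package from this commit (LogicChunkIndex, the written-interval list, and the min/max helpers) is present:

package main

import (
    "fmt"
    "os"

    "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
)

func main() {
    // 16-byte logical chunks, backed by a lazily created temp file.
    cw := page_writer.NewChunkedFileWriter(os.TempDir(), 16)
    defer cw.Reset() // closes and removes the temp file

    // Sparse writes: logical chunk 2 is touched first, so it becomes
    // actual chunk 0 in the temp file; logical chunk 0 becomes actual chunk 1.
    cw.WriteAt([]byte("hello"), 40)
    cw.WriteAt([]byte("world"), 0)

    // Read back the dirty bytes at offset 40.
    buf := make([]byte, 5)
    maxStop := cw.ReadDataAt(buf, 40)
    fmt.Printf("read %q, data known up to offset %d\n", buf, maxStop) // "hello", 45
}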
weed/filesys/page_writer/chunked_file_writer_test.go (new file, 60 lines)

@@ -0,0 +1,60 @@
package page_writer

import (
    "os"
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestChunkedFileWriter_toActualOffset(t *testing.T) {
    cw := NewChunkedFileWriter("", 16)

    writeToFile(cw, 50, 60)
    writeToFile(cw, 60, 64)

    writeToFile(cw, 32, 40)
    writeToFile(cw, 42, 48)

    writeToFile(cw, 48, 50)

    assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
    assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
}

// writeToFile marks [startOffset, stopOffset) as written without touching disk.
func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {

    _, chunkUsage := cw.toActualWriteOffset(startOffset)

    // skip doing actual writing
    innerOffset := startOffset % cw.ChunkSize
    chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
}

func TestWriteChunkedFile(t *testing.T) {
    x := NewChunkedFileWriter(os.TempDir(), 20)
    defer x.Reset()
    y := NewChunkedFileWriter(os.TempDir(), 12)
    defer y.Reset()

    batchSize := 4
    buf := make([]byte, batchSize)
    for i := 0; i < 256; i++ {
        for j := 0; j < batchSize; j++ {
            buf[j] = byte(i)
        }
        // x is written forward, y backward; both should read back identically.
        x.WriteAt(buf, int64(i*batchSize))
        y.WriteAt(buf, int64((255-i)*batchSize))
    }

    a := make([]byte, 1)
    b := make([]byte, 1)
    for i := 0; i < 256*batchSize; i++ {
        x.ReadDataAt(a, int64(i))
        y.ReadDataAt(b, int64(256*batchSize-1-i))
        assert.Equal(t, a[0], b[0], "same read")
    }
}
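A quick walk through TestChunkedFileWriter_toActualOffset, to make the interval bookkeeping concrete: with ChunkSize 16, offsets [48, 64) form one logical chunk and [32, 48) another. The [48, 64) chunk is touched first (by the write at offset 50), so it becomes actual chunk 0; its writes [50, 60), [60, 64), and [48, 50) map to inner intervals [2, 12), [12, 16), and [0, 2), which merge into the single fully covering interval [0, 16), hence size() == 1. The [32, 48) chunk becomes actual chunk 1 and keeps two disjoint intervals, [0, 8) and [10, 16), from the writes at 32 and 42.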