diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index d7c31bf0f..3331c3fa2 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -104,7 +104,7 @@ type ChunkView struct { FileId string Offset int64 Size uint64 - LogicOffset int64 + LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk ChunkSize uint64 CipherKey []byte IsGzipped bool @@ -139,7 +139,7 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int if chunkStart < chunkStop { views = append(views, &ChunkView{ FileId: chunk.fileId, - Offset: chunkStart - chunk.start, + Offset: chunkStart - chunk.start + chunk.chunkOffset, Size: uint64(chunkStop - chunkStart), LogicOffset: chunkStart, ChunkSize: chunk.chunkSize, @@ -170,7 +170,7 @@ var bufPool = sync.Pool{ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsCompressed) + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed) length := len(visibles) if length == 0 { @@ -182,13 +182,13 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. } logPrintf(" before", visibles) + chunkStop := chunk.Offset + int64(chunk.Size) for _, v := range visibles { if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) + newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)) } - chunkStop := chunk.Offset + int64(chunk.Size) if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) + newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)) } if chunkStop <= v.start || v.stop <= chunk.Offset { newVisibles = append(newVisibles, v) @@ -244,17 +244,19 @@ type VisibleInterval struct { stop int64 modifiedTime int64 fileId string + chunkOffset int64 chunkSize uint64 cipherKey []byte isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { return VisibleInterval{ start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime, + chunkOffset: chunkOffset, // the starting position in the chunk chunkSize: chunkSize, cipherKey: cipherKey, isGzipped: isGzipped, diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index 2390d4fb2..70da6e16c 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -1,12 +1,11 @@ package filer2 import ( + "fmt" "log" "math" "testing" - "fmt" - "github.com/stretchr/testify/assert" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -95,12 +94,12 @@ func TestIntervalMerging(t *testing.T) { // case 2: updates overwrite part of previous chunks { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 70, FileId: "b", Mtime: 134}, }, Expected: []*VisibleInterval{ - {start: 0, stop: 50, fileId: "asdf"}, - {start: 50, stop: 100, fileId: "abc"}, + {start: 0, stop: 70, fileId: "b"}, + {start: 70, stop: 100, fileId: "a", chunkOffset: 70}, }, }, // case 3: updates overwrite full chunks @@ -130,14 +129,14 @@ func TestIntervalMerging(t *testing.T) { // case 5: updates overwrite full chunks { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, - {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "d", Mtime: 184}, + {Offset: 70, Size: 150, FileId: "c", Mtime: 143}, + {Offset: 80, Size: 100, FileId: "b", Mtime: 134}, }, Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - {start: 200, stop: 220, fileId: "abc"}, + {start: 0, stop: 200, fileId: "d"}, + {start: 200, stop: 220, fileId: "c", chunkOffset: 130}, }, }, // case 6: same updates @@ -208,6 +207,10 @@ func TestIntervalMerging(t *testing.T) { t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", i, x, interval.fileId, testcase.Expected[x].fileId) } + if interval.chunkOffset != testcase.Expected[x].chunkOffset { + t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d", + i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset) + } } if len(intervals) != len(testcase.Expected) { t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) @@ -255,14 +258,14 @@ func TestChunksReading(t *testing.T) { // case 2: updates overwrite part of previous chunks { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, + {Offset: 3, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 10, Size: 50, FileId: "b", Mtime: 134}, }, - Offset: 25, - Size: 50, + Offset: 30, + Size: 40, Expected: []*ChunkView{ - {Offset: 25, Size: 25, FileId: "asdf", LogicOffset: 25}, - {Offset: 0, Size: 25, FileId: "abc", LogicOffset: 50}, + {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30}, + {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60}, }, }, // case 3: updates overwrite full chunks @@ -296,16 +299,16 @@ func TestChunksReading(t *testing.T) { // case 5: updates overwrite full chunks { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "c", Mtime: 184}, + {Offset: 70, Size: 150, FileId: "b", Mtime: 143}, {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, }, Offset: 0, Size: 220, Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 20, FileId: "abc", LogicOffset: 200}, + {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0}, + {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200}, }, }, // case 6: same updates @@ -374,18 +377,21 @@ func TestChunksReading(t *testing.T) { } for i, testcase := range testcases { + if i != 2 { + // continue + } log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) for x, chunk := range chunks { log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", i, x, chunk.Offset, chunk.Size, chunk.FileId) if chunk.Offset != testcase.Expected[x].Offset { - t.Fatalf("failed on read case %d, chunk %d, Offset %d, expect %d", - i, x, chunk.Offset, testcase.Expected[x].Offset) + t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d", + i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset) } if chunk.Size != testcase.Expected[x].Size { - t.Fatalf("failed on read case %d, chunk %d, Size %d, expect %d", - i, x, chunk.Size, testcase.Expected[x].Size) + t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d", + i, chunk.FileId, chunk.Size, testcase.Expected[x].Size) } if chunk.FileId != testcase.Expected[x].FileId { t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s", @@ -426,18 +432,18 @@ func BenchmarkCompactFileChunks(b *testing.B) { func TestViewFromVisibleIntervals(t *testing.T) { visibles := []VisibleInterval{ { - start: 0, - stop: 25, + start: 0, + stop: 25, fileId: "fid1", }, { - start: 4096, - stop: 8192, + start: 4096, + stop: 8192, fileId: "fid2", }, { - start: 16384, - stop: 18551, + start: 16384, + stop: 18551, fileId: "fid3", }, } @@ -448,4 +454,48 @@ func TestViewFromVisibleIntervals(t *testing.T) { assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") } -} \ No newline at end of file +} + +func TestViewFromVisibleIntervals2(t *testing.T) { + visibles := []VisibleInterval{ + { + start: 344064, + stop: 348160, + fileId: "fid1", + }, + { + start: 348160, + stop: 356352, + fileId: "fid2", + }, + } + + views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) + + if len(views) != len(visibles) { + assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + } + +} + +func TestViewFromVisibleIntervals3(t *testing.T) { + visibles := []VisibleInterval{ + { + start: 1000, + stop: 2000, + fileId: "fid1", + }, + { + start: 3000, + stop: 4000, + fileId: "fid2", + }, + } + + views := ViewFromVisibleIntervals(visibles, 1700, 1500) + + if len(views) != len(visibles) { + assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + } + +} diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index b5bd85cbb..b0702e03e 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -16,7 +16,7 @@ type ChunkReadAt struct { masterClient *wdclient.MasterClient chunkViews []*ChunkView buffer []byte - bufferOffset int64 + bufferFileId string lookupFileId func(fileId string) (targetUrl string, err error) readerLock sync.Mutex fileSize int64 @@ -60,7 +60,6 @@ func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []* return &ChunkReadAt{ chunkViews: chunkViews, lookupFileId: LookupFn(filerClient), - bufferOffset: -1, chunkCache: chunkCache, fileSize: fileSize, } @@ -83,71 +82,71 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { var found bool var chunkStart, chunkStop int64 + var chunkView *ChunkView for _, chunk := range c.chunkViews { - // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d), %v && %v\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size), chunk.LogicOffset <= offset, offset < chunk.LogicOffset+int64(chunk.Size)) + // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) chunkStart, chunkStop = max(chunk.LogicOffset, offset), min(chunk.LogicOffset+int64(chunk.Size), offset+int64(len(p))) + chunkView = chunk if chunkStart < chunkStop { + // fmt.Printf(">>> found [%d,%d), chunk %s [%d,%d)\n", chunkStart, chunkStop, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) found = true - if c.bufferOffset != chunk.LogicOffset { - c.buffer, err = c.fetchChunkData(chunk) - if err != nil { - glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - } - c.bufferOffset = chunk.LogicOffset + c.buffer, err = c.fetchWholeChunkData(chunk) + if err != nil { + glog.Errorf("fetching chunk %+v: %v\n", chunk, err) } + c.bufferFileId = chunk.FileId break } } - // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d), found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)), found, err) + // fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err) if err != nil { return } if found { - n = int(chunkStart-offset) + copy(p[chunkStart-offset:chunkStop-offset], c.buffer[chunkStart-c.bufferOffset:chunkStop-c.bufferOffset]) + bufferOffset := chunkStart - chunkView.LogicOffset + chunkView.Offset + /* + skipped, copied := chunkStart-offset, chunkStop-chunkStart + fmt.Printf("+++ copy %d+%d=%d fill:[%d, %d) p[%d,%d) <- buffer:[%d,%d) buffer %s:%d\nchunkView:%+v\n\n", + skipped, copied, skipped+copied, + chunkStart, chunkStop, + chunkStart-offset, chunkStop-offset, bufferOffset, bufferOffset+chunkStop-chunkStart, + c.bufferFileId, len(c.buffer), + chunkView) + */ + n = int(chunkStart-offset) + + copy(p[chunkStart-offset:chunkStop-offset], c.buffer[bufferOffset:bufferOffset+chunkStop-chunkStart]) return } n = len(p) if offset+int64(n) >= c.fileSize { err = io.EOF - n = int(c.fileSize - offset) } + // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err) return } -func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) { +func (c *ChunkReadAt) fetchWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) { - glog.V(5).Infof("fetchChunkData %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + glog.V(4).Infof("fetchWholeChunkData %s offset %d [%d,%d)\n", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) - hasDataInCache := false - chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) if chunkData != nil { - glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) - hasDataInCache = true + glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset + int64(len(chunkData))) } else { chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) if err != nil { - return nil, err + return } - } - - if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) { - glog.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData)) - return nil, fmt.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData)) - } - - data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)] - - if !hasDataInCache { c.chunkCache.SetChunk(chunkView.FileId, chunkData) } - return data, nil + return } func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {