This commit is contained in:
Chris Lu 2018-05-28 14:32:16 -07:00
parent be0e88a606
commit 74332e1a61
2 changed files with 24 additions and 6 deletions

View file

@ -117,6 +117,24 @@ func TestIntervalMerging(t *testing.T) {
{start: 0, stop: 100, fileId: "abc"}, {start: 0, stop: 100, fileId: "abc"},
}, },
}, },
// case 7: real updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123},
{Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130},
{Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140},
{Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150},
{Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160},
{Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170},
},
Expected: []*visibleInterval{
{start: 0, stop: 2097152, fileId: "3,029565bf3092"},
{start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"},
{start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"},
{start: 8388608, stop: 11534336, fileId: "5,02982f80de50"},
{start: 11534336, stop: 14376529, fileId: "7,0299ad723803"},
},
},
} }
for i, testcase := range testcases { for i, testcase := range testcases {
@ -125,6 +143,8 @@ func TestIntervalMerging(t *testing.T) {
for x, interval := range intervals { for x, interval := range intervals {
log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
i, x, interval.start, interval.stop, interval.fileId) i, x, interval.start, interval.stop, interval.fileId)
}
for x, interval := range intervals {
if interval.start != testcase.Expected[x].start { if interval.start != testcase.Expected[x].start {
t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
i, x, interval.start, testcase.Expected[x].start) i, x, interval.start, testcase.Expected[x].start)

View file

@ -2,7 +2,6 @@ package filesys
import ( import (
"sync" "sync"
"sort"
"fmt" "fmt"
"bytes" "bytes"
"io" "io"
@ -63,7 +62,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
if pages.f.wfs.chunkSizeLimit > 0 && pages.totalSize() >= pages.f.wfs.chunkSizeLimit { if pages.f.wfs.chunkSizeLimit > 0 && pages.totalSize() >= pages.f.wfs.chunkSizeLimit {
chunk, err = pages.saveToStorage(ctx) chunk, err = pages.saveToStorage(ctx)
pages.pages = nil pages.pages = nil
glog.V(3).Infof("%s/%s add split [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) glog.V(3).Infof("%s/%s over size limit [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
} }
return return
} }
@ -87,6 +86,9 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
if chunk, err = pages.saveToStorage(ctx); err == nil { if chunk, err = pages.saveToStorage(ctx); err == nil {
pages.pages = nil pages.pages = nil
if chunk != nil {
glog.V(3).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
} }
return return
} }
@ -104,10 +106,6 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb
return nil, nil return nil, nil
} }
sort.Slice(pages.pages, func(i, j int) bool {
return pages.pages[i].Offset < pages.pages[j].Offset
})
var fileId, host string var fileId, host string
if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {