more solid weed mount (#4089)

* compare chunks by timestamp

* fix slab clearing error

* fix test compilation

* move oldest chunk to sealed, instead of by fullness

* lock on fh.entryViewCache

* remove verbose logs

* revert slat clearing

* less logs

* less logs

* track write and read by timestamp

* remove useless logic

* add entry lock on file handle release

* use mem chunk only, swap file chunk has problems

* comment out code that maybe used later

* add debug mode to compare data read and write

* more efficient readResolvedChunks with linked list

* small optimization

* fix test compilation

* minor fix on writer

* add SeparateGarbageChunks

* group chunks into sections

* turn off debug mode

* fix tests

* fix tests

* tmp enable swap file chunk

* Revert "tmp enable swap file chunk"

This reverts commit 985137ec47.

* simple refactoring

* simple refactoring

* do not re-use swap file chunk. Sealed chunks should not be re-used.

* comment out debugging facilities

* either mem chunk or swap file chunk is fine now

* remove orderedMutex  as *semaphore.Weighted

not found impactful

* optimize size calculation for changing large files

* optimize performance to avoid going through the long list of chunks

* still problems with swap file chunk

* rename

* tiny optimization

* swap file chunk save only successfully read data

* fix

* enable both mem and swap file chunk

* resolve chunks with range

* rename

* fix chunk interval list

* also change file handle chunk group when adding chunks

* pick in-active chunk with time-decayed counter

* fix compilation

* avoid nil with empty fh.entry

* refactoring

* rename

* rename

* refactor visible intervals to *list.List

* refactor chunkViews to *list.List

* add IntervalList for generic interval list

* change visible interval to use IntervalList in generics

* cahnge chunkViews to *IntervalList[*ChunkView]

* use NewFileChunkSection to create

* rename variables

* refactor

* fix renaming leftover

* renaming

* renaming

* add insert interval

* interval list adds lock

* incrementally add chunks to readers

Fixes:
1. set start and stop offset for the value object
2. clone the value object
3. use pointer instead of copy-by-value when passing to interval.Value
4. use insert interval since adding chunk could be out of order

* fix tests compilation

* fix tests compilation
This commit is contained in:
Chris Lu 2023-01-02 23:20:45 -08:00 committed by GitHub
parent 367353b936
commit d4566d4aaa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
45 changed files with 1834 additions and 805 deletions

View file

@ -365,7 +365,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if flushErr != nil {
return flushErr
}
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0))
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano()))
}
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@ -450,7 +450,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error)
return
}
chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize)
chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano())
fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
@ -530,7 +530,7 @@ func detectMimeType(f *os.File) string {
return mimeType
}
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
worker,
@ -561,7 +561,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
return uploadResult.ToPbFileChunk(finalFileId, offset), nil
return uploadResult.ToPbFileChunk(finalFileId, offset, tsNs), nil
}
var _ = filer_pb.FilerClient(&FileCopyWorker{})

View file

@ -0,0 +1,148 @@
package filer
import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"sync"
)
type ChunkGroup struct {
lookupFn wdclient.LookupFileIdFunctionType
chunkCache chunk_cache.ChunkCache
manifestChunks []*filer_pb.FileChunk
sections map[SectionIndex]*FileChunkSection
sectionsLock sync.RWMutex
}
func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
group := &ChunkGroup{
lookupFn: lookupFn,
chunkCache: chunkCache,
sections: make(map[SectionIndex]*FileChunkSection),
}
err := group.SetChunks(chunks)
return group, err
}
func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
group.sectionsLock.Lock()
defer group.sectionsLock.Unlock()
sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
if !found {
section = NewFileChunkSection(si)
group.sections[si] = section
}
section.addChunk(chunk)
}
return nil
}
func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
group.sectionsLock.RLock()
defer group.sectionsLock.RUnlock()
sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
if !found {
for i := rangeStart; i < rangeStop; i++ {
buff[i-offset] = 0
}
continue
}
xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)
if xErr != nil {
err = xErr
}
n += xn
tsNs = max(tsNs, xTsNs)
}
return
}
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
dataChunks = append(dataChunks, chunk)
continue
}
resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk)
if err != nil {
return err
}
group.manifestChunks = append(group.manifestChunks, chunk)
dataChunks = append(dataChunks, resolvedChunks...)
}
for _, chunk := range dataChunks {
sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
if !found {
section = NewFileChunkSection(si)
group.sections[si] = section
}
section.chunks = append(section.chunks, chunk)
}
}
return nil
}
const (
// see weedfs_file_lseek.go
SEEK_DATA uint32 = 3 // seek to next data after the offset
// SEEK_HOLE uint32 = 4 // seek to next hole after the offset
)
// FIXME: needa tests
func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
group.sectionsLock.RLock()
defer group.sectionsLock.RUnlock()
return group.doSearchChunks(offset, fileSize, whence)
}
func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize)
if whence == SEEK_DATA {
for si := sectionIndex; si < maxSectionIndex+1; si++ {
section, foundSection := group.sections[si]
if !foundSection {
continue
}
sectionStart := section.DataStartOffset(group, offset, fileSize)
if sectionStart == -1 {
continue
}
return true, sectionStart
}
return false, 0
} else {
// whence == SEEK_HOLE
for si := sectionIndex; si < maxSectionIndex; si++ {
section, foundSection := group.sections[si]
if !foundSection {
return true, offset
}
holeStart := section.NextStopOffset(group, offset, fileSize)
if holeStart%SectionSize == 0 {
continue
}
return true, holeStart
}
return true, fileSize
}
}

View file

@ -0,0 +1,36 @@
package filer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestChunkGroup_doSearchChunks(t *testing.T) {
type fields struct {
sections map[SectionIndex]*FileChunkSection
}
type args struct {
offset int64
fileSize int64
whence uint32
}
tests := []struct {
name string
fields fields
args args
wantFound bool
wantOut int64
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
group := &ChunkGroup{
sections: tt.fields.sections,
}
gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
})
}
}

View file

@ -264,7 +264,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
}
}
manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0)
manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
if err != nil {
return nil, err
}
@ -275,4 +275,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
return
}
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error)
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)

View file

@ -0,0 +1,119 @@
package filer
import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"sync"
)
const SectionSize = 2 * 1024 * 1024 * 128 // 256MiB
type SectionIndex int64
type FileChunkSection struct {
sectionIndex SectionIndex
chunks []*filer_pb.FileChunk
visibleIntervals *IntervalList[*VisibleInterval]
chunkViews *IntervalList[*ChunkView]
reader *ChunkReadAt
lock sync.Mutex
}
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
return &FileChunkSection{
sectionIndex: si,
}
}
func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
section.lock.Lock()
defer section.lock.Unlock()
start, stop := max(int64(section.sectionIndex)*SectionSize, chunk.Offset), min(((int64(section.sectionIndex)+1)*SectionSize), chunk.Offset+int64(chunk.Size))
section.chunks = append(section.chunks, chunk)
if section.visibleIntervals != nil {
MergeIntoVisibles(section.visibleIntervals, start, stop, chunk)
}
if section.visibleIntervals != nil {
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
}
if section.chunkViews != nil {
MergeIntoChunkViews(section.chunkViews, start, stop, chunk)
}
return nil
}
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
if section.visibleIntervals == nil {
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
if section.reader != nil {
_ = section.reader.Close()
section.reader = nil
}
}
if section.chunkViews == nil {
section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
}
if section.reader == nil {
section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
}
section.reader.fileSize = fileSize
}
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
return section.reader.ReadAtWithTime(buff, offset)
}
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
visible := x.Value
if visible.stop <= offset {
continue
}
if offset < visible.start {
return offset
}
return offset
}
return -1
}
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
isAfterOffset := false
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
visible := x.Value
if !isAfterOffset {
if visible.stop <= offset {
continue
}
isAfterOffset = true
}
if offset < visible.start {
return offset
}
// now visible.start <= offset
if offset < visible.stop {
offset = visible.stop
}
}
return offset
}

View file

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"golang.org/x/exp/slices"
"math"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -42,7 +41,7 @@ func ETag(entry *filer_pb.Entry) (etag string) {
}
func ETagEntry(entry *Entry) (etag string) {
if entry.IsInRemoteOnly() {
if entry.IsInRemoteOnly() {
return entry.Remote.RemoteETag
}
if entry.Attr.Md5 == nil {
@ -66,8 +65,15 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
compacted, garbage = SeparateGarbageChunks(visibles, chunks)
return
}
func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
for _, interval := range visibles {
for x := visibles.Front(); x != nil; x = x.Next {
interval := x.Value
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
@ -77,8 +83,7 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
garbage = append(garbage, chunk)
}
}
return
return compacted, garbage
}
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
@ -131,20 +136,39 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p
}
type ChunkView struct {
FileId string
Offset int64
Size uint64
LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
ChunkSize uint64
CipherKey []byte
IsGzipped bool
FileId string
OffsetInChunk int64 // offset within the chunk
ViewSize uint64
ViewOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
ChunkSize uint64
CipherKey []byte
IsGzipped bool
ModifiedTsNs int64
}
func (cv *ChunkView) SetStartStop(start, stop int64) {
cv.OffsetInChunk += start - cv.ViewOffset
cv.ViewOffset = start
cv.ViewSize = uint64(stop - start)
}
func (cv *ChunkView) Clone() IntervalValue {
return &ChunkView{
FileId: cv.FileId,
OffsetInChunk: cv.OffsetInChunk,
ViewSize: cv.ViewSize,
ViewOffset: cv.ViewOffset,
ChunkSize: cv.ChunkSize,
CipherKey: cv.CipherKey,
IsGzipped: cv.IsGzipped,
ModifiedTsNs: cv.ModifiedTsNs,
}
}
func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
return cv.ViewSize == cv.ChunkSize
}
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
@ -152,7 +176,7 @@ func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*
}
func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) {
func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
stop := offset + size
if size == math.MaxInt64 {
@ -162,164 +186,112 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
stop = math.MaxInt64
}
for _, chunk := range visibles {
chunkViews = NewIntervalList[*ChunkView]()
for x := visibles.Front(); x != nil; x = x.Next {
chunk := x.Value
chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
if chunkStart < chunkStop {
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: chunkStart - chunk.start + chunk.chunkOffset,
Size: uint64(chunkStop - chunkStart),
LogicOffset: chunkStart,
ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
chunkView := &ChunkView{
FileId: chunk.fileId,
OffsetInChunk: chunkStart - chunk.start + chunk.offsetInChunk,
ViewSize: uint64(chunkStop - chunkStart),
ViewOffset: chunkStart,
ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
ModifiedTsNs: chunk.modifiedTsNs,
}
chunkViews.AppendInterval(&Interval[*ChunkView]{
StartOffset: chunkStart,
StopOffset: chunkStop,
TsNs: chunk.modifiedTsNs,
Value: chunkView,
Prev: nil,
Next: nil,
})
}
}
return views
return chunkViews
}
func logPrintf(name string, visibles []VisibleInterval) {
func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk) {
/*
glog.V(0).Infof("%s len %d", name, len(visibles))
for _, v := range visibles {
glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset)
}
*/
newV := &VisibleInterval{
start: start,
stop: stop,
fileId: chunk.GetFileIdString(),
modifiedTsNs: chunk.ModifiedTsNs,
offsetInChunk: start - chunk.Offset, // the starting position in the chunk
chunkSize: chunk.Size, // size of the chunk
cipherKey: chunk.CipherKey,
isGzipped: chunk.IsCompressed,
}
visibles.InsertInterval(start, stop, chunk.ModifiedTsNs, newV)
}
func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk) {
newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.ModifiedTsNs, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
length := len(visibles)
if length == 0 {
return append(visibles, newV)
}
last := visibles[length-1]
if last.stop <= chunk.Offset {
return append(visibles, newV)
chunkView := &ChunkView{
FileId: chunk.GetFileIdString(),
OffsetInChunk: start - chunk.Offset,
ViewSize: uint64(stop - start),
ViewOffset: start,
ChunkSize: chunk.Size,
CipherKey: chunk.CipherKey,
IsGzipped: chunk.IsCompressed,
ModifiedTsNs: chunk.ModifiedTsNs,
}
logPrintf(" before", visibles)
// glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
chunkStop := chunk.Offset + int64(chunk.Size)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTsNs, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
newVisibles = append(newVisibles, t)
// glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
}
if v.start < chunkStop && chunkStop < v.stop {
t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTsNs, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
newVisibles = append(newVisibles, t)
// glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
// glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
}
}
newVisibles = append(newVisibles, newV)
logPrintf(" append", newVisibles)
for i := len(newVisibles) - 1; i >= 0; i-- {
if i > 0 && newV.start < newVisibles[i-1].start {
newVisibles[i] = newVisibles[i-1]
} else {
newVisibles[i] = newV
break
}
}
logPrintf(" sorted", newVisibles)
return newVisibles
chunkViews.InsertInterval(start, stop, chunk.ModifiedTsNs, chunkView)
}
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
if err != nil {
return
}
visibles2 := readResolvedChunks(chunks)
visibles2 := readResolvedChunks(chunks, 0, math.MaxInt64)
if true {
return visibles2, err
}
slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
if a.ModifiedTsNs == b.ModifiedTsNs {
filer_pb.EnsureFid(a)
filer_pb.EnsureFid(b)
if a.Fid == nil || b.Fid == nil {
return true
}
return a.Fid.FileKey < b.Fid.FileKey
}
return a.ModifiedTsNs < b.ModifiedTsNs
})
for _, chunk := range chunks {
// glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
visibles = MergeIntoVisibles(visibles, chunk)
logPrintf("add", visibles)
}
if len(visibles) != len(visibles2) {
fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
} else {
for i := 0; i < len(visibles); i++ {
checkDifference(visibles[i], visibles2[i])
}
}
return
}
func checkDifference(x, y VisibleInterval) {
if x.start != y.start ||
x.stop != y.stop ||
x.fileId != y.fileId ||
x.modifiedTsNs != y.modifiedTsNs {
fmt.Printf("different visible %+v : %+v\n", x, y)
}
return visibles2, err
}
// find non-overlapping visible intervals
// visible interval map to one file chunk
type VisibleInterval struct {
start int64
stop int64
modifiedTsNs int64
fileId string
chunkOffset int64
chunkSize uint64
cipherKey []byte
isGzipped bool
start int64
stop int64
modifiedTsNs int64
fileId string
offsetInChunk int64
chunkSize uint64
cipherKey []byte
isGzipped bool
}
func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTsNs: modifiedTime,
chunkOffset: chunkOffset, // the starting position in the chunk
chunkSize: chunkSize,
cipherKey: cipherKey,
isGzipped: isGzipped,
func (v *VisibleInterval) SetStartStop(start, stop int64) {
v.offsetInChunk += start - v.start
v.start, v.stop = start, stop
}
func (v *VisibleInterval) Clone() IntervalValue {
return &VisibleInterval{
start: v.start,
stop: v.stop,
modifiedTsNs: v.modifiedTsNs,
fileId: v.fileId,
offsetInChunk: v.offsetInChunk,
chunkSize: v.chunkSize,
cipherKey: v.cipherKey,
isGzipped: v.isGzipped,
}
}

View file

@ -1,14 +1,22 @@
package filer
import (
"container/list"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"golang.org/x/exp/slices"
)
func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
func readResolvedChunks(chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval]) {
var points []*Point
for _, chunk := range chunks {
if chunk.IsChunkManifest {
println("This should not happen! A manifest chunk found:", chunk.GetFileIdString())
}
start, stop := max(chunk.Offset, startOffset), min(chunk.Offset+int64(chunk.Size), stopOffset)
if start >= stop {
continue
}
points = append(points, &Point{
x: chunk.Offset,
ts: chunk.ModifiedTsNs,
@ -33,40 +41,45 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva
})
var prevX int64
var queue []*Point
queue := list.New() // points with higher ts are at the tail
visibles = NewIntervalList[*VisibleInterval]()
var prevPoint *Point
for _, point := range points {
if queue.Len() > 0 {
prevPoint = queue.Back().Value.(*Point)
} else {
prevPoint = nil
}
if point.isStart {
if len(queue) > 0 {
lastIndex := len(queue) - 1
lastPoint := queue[lastIndex]
if point.x != prevX && lastPoint.ts < point.ts {
visibles = addToVisibles(visibles, prevX, lastPoint, point)
if prevPoint != nil {
if point.x != prevX && prevPoint.ts < point.ts {
addToVisibles(visibles, prevX, prevPoint, point)
prevX = point.x
}
}
// insert into queue
for i := len(queue); i >= 0; i-- {
if i == 0 || queue[i-1].ts <= point.ts {
if i == len(queue) {
prevX = point.x
if prevPoint == nil || prevPoint.ts < point.ts {
queue.PushBack(point)
prevX = point.x
} else {
for e := queue.Front(); e != nil; e = e.Next() {
if e.Value.(*Point).ts > point.ts {
queue.InsertBefore(point, e)
break
}
queue = addToQueue(queue, i, point)
break
}
}
} else {
lastIndex := len(queue) - 1
index := lastIndex
var startPoint *Point
for ; index >= 0; index-- {
startPoint = queue[index]
if startPoint.ts == point.ts {
queue = removeFromQueue(queue, index)
isLast := true
for e := queue.Back(); e != nil; e = e.Prev() {
if e.Value.(*Point).ts == point.ts {
queue.Remove(e)
break
}
isLast = false
}
if index == lastIndex && startPoint != nil {
visibles = addToVisibles(visibles, prevX, startPoint, point)
if isLast && prevPoint != nil {
addToVisibles(visibles, prevX, prevPoint, point)
prevX = point.x
}
}
@ -75,37 +88,30 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva
return
}
func removeFromQueue(queue []*Point, index int) []*Point {
for i := index; i < len(queue)-1; i++ {
queue[i] = queue[i+1]
}
queue = queue[:len(queue)-1]
return queue
}
func addToQueue(queue []*Point, index int, point *Point) []*Point {
queue = append(queue, point)
for i := len(queue) - 1; i > index; i-- {
queue[i], queue[i-1] = queue[i-1], queue[i]
}
return queue
}
func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval {
func addToVisibles(visibles *IntervalList[*VisibleInterval], prevX int64, startPoint *Point, point *Point) {
if prevX < point.x {
chunk := startPoint.chunk
visibles = append(visibles, VisibleInterval{
start: prevX,
stop: point.x,
fileId: chunk.GetFileIdString(),
modifiedTsNs: chunk.ModifiedTsNs,
chunkOffset: prevX - chunk.Offset,
chunkSize: chunk.Size,
cipherKey: chunk.CipherKey,
isGzipped: chunk.IsCompressed,
})
visible := &VisibleInterval{
start: prevX,
stop: point.x,
fileId: chunk.GetFileIdString(),
modifiedTsNs: chunk.ModifiedTsNs,
offsetInChunk: prevX - chunk.Offset,
chunkSize: chunk.Size,
cipherKey: chunk.CipherKey,
isGzipped: chunk.IsCompressed,
}
appendVisibleInterfal(visibles, visible)
}
return visibles
}
func appendVisibleInterfal(visibles *IntervalList[*VisibleInterval], visible *VisibleInterval) {
visibles.AppendInterval(&Interval[*VisibleInterval]{
StartOffset: visible.start,
StopOffset: visible.stop,
TsNs: visible.modifiedTsNs,
Value: visible,
})
}
type Point struct {

View file

@ -3,6 +3,7 @@ package filer
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"math"
"math/rand"
"testing"
)
@ -42,9 +43,38 @@ func TestReadResolvedChunks(t *testing.T) {
},
}
visibles := readResolvedChunks(chunks)
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
for _, visible := range visibles {
fmt.Printf("resolved to %d visible intervales\n", visibles.Len())
for x := visibles.Front(); x != nil; x = x.Next {
visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}
}
func TestReadResolvedChunks2(t *testing.T) {
chunks := []*filer_pb.FileChunk{
{
FileId: "c",
Offset: 200,
Size: 50,
ModifiedTsNs: 3,
},
{
FileId: "e",
Offset: 200,
Size: 25,
ModifiedTsNs: 5,
},
}
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
fmt.Printf("resolved to %d visible intervales\n", visibles.Len())
for x := visibles.Front(); x != nil; x = x.Next {
visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}
@ -72,9 +102,10 @@ func TestRandomizedReadResolvedChunks(t *testing.T) {
chunks = append(chunks, randomWrite(array, start, size, ts))
}
visibles := readResolvedChunks(chunks)
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
for _, visible := range visibles {
for x := visibles.Front(); x != nil; x = x.Next {
visible := x.Value
for i := visible.start; i < visible.stop; i++ {
if array[i] != visible.modifiedTsNs {
t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTsNs)
@ -112,9 +143,9 @@ func TestSequentialReadResolvedChunks(t *testing.T) {
})
}
visibles := readResolvedChunks(chunks)
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
fmt.Printf("visibles %d", len(visibles))
fmt.Printf("visibles %d", visibles.Len())
}
@ -201,9 +232,48 @@ func TestActualReadResolvedChunks(t *testing.T) {
},
}
visibles := readResolvedChunks(chunks)
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
for _, visible := range visibles {
for x := visibles.Front(); x != nil; x = x.Next {
visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}
}
func TestActualReadResolvedChunks2(t *testing.T) {
chunks := []*filer_pb.FileChunk{
{
FileId: "1,e7b96fef48",
Offset: 0,
Size: 184320,
ModifiedTsNs: 1,
},
{
FileId: "2,22562640b9",
Offset: 184320,
Size: 4096,
ModifiedTsNs: 2,
},
{
FileId: "2,33562640b9",
Offset: 184320,
Size: 4096,
ModifiedTsNs: 4,
},
{
FileId: "4,df033e0fe4",
Offset: 188416,
Size: 2097152,
ModifiedTsNs: 3,
},
}
visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
for x := visibles.Front(); x != nil; x = x.Next {
visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}

View file

@ -92,7 +92,8 @@ func TestRandomFileChunksCompact(t *testing.T) {
visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
for _, v := range visibles {
for visible := visibles.Front(); visible != nil; visible = visible.Next {
v := visible.Value
for x := v.start; x < v.stop; x++ {
assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId)
}
@ -137,7 +138,7 @@ func TestIntervalMerging(t *testing.T) {
},
Expected: []*VisibleInterval{
{start: 0, stop: 70, fileId: "b"},
{start: 70, stop: 100, fileId: "a", chunkOffset: 70},
{start: 70, stop: 100, fileId: "a", offsetInChunk: 70},
},
},
// case 3: updates overwrite full chunks
@ -174,15 +175,15 @@ func TestIntervalMerging(t *testing.T) {
},
Expected: []*VisibleInterval{
{start: 0, stop: 200, fileId: "d"},
{start: 200, stop: 220, fileId: "c", chunkOffset: 130},
{start: 200, stop: 220, fileId: "c", offsetInChunk: 130},
},
},
// case 6: same updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124},
{Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125},
},
Expected: []*VisibleInterval{
{start: 0, stop: 100, fileId: "xyz"},
@ -228,11 +229,17 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
for x, interval := range intervals {
log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
i, x, interval.start, interval.stop, interval.fileId)
x := -1
for visible := intervals.Front(); visible != nil; visible = visible.Next {
x++
interval := visible.Value
log.Printf("test case %d, interval start=%d, stop=%d, fileId=%s",
i, interval.start, interval.stop, interval.fileId)
}
for x, interval := range intervals {
x = -1
for visible := intervals.Front(); visible != nil; visible = visible.Next {
x++
interval := visible.Value
if interval.start != testcase.Expected[x].start {
t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
i, x, interval.start, testcase.Expected[x].start)
@ -245,13 +252,13 @@ func TestIntervalMerging(t *testing.T) {
t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
i, x, interval.fileId, testcase.Expected[x].fileId)
}
if interval.chunkOffset != testcase.Expected[x].chunkOffset {
t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
if interval.offsetInChunk != testcase.Expected[x].offsetInChunk {
t.Fatalf("failed on test case %d, interval %d, offsetInChunk %d, expect %d",
i, x, interval.offsetInChunk, testcase.Expected[x].offsetInChunk)
}
}
if len(intervals) != len(testcase.Expected) {
t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
if intervals.Len() != len(testcase.Expected) {
t.Fatalf("failed to compact test case %d, len %d expected %d", i, intervals.Len(), len(testcase.Expected))
}
}
@ -276,9 +283,9 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 250,
Expected: []*ChunkView{
{Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
{Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
{Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200},
{OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100},
{OffsetInChunk: 0, ViewSize: 50, FileId: "fsad", ViewOffset: 200},
},
},
// case 1: updates overwrite full chunks
@ -290,7 +297,7 @@ func TestChunksReading(t *testing.T) {
Offset: 50,
Size: 100,
Expected: []*ChunkView{
{Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50},
{OffsetInChunk: 50, ViewSize: 100, FileId: "asdf", ViewOffset: 50},
},
},
// case 2: updates overwrite part of previous chunks
@ -302,8 +309,8 @@ func TestChunksReading(t *testing.T) {
Offset: 30,
Size: 40,
Expected: []*ChunkView{
{Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
{Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
{OffsetInChunk: 20, ViewSize: 30, FileId: "b", ViewOffset: 30},
{OffsetInChunk: 57, ViewSize: 10, FileId: "a", ViewOffset: 60},
},
},
// case 3: updates overwrite full chunks
@ -316,8 +323,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 200,
Expected: []*ChunkView{
{Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0},
{Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50},
{OffsetInChunk: 0, ViewSize: 50, FileId: "asdf", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 50},
},
},
// case 4: updates far away from prev chunks
@ -330,8 +337,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 400,
Expected: []*ChunkView{
{Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
{Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
{OffsetInChunk: 0, ViewSize: 200, FileId: "asdf", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 250},
},
},
// case 5: updates overwrite full chunks
@ -345,21 +352,21 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 220,
Expected: []*ChunkView{
{Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
{Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
{OffsetInChunk: 0, ViewSize: 200, FileId: "c", ViewOffset: 0},
{OffsetInChunk: 130, ViewSize: 20, FileId: "b", ViewOffset: 200},
},
},
// case 6: same updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123},
{Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124},
{Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125},
},
Offset: 0,
Size: 100,
Expected: []*ChunkView{
{Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0},
{OffsetInChunk: 0, ViewSize: 100, FileId: "xyz", ViewOffset: 0},
},
},
// case 7: edge cases
@ -372,8 +379,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 200,
Expected: []*ChunkView{
{Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
{Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
{OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100},
},
},
// case 8: edge cases
@ -386,9 +393,9 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 300,
Expected: []*ChunkView{
{Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0},
{Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90},
{Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190},
{OffsetInChunk: 0, ViewSize: 90, FileId: "abc", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 90},
{OffsetInChunk: 0, ViewSize: 110, FileId: "fsad", ViewOffset: 190},
},
},
// case 9: edge cases
@ -404,12 +411,12 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 153578836,
Expected: []*ChunkView{
{Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0},
{Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936},
{Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760},
{Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736},
{Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168},
{Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248},
{OffsetInChunk: 0, ViewSize: 43175936, FileId: "2,111fc2cbfac1", ViewOffset: 0},
{OffsetInChunk: 0, ViewSize: 52981760 - 43175936, FileId: "2,112a36ea7f85", ViewOffset: 43175936},
{OffsetInChunk: 0, ViewSize: 72564736 - 52981760, FileId: "4,112d5f31c5e7", ViewOffset: 52981760},
{OffsetInChunk: 0, ViewSize: 133255168 - 72564736, FileId: "1,113245f0cdb6", ViewOffset: 72564736},
{OffsetInChunk: 0, ViewSize: 137269248 - 133255168, FileId: "3,1141a70733b5", ViewOffset: 133255168},
{OffsetInChunk: 0, ViewSize: 153578836 - 137269248, FileId: "1,114201d5bbdb", ViewOffset: 137269248},
},
},
}
@ -420,28 +427,31 @@ func TestChunksReading(t *testing.T) {
}
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
for x, chunk := range chunks {
x := -1
for c := chunks.Front(); c != nil; c = c.Next {
x++
chunk := c.Value
log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
i, x, chunk.Offset, chunk.Size, chunk.FileId)
if chunk.Offset != testcase.Expected[x].Offset {
i, x, chunk.OffsetInChunk, chunk.ViewSize, chunk.FileId)
if chunk.OffsetInChunk != testcase.Expected[x].OffsetInChunk {
t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
i, chunk.FileId, chunk.OffsetInChunk, testcase.Expected[x].OffsetInChunk)
}
if chunk.Size != testcase.Expected[x].Size {
t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
if chunk.ViewSize != testcase.Expected[x].ViewSize {
t.Fatalf("failed on read case %d, chunk %s, ViewSize %d, expect %d",
i, chunk.FileId, chunk.ViewSize, testcase.Expected[x].ViewSize)
}
if chunk.FileId != testcase.Expected[x].FileId {
t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
i, x, chunk.FileId, testcase.Expected[x].FileId)
}
if chunk.LogicOffset != testcase.Expected[x].LogicOffset {
t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d",
i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset)
if chunk.ViewOffset != testcase.Expected[x].ViewOffset {
t.Fatalf("failed on read case %d, chunk %d, ViewOffset %d, expect %d",
i, x, chunk.ViewOffset, testcase.Expected[x].ViewOffset)
}
}
if len(chunks) != len(testcase.Expected) {
t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected))
if chunks.Len() != len(testcase.Expected) {
t.Fatalf("failed to read test case %d, len %d expected %d", i, chunks.Len(), len(testcase.Expected))
}
}
@ -467,73 +477,79 @@ func BenchmarkCompactFileChunks(b *testing.B) {
}
}
func addVisibleInterval(visibles *IntervalList[*VisibleInterval], x *VisibleInterval) {
visibles.AppendInterval(&Interval[*VisibleInterval]{
StartOffset: x.start,
StopOffset: x.stop,
TsNs: x.modifiedTsNs,
Value: x,
})
}
func TestViewFromVisibleIntervals(t *testing.T) {
visibles := []VisibleInterval{
{
start: 0,
stop: 25,
fileId: "fid1",
},
{
start: 4096,
stop: 8192,
fileId: "fid2",
},
{
start: 16384,
stop: 18551,
fileId: "fid3",
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 0,
stop: 25,
fileId: "fid1",
})
addVisibleInterval(visibles, &VisibleInterval{
start: 4096,
stop: 8192,
fileId: "fid2",
})
addVisibleInterval(visibles, &VisibleInterval{
start: 16384,
stop: 18551,
fileId: "fid3",
})
views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
if len(views) != len(visibles) {
assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
if views.Len() != visibles.Len() {
assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}
func TestViewFromVisibleIntervals2(t *testing.T) {
visibles := []VisibleInterval{
{
start: 344064,
stop: 348160,
fileId: "fid1",
},
{
start: 348160,
stop: 356352,
fileId: "fid2",
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 344064,
stop: 348160,
fileId: "fid1",
})
addVisibleInterval(visibles, &VisibleInterval{
start: 348160,
stop: 356352,
fileId: "fid2",
})
views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
if len(views) != len(visibles) {
assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
if views.Len() != visibles.Len() {
assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}
func TestViewFromVisibleIntervals3(t *testing.T) {
visibles := []VisibleInterval{
{
start: 1000,
stop: 2000,
fileId: "fid1",
},
{
start: 3000,
stop: 4000,
fileId: "fid2",
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 1000,
stop: 2000,
fileId: "fid1",
})
addVisibleInterval(visibles, &VisibleInterval{
start: 3000,
stop: 4000,
fileId: "fid2",
})
views := ViewFromVisibleIntervals(visibles, 1700, 1500)
if len(views) != len(visibles) {
assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
if views.Len() != visibles.Len() {
assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}

View file

@ -40,7 +40,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
}
// append to existing chunks
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset))
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
// update the entry
err = f.CreateEntry(context.Background(), entry, false, false, nil, false)

259
weed/filer/interval_list.go Normal file
View file

@ -0,0 +1,259 @@
package filer
import (
"math"
"sync"
)
type IntervalValue interface {
SetStartStop(start, stop int64)
Clone() IntervalValue
}
type Interval[T IntervalValue] struct {
StartOffset int64
StopOffset int64
TsNs int64
Value T
Prev *Interval[T]
Next *Interval[T]
}
func (interval *Interval[T]) Size() int64 {
return interval.StopOffset - interval.StartOffset
}
// IntervalList mark written intervals within one page chunk
type IntervalList[T IntervalValue] struct {
head *Interval[T]
tail *Interval[T]
Lock sync.Mutex
}
func NewIntervalList[T IntervalValue]() *IntervalList[T] {
list := &IntervalList[T]{
head: &Interval[T]{
StartOffset: -1,
StopOffset: -1,
},
tail: &Interval[T]{
StartOffset: math.MaxInt64,
StopOffset: math.MaxInt64,
},
}
return list
}
func (list *IntervalList[T]) Front() (interval *Interval[T]) {
return list.head.Next
}
func (list *IntervalList[T]) AppendInterval(interval *Interval[T]) {
list.Lock.Lock()
defer list.Lock.Unlock()
if list.head.Next == nil {
list.head.Next = interval
}
interval.Prev = list.tail.Prev
if list.tail.Prev != nil {
list.tail.Prev.Next = interval
}
list.tail.Prev = interval
}
func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T) {
if startOffset >= stopOffset {
return
}
interval := &Interval[T]{
StartOffset: startOffset,
StopOffset: stopOffset,
TsNs: tsNs,
Value: value,
}
list.Lock.Lock()
defer list.Lock.Unlock()
list.overlayInterval(interval)
}
func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T) {
interval := &Interval[T]{
StartOffset: startOffset,
StopOffset: stopOffset,
TsNs: tsNs,
Value: value,
}
list.Lock.Lock()
defer list.Lock.Unlock()
value.SetStartStop(startOffset, stopOffset)
list.insertInterval(interval)
}
func (list *IntervalList[T]) insertInterval(interval *Interval[T]) {
prev := list.head
next := prev.Next
for interval.StartOffset < interval.StopOffset {
if next == nil {
// add to the end
list.insertBetween(prev, interval, list.tail)
break
}
// interval is ahead of the next
if interval.StopOffset <= next.StartOffset {
list.insertBetween(prev, interval, next)
break
}
// interval is after the next
if next.StopOffset <= interval.StartOffset {
prev = next
next = next.Next
continue
}
// intersecting next and interval
if interval.TsNs >= next.TsNs {
// interval is newer
if next.StartOffset < interval.StartOffset {
// left side of next is ahead of interval
t := &Interval[T]{
StartOffset: next.StartOffset,
StopOffset: interval.StartOffset,
TsNs: next.TsNs,
Value: next.Value.Clone().(T),
}
t.Value.SetStartStop(t.StartOffset, t.StopOffset)
list.insertBetween(prev, t, interval)
next.StartOffset = interval.StartOffset
next.Value.SetStartStop(next.StartOffset, next.StopOffset)
prev = t
}
if interval.StopOffset < next.StopOffset {
// right side of next is after interval
next.StartOffset = interval.StopOffset
next.Value.SetStartStop(next.StartOffset, next.StopOffset)
list.insertBetween(prev, interval, next)
break
} else {
// next is covered
prev.Next = interval
next = next.Next
}
} else {
// next is newer
if interval.StartOffset < next.StartOffset {
// left side of interval is ahead of next
t := &Interval[T]{
StartOffset: interval.StartOffset,
StopOffset: next.StartOffset,
TsNs: interval.TsNs,
Value: interval.Value.Clone().(T),
}
t.Value.SetStartStop(t.StartOffset, t.StopOffset)
list.insertBetween(prev, t, next)
interval.StartOffset = next.StartOffset
interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset)
}
if next.StopOffset < interval.StopOffset {
// right side of interval is after next
interval.StartOffset = next.StopOffset
interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset)
} else {
// interval is covered
break
}
}
}
}
func (list *IntervalList[T]) insertBetween(a, interval, b *Interval[T]) {
a.Next = interval
b.Prev = interval
if a != list.head {
interval.Prev = a
}
if b != list.tail {
interval.Next = b
}
}
func (list *IntervalList[T]) overlayInterval(interval *Interval[T]) {
//t := list.head
//for ; t.Next != nil; t = t.Next {
// if t.TsNs > interval.TsNs {
// println("writes is out of order", t.TsNs-interval.TsNs, "ns")
// }
//}
p := list.head
for ; p.Next != nil && p.Next.StopOffset <= interval.StartOffset; p = p.Next {
}
q := list.tail
for ; q.Prev != nil && q.Prev.StartOffset >= interval.StopOffset; q = q.Prev {
}
// left side
// interval after p.Next start
if p.Next != nil && p.Next.StartOffset < interval.StartOffset {
t := &Interval[T]{
StartOffset: p.Next.StartOffset,
StopOffset: interval.StartOffset,
TsNs: p.Next.TsNs,
Value: p.Next.Value,
}
p.Next = t
if p != list.head {
t.Prev = p
}
t.Next = interval
interval.Prev = t
} else {
p.Next = interval
if p != list.head {
interval.Prev = p
}
}
// right side
// interval ends before p.Prev
if q.Prev != nil && interval.StopOffset < q.Prev.StopOffset {
t := &Interval[T]{
StartOffset: interval.StopOffset,
StopOffset: q.Prev.StopOffset,
TsNs: q.Prev.TsNs,
Value: q.Prev.Value,
}
q.Prev = t
if q != list.tail {
t.Next = q
}
interval.Next = t
t.Prev = interval
} else {
q.Prev = interval
if q != list.tail {
interval.Next = q
}
}
}
func (list *IntervalList[T]) Len() int {
list.Lock.Lock()
defer list.Lock.Unlock()
var count int
for t := list.head; t != nil; t = t.Next {
count++
}
return count - 1
}

View file

@ -0,0 +1,327 @@
package filer
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
type IntervalInt int
func (i IntervalInt) SetStartStop(start, stop int64) {
}
func (i IntervalInt) Clone() IntervalValue {
return i
}
func TestIntervalList_Overlay(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(0, 100, 1, 1)
list.Overlay(50, 150, 2, 2)
list.Overlay(200, 250, 3, 3)
list.Overlay(225, 250, 4, 4)
list.Overlay(175, 210, 5, 5)
list.Overlay(0, 25, 6, 6)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 6, list.Len())
println()
list.Overlay(50, 150, 7, 7)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 6, list.Len())
}
func TestIntervalList_Overlay2(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(0, 50, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
}
func TestIntervalList_Overlay3(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
assert.Equal(t, 1, list.Len())
list.Overlay(0, 60, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_Overlay4(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(0, 100, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 1, list.Len())
}
func TestIntervalList_Overlay5(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(0, 110, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 1, list.Len())
}
func TestIntervalList_Overlay6(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(50, 110, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 1, list.Len())
}
func TestIntervalList_Overlay7(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(50, 90, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_Overlay8(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(60, 90, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 3, list.Len())
}
func TestIntervalList_Overlay9(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(60, 100, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_Overlay10(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(50, 100, 1, 1)
list.Overlay(60, 110, 2, 2)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_Overlay11(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.Overlay(0, 100, 1, 1)
list.Overlay(100, 110, 2, 2)
list.Overlay(0, 90, 3, 3)
list.Overlay(0, 80, 4, 4)
list.Overlay(0, 90, 5, 5)
list.Overlay(90, 90, 6, 6)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 3, list.Len())
}
func TestIntervalList_insertInterval1(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_insertInterval2(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(0, 25, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_insertInterval3(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(0, 75, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 3, list.Len())
}
func TestIntervalList_insertInterval4(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(0, 225, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_insertInterval5(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(0, 225, 5, 5)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_insertInterval6(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(0, 275, 1, 1)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 5, list.Len())
}
func TestIntervalList_insertInterval7(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(75, 275, 1, 1)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 4, list.Len())
}
func TestIntervalList_insertInterval8(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(75, 275, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 4, list.Len())
}
func TestIntervalList_insertInterval9(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 150, 2, 2)
list.InsertInterval(200, 250, 4, 4)
list.InsertInterval(50, 150, 3, 3)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 2, list.Len())
}
func TestIntervalList_insertInterval10(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(50, 100, 2, 2)
list.InsertInterval(200, 300, 4, 4)
list.InsertInterval(100, 200, 5, 5)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 3, list.Len())
}
func TestIntervalList_insertInterval11(t *testing.T) {
list := NewIntervalList[IntervalInt]()
list.InsertInterval(0, 64, 1, 1)
list.InsertInterval(72, 136, 3, 3)
list.InsertInterval(64, 128, 2, 2)
list.InsertInterval(68, 72, 4, 4)
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 4, list.Len())
}
type IntervalStruct struct {
x int
start int64
stop int64
}
func newIntervalStruct(i int) IntervalStruct {
return IntervalStruct{
x: i,
}
}
func (i IntervalStruct) SetStartStop(start, stop int64) {
i.start, i.stop = start, stop
}
func (i IntervalStruct) Clone() IntervalValue {
return &IntervalStruct{
x: i.x,
start: i.start,
stop: i.stop,
}
}
func TestIntervalList_insertIntervalStruct(t *testing.T) {
list := NewIntervalList[IntervalStruct]()
list.InsertInterval(0, 64, 1, newIntervalStruct(1))
list.InsertInterval(64, 72, 2, newIntervalStruct(2))
list.InsertInterval(72, 136, 3, newIntervalStruct(3))
list.InsertInterval(64, 68, 4, newIntervalStruct(4))
for p := list.Front(); p != nil; p = p.Next {
fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
}
assert.Equal(t, 4, list.Len())
}

View file

@ -16,8 +16,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
readerLock sync.Mutex
chunkViews *IntervalList[*ChunkView]
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
@ -89,7 +88,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
@ -108,44 +107,58 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
c.readerLock.Lock()
defer c.readerLock.Unlock()
c.chunkViews.Lock.Lock()
defer c.chunkViews.Lock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
n, _, err = c.doReadAt(p, offset)
return
}
func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
c.chunkViews.Lock.Lock()
defer c.chunkViews.Lock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) {
startOffset, remaining := offset, int64(len(p))
var nextChunks []*ChunkView
for i, chunk := range c.chunkViews {
var nextChunks *Interval[*ChunkView]
for x := c.chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
if remaining <= 0 {
break
}
if i+1 < len(c.chunkViews) {
nextChunks = c.chunkViews[i+1:]
if x.Next != nil {
nextChunks = x.Next
}
if startOffset < chunk.LogicOffset {
gap := chunk.LogicOffset - startOffset
glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset)
if startOffset < chunk.ViewOffset {
gap := chunk.ViewOffset - startOffset
glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
n += zero(p, startOffset-offset, gap)
startOffset, remaining = chunk.LogicOffset, remaining-gap
startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
return copied, err
return copied, ts, err
}
n += copied
@ -177,7 +190,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
if c.readerPattern.IsRandomMode() {
n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset)
@ -187,16 +200,14 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0)
if c.lastChunkFid != chunkView.FileId {
if chunkView.Offset == 0 { // start of a new chunk
if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
c.readerCache.MaybeCache(nextChunkViews)
} else {
if len(nextChunkViews) >= 1 {
c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
}
}
if nextChunkViews != nil {
c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
}
}
}

View file

@ -5,7 +5,6 @@ import (
"io"
"math"
"strconv"
"sync"
"testing"
)
@ -34,42 +33,40 @@ func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
func TestReaderAt(t *testing.T) {
visibles := []VisibleInterval{
{
start: 1,
stop: 2,
fileId: "1",
chunkSize: 9,
},
{
start: 3,
stop: 4,
fileId: "3",
chunkSize: 1,
},
{
start: 5,
stop: 6,
fileId: "5",
chunkSize: 2,
},
{
start: 7,
stop: 9,
fileId: "7",
chunkSize: 2,
},
{
start: 9,
stop: 10,
fileId: "9",
chunkSize: 2,
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 1,
stop: 2,
fileId: "1",
chunkSize: 9,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 3,
stop: 4,
fileId: "3",
chunkSize: 1,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 5,
stop: 6,
fileId: "5",
chunkSize: 2,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 7,
stop: 9,
fileId: "7",
chunkSize: 2,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 9,
stop: 10,
fileId: "9",
chunkSize: 2,
})
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
readerLock: sync.Mutex{},
fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@ -86,7 +83,7 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp
if data == nil {
data = make([]byte, size)
}
n, err := readerAt.doReadAt(data, offset)
n, _, err := readerAt.doReadAt(data, offset)
if expectedN != n {
t.Errorf("unexpected read size: %d, expect: %d", n, expectedN)
@ -101,24 +98,22 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp
func TestReaderAt0(t *testing.T) {
visibles := []VisibleInterval{
{
start: 2,
stop: 5,
fileId: "1",
chunkSize: 9,
},
{
start: 7,
stop: 9,
fileId: "2",
chunkSize: 9,
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 2,
stop: 5,
fileId: "1",
chunkSize: 9,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 7,
stop: 9,
fileId: "2",
chunkSize: 9,
})
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
readerLock: sync.Mutex{},
fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@ -135,18 +130,16 @@ func TestReaderAt0(t *testing.T) {
func TestReaderAt1(t *testing.T) {
visibles := []VisibleInterval{
{
start: 2,
stop: 5,
fileId: "1",
chunkSize: 9,
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 2,
stop: 5,
fileId: "1",
chunkSize: 9,
})
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
readerLock: sync.Mutex{},
fileSize: 20,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@ -164,24 +157,22 @@ func TestReaderAt1(t *testing.T) {
}
func TestReaderAtGappedChunksDoNotLeak(t *testing.T) {
visibles := []VisibleInterval{
{
start: 2,
stop: 3,
fileId: "1",
chunkSize: 5,
},
{
start: 7,
stop: 9,
fileId: "1",
chunkSize: 4,
},
}
visibles := NewIntervalList[*VisibleInterval]()
addVisibleInterval(visibles, &VisibleInterval{
start: 2,
stop: 3,
fileId: "1",
chunkSize: 5,
})
addVisibleInterval(visibles, &VisibleInterval{
start: 7,
stop: 9,
fileId: "1",
chunkSize: 4,
})
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
readerLock: sync.Mutex{},
fileSize: 9,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@ -193,8 +184,7 @@ func TestReaderAtGappedChunksDoNotLeak(t *testing.T) {
func TestReaderAtSparseFileDoesNotLeak(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals([]VisibleInterval{}, 0, math.MaxInt64),
readerLock: sync.Mutex{},
chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64),
fileSize: 3,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),

View file

@ -43,7 +43,7 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
if rc.lookupFileIdFn == nil {
return
}
@ -55,7 +55,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
return
}
for _, chunkView := range chunkViews {
for x := chunkViews; x != nil; x = x.Next {
chunkView := x.Value
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
}
@ -65,7 +66,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
return
}
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
// cache this chunk if not yet
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
go cacher.startCaching()

View file

@ -6,7 +6,6 @@ import (
"golang.org/x/exp/slices"
"io"
"math"
"sort"
"strings"
"sync"
"time"
@ -78,7 +77,8 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
@ -102,29 +102,30 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
for _, chunkView := range chunkViews {
if offset < chunkView.LogicOffset {
gap := chunkView.LogicOffset - offset
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
if offset < chunkView.ViewOffset {
gap := chunkView.ViewOffset - offset
remaining -= gap
glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
err := writeZero(writer, gap)
if err != nil {
return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
}
offset = chunkView.LogicOffset
offset = chunkView.ViewOffset
}
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
offset += int64(chunkView.Size)
remaining -= int64(chunkView.Size)
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
offset += int64(chunkView.ViewSize)
remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
return fmt.Errorf("read chunk: %v", err)
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
downloadThrottler.MaybeSlowdown(int64(chunkView.Size))
downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
}
if remaining > 0 {
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
@ -167,14 +168,15 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
idx := 0
for _, chunkView := range chunkViews {
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@ -185,7 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
chunkViews []*ChunkView
chunkView *Interval[*ChunkView]
totalSize int64
logicOffset int64
buffer []byte
@ -201,17 +203,15 @@ var _ = io.ReaderAt(&ChunkStreamReader{})
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
return a.LogicOffset < b.LogicOffset
})
var totalSize int64
for _, chunk := range chunkViews {
totalSize += int64(chunk.Size)
for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
totalSize += int64(chunk.ViewSize)
}
return &ChunkStreamReader{
chunkViews: chunkViews,
chunkView: chunkViews.Front(),
lookupFileId: lookupFileIdFn,
totalSize: totalSize,
}
@ -290,7 +290,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
}
func insideChunk(offset int64, chunk *ChunkView) bool {
return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
return chunk.ViewOffset <= offset && offset < chunk.ViewOffset+int64(chunk.ViewSize)
}
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
@ -300,48 +300,22 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
}
// fmt.Printf("fetch for offset %d\n", offset)
// need to seek to a different chunk
currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
return offset < c.chunkViews[i].LogicOffset
})
if currentChunkIndex == len(c.chunkViews) {
// not found
if insideChunk(offset, c.chunkViews[0]) {
// fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
currentChunkIndex = 0
} else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
currentChunkIndex = len(c.chunkViews) - 1
// fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
} else {
return io.EOF
}
} else if currentChunkIndex > 0 {
if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
// good hit
} else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
currentChunkIndex -= 1
// fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
} else {
// glog.Fatalf("unexpected1 offset %d", offset)
return fmt.Errorf("unexpected1 offset %d", offset)
}
} else {
// glog.Fatalf("unexpected2 offset %d", offset)
return fmt.Errorf("unexpected2 offset %d", offset)
c.chunkView = c.chunkView.Next
if c.chunkView == nil {
return io.EOF
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
chunk := c.chunkView.Value
if insideChunk(offset, chunk) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
if c.isBufferEmpty() || c.bufferOffset != chunk.ViewOffset {
if err = c.fetchChunkToBuffer(chunk); err != nil {
return
}
}
} else {
// glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
// glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
}
return
}
@ -355,7 +329,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
@ -372,10 +346,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
c.buffer = buffer.Bytes()
c.bufferOffset = chunkView.LogicOffset
c.bufferOffset = chunkView.ViewOffset
c.chunk = chunkView.FileId
// glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
// glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize))
return nil
}

View file

@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
"time"
)
type ChunkedDirtyPages struct {
@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages
}
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
return
}
@ -58,28 +57,27 @@ func (pages *ChunkedDirtyPages) FlushData() error {
return nil
}
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) {
if !pages.hasWrites {
return
}
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs)
}
func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) {
mtime := time.Now().UnixNano()
defer cleanupFn()
fileFullPath := pages.fh.FullPath()
fileName := fileFullPath.Name()
chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
if err != nil {
glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
pages.lastErr = err
return
}
chunk.ModifiedTsNs = mtime
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryChunkGroup.AddChunk(chunk)
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
}

View file

@ -5,50 +5,60 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/exp/slices"
"golang.org/x/sync/semaphore"
"math"
"os"
"sync"
)
type FileHandleId uint64
var IsDebugFileReadWrite = false
type FileHandle struct {
fh FileHandleId
counter int64
entry *LockedEntry
entryLock sync.Mutex
inode uint64
wfs *WFS
fh FileHandleId
counter int64
entry *LockedEntry
entryLock sync.Mutex
entryChunkGroup *filer.ChunkGroup
inode uint64
wfs *WFS
// cache file has been written to
dirtyMetadata bool
dirtyPages *PageWriter
entryViewCache []filer.VisibleInterval
reader *filer.ChunkReadAt
contentType string
handle uint64
orderedMutex *semaphore.Weighted
dirtyMetadata bool
dirtyPages *PageWriter
reader *filer.ChunkReadAt
contentType string
handle uint64
sync.Mutex
isDeleted bool
// for debugging
mirrorFile *os.File
}
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{
fh: handleId,
counter: 1,
inode: inode,
wfs: wfs,
orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
fh: handleId,
counter: 1,
inode: inode,
wfs: wfs,
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
fh.entry = &LockedEntry{
Entry: entry,
}
if entry != nil {
fh.SetEntry(entry)
}
if IsDebugFileReadWrite {
var err error
fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
println("failed to create mirror:", err.Error())
}
}
return fh
}
@ -63,6 +73,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
if entry != nil {
fileSize := filer.FileSize(entry)
entry.Attributes.FileSize = fileSize
var resolveManifestErr error
fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
if resolveManifestErr != nil {
glog.Warningf("failed to resolve manifest chunks in %+v", entry)
}
} else {
glog.Fatalf("setting file handle entry to nil")
}
fh.entry.SetEntry(entry)
}
@ -78,43 +99,17 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
return
}
// find the earliest incoming chunk
newChunks := chunks
earliestChunk := newChunks[0]
for i := 1; i < len(newChunks); i++ {
if lessThan(earliestChunk, newChunks[i]) {
earliestChunk = newChunks[i]
}
}
// pick out-of-order chunks from existing chunks
for _, chunk := range fh.entry.GetChunks() {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
}
// sort incoming chunks
slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
return lessThan(a, b)
})
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
fh.entry.AppendChunks(newChunks)
fh.entryViewCache = nil
fh.entry.AppendChunks(chunks)
}
func (fh *FileHandle) CloseReader() {
if fh.reader != nil {
_ = fh.reader.Close()
fh.reader = nil
}
}
func (fh *FileHandle) ReleaseHandle() {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
func (fh *FileHandle) Release() {
fh.dirtyPages.Destroy()
fh.CloseReader()
if IsDebugFileReadWrite {
fh.mirrorFile.Close()
}
}
func lessThan(a, b *filer_pb.FileChunk) bool {

View file

@ -65,7 +65,7 @@ func (i *FileHandleToInode) ReleaseByInode(inode uint64) {
if fh.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fh.fh)
fh.Release()
fh.ReleaseHandle()
}
}
}
@ -82,7 +82,7 @@ func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) {
if fhHandle.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fhHandle.fh)
fhHandle.Release()
fhHandle.ReleaseHandle()
}
}

View file

@ -17,18 +17,20 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
}
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs)
return
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fileFullPath := fh.FullPath()
entry := fh.GetEntry()
if entry == nil {
return 0, io.EOF
return 0, 0, io.EOF
}
if entry.IsInRemoteOnly() {
@ -36,43 +38,28 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
newEntry, err := fh.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
return 0, err
return 0, 0, err
}
entry = newEntry
}
fileSize := int64(filer.FileSize(entry))
fileSize := int64(entry.Attributes.FileSize)
if fileSize == 0 {
fileSize = int64(filer.FileSize(entry))
}
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
return 0, 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
return int64(totalRead), 0, nil
}
var chunkResolveErr error
if fh.entryViewCache == nil {
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
fh.CloseReader()
}
if fh.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize)
glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
for _, chunkView := range chunkViews {
glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
}
fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
}
totalRead, err := fh.reader.ReadAt(buff, offset)
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
@ -80,7 +67,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
return int64(totalRead), ts, err
}
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {

View file

@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw
}
func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs)
offset += writeSize
data = data[writeSize:]
}
}
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
pw.randomWriter.AddPage(offset, data, isSequential)
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) {
pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
}
func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData()
}
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs)
offset += readSize
data = data[readSize:]

View file

@ -0,0 +1,39 @@
package page_writer
import "time"
type ActivityScore struct {
lastActiveTsNs int64
decayedActivenessScore int64
}
func NewActivityScore() *ActivityScore {
return &ActivityScore{}
}
func (as ActivityScore) MarkRead() {
now := time.Now().UnixNano()
deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
as.lastActiveTsNs = now
as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256
if as.decayedActivenessScore < 0 {
as.decayedActivenessScore = 0
}
}
func (as ActivityScore) MarkWrite() {
now := time.Now().UnixNano()
deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
as.lastActiveTsNs = now
as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024
if as.decayedActivenessScore < 0 {
as.decayedActivenessScore = 0
}
}
func (as ActivityScore) ActivityScore() int64 {
deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds
return as.decayedActivenessScore >> deltaTime
}

View file

@ -8,6 +8,7 @@ import (
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
TsNs int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
@ -42,10 +43,14 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
return list
}
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) {
if startOffset >= stopOffset {
return
}
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
TsNs: tsNs,
}
list.addInterval(interval)
}
@ -62,50 +67,54 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
//t := list.head
//for ; t.next != nil; t = t.next {
// if t.TsNs > interval.TsNs {
// println("writes is out of order", t.TsNs-interval.TsNs, "ns")
// }
//}
p := list.head
for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev {
}
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
// left side
// interval after p.next start
if p.next.StartOffset < interval.StartOffset {
t := &ChunkWrittenInterval{
StartOffset: p.next.StartOffset,
stopOffset: interval.StartOffset,
TsNs: p.next.TsNs,
}
p.next = t
t.prev = p
t.next = interval
interval.prev = t
} else {
p.next = interval
interval.prev = p
}
// add the new interval between p and q
unlinkNodesBetween(p, q)
p.next = interval
interval.prev = p
q.prev = interval
interval.next = q
}
// unlinkNodesBetween remove all nodes after start and before stop, exclusive
func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
if start.next == stop {
return
// right side
// interval ends before p.prev
if interval.stopOffset < q.prev.stopOffset {
t := &ChunkWrittenInterval{
StartOffset: interval.stopOffset,
stopOffset: q.prev.stopOffset,
TsNs: q.prev.TsNs,
}
q.prev = t
t.next = q
interval.next = t
t.prev = interval
} else {
q.prev = interval
interval.next = q
}
start.next.prev = nil
start.next = stop
stop.prev.next = nil
stop.prev = start
}
func (list *ChunkWrittenIntervalList) size() int {

View file

@ -10,40 +10,72 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) {
assert.Equal(t, 0, list.size(), "empty list")
list.MarkWritten(0, 5)
list.MarkWritten(0, 5, 1)
assert.Equal(t, 1, list.size(), "one interval")
list.MarkWritten(0, 5)
list.MarkWritten(0, 5, 2)
assert.Equal(t, 1, list.size(), "duplicated interval2")
list.MarkWritten(95, 100)
list.MarkWritten(95, 100, 3)
assert.Equal(t, 2, list.size(), "two intervals")
list.MarkWritten(50, 60)
list.MarkWritten(50, 60, 4)
assert.Equal(t, 3, list.size(), "three intervals")
list.MarkWritten(50, 55)
assert.Equal(t, 3, list.size(), "three intervals merge")
list.MarkWritten(50, 55, 5)
assert.Equal(t, 4, list.size(), "three intervals merge")
list.MarkWritten(40, 50)
assert.Equal(t, 3, list.size(), "three intervals grow forward")
list.MarkWritten(40, 50, 6)
assert.Equal(t, 5, list.size(), "three intervals grow forward")
list.MarkWritten(50, 65)
assert.Equal(t, 3, list.size(), "three intervals grow backward")
list.MarkWritten(50, 65, 7)
assert.Equal(t, 4, list.size(), "three intervals grow backward")
list.MarkWritten(70, 80)
assert.Equal(t, 4, list.size(), "four intervals")
list.MarkWritten(70, 80, 8)
assert.Equal(t, 5, list.size(), "four intervals")
list.MarkWritten(60, 70)
assert.Equal(t, 3, list.size(), "three intervals merged")
list.MarkWritten(60, 70, 9)
assert.Equal(t, 6, list.size(), "three intervals merged")
list.MarkWritten(59, 71)
assert.Equal(t, 3, list.size(), "covered three intervals")
list.MarkWritten(59, 71, 10)
assert.Equal(t, 6, list.size(), "covered three intervals")
list.MarkWritten(5, 59)
assert.Equal(t, 2, list.size(), "covered two intervals")
list.MarkWritten(5, 59, 11)
assert.Equal(t, 5, list.size(), "covered two intervals")
list.MarkWritten(70, 99)
assert.Equal(t, 1, list.size(), "covered one intervals")
list.MarkWritten(70, 99, 12)
assert.Equal(t, 5, list.size(), "covered one intervals")
}
type interval struct {
start int64
stop int64
expected bool
}
func Test_PageChunkWrittenIntervalList1(t *testing.T) {
list := newChunkWrittenIntervalList()
inputs := []interval{
{1, 5, true},
{2, 3, true},
}
for i, input := range inputs {
list.MarkWritten(input.start, input.stop, int64(i)+1)
actual := hasData(list, 0, 4)
if actual != input.expected {
t.Errorf("input [%d,%d) expected %v actual %v", input.start, input.stop, input.expected, actual)
}
}
}
func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool {
for t := usage.head.next; t != usage.tail; t = t.next {
logicStart := chunkStartOffset + t.StartOffset
logicStop := chunkStartOffset + t.stopOffset
if logicStart <= x && x < logicStop {
return true
}
}
return false
}

View file

@ -1,9 +1,9 @@
package page_writer
type DirtyPages interface {
AddPage(offset int64, data []byte, isSequential bool)
AddPage(offset int64, data []byte, isSequential bool, tsNs int64)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)

View file

@ -4,13 +4,13 @@ import (
"io"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func())
type PageChunk interface {
FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool
WrittenSize() int64
ActivityScore() int64
SaveContent(saveFn SaveToStorageFunc)
}

View file

@ -19,6 +19,7 @@ type MemChunk struct {
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
activityScore *ActivityScore
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
@ -28,6 +29,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
activityScore: NewActivityScore(),
}
}
@ -39,29 +41,37 @@ func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
mc.Lock()
defer mc.Unlock()
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
mc.activityScore.MarkWrite()
return
}
func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
mc.RLock()
defer mc.RUnlock()
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStart := max(off, memChunkBaseOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
} else {
println("read old data1", tsNs-t.TsNs, "ns")
}
}
}
mc.activityScore.MarkRead()
return
}
@ -72,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
func (mc *MemChunk) WrittenSize() int64 {
mc.RLock()
defer mc.RUnlock()
return mc.usage.WrittenSize()
func (mc *MemChunk) ActivityScore() int64 {
return mc.activityScore.ActivityScore()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
@ -88,7 +95,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() {
})
}
}

View file

@ -15,12 +15,12 @@ var (
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
logicToActualChunkIndexLock sync.Mutex
chunkSize int64
freeActualChunkList []ActualChunkIndex
dir string
file *os.File
chunkSize int64
chunkTrackingLock sync.Mutex
activeChunkCount int
freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@ -29,14 +29,15 @@ type SwapFileChunk struct {
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
activityScore *ActivityScore
//memChunk *MemChunk
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
chunkSize: chunkSize,
dir: dir,
file: nil,
chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() {
}
}
func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
return nil
}
}
sf.logicToActualChunkIndexLock.Lock()
defer sf.logicToActualChunkIndexLock.Unlock()
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
if len(sf.freeActualChunkList) > 0 {
actualChunkIndex = sf.freeActualChunkList[0]
sf.freeActualChunkList = sf.freeActualChunkList[1:]
} else {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
}
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
sf.chunkTrackingLock.Lock()
defer sf.chunkTrackingLock.Unlock()
sf.activeChunkCount++
// assign a new physical chunk
var actualChunkIndex ActualChunkIndex
if len(sf.freeActualChunkList) > 0 {
actualChunkIndex = sf.freeActualChunkList[0]
sf.freeActualChunkList = sf.freeActualChunkList[1:]
} else {
actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
}
return &SwapFileChunk{
swapFileChunk := &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
activityScore: NewActivityScore(),
// memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
}
// println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
return swapFileChunk
}
func (sc *SwapFileChunk) FreeResource() {
sc.swapfile.logicToActualChunkIndexLock.Lock()
defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
sc.Lock()
defer sc.Unlock()
sc.swapfile.chunkTrackingLock.Lock()
defer sc.swapfile.chunkTrackingLock.Unlock()
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
sc.swapfile.activeChunkCount--
// println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
}
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
sc.Lock()
defer sc.Unlock()
// println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
if err == nil {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
} else {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
if err != nil {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
//sc.memChunk.WriteDataAt(src, offset, tsNs)
sc.activityScore.MarkWrite()
return
}
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
sc.RLock()
defer sc.RUnlock()
// println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
//memCopy := make([]byte, len(p))
//copy(memCopy, p)
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
if t.TsNs >= tsNs {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}
maxStop = max(maxStop, logicStop)
} else {
println("read old data2", tsNs-t.TsNs, "ns")
}
maxStop = max(maxStop, logicStop)
}
}
//sc.memChunk.ReadDataAt(memCopy, off, tsNs)
//if bytes.Compare(memCopy, p) != 0 {
// println("read wrong data from swap file", off, sc.logicChunkIndex)
//}
sc.activityScore.MarkRead()
return
}
@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
func (sc *SwapFileChunk) WrittenSize() int64 {
sc.RLock()
defer sc.RUnlock()
return sc.usage.WrittenSize()
func (sc *SwapFileChunk) ActivityScore() int64 {
return sc.activityScore.ActivityScore()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
sc.RLock()
defer sc.RUnlock()
if saveFn == nil {
return
}
sc.Lock()
defer sc.Unlock()
// println(sc.logicChunkIndex, "|", "save")
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
})
n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
if n > 0 {
reader := util.NewBytesReader(data[:n])
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() {
})
}
mem.Free(data)
}
sc.usage = newChunkWrittenIntervalList()
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"sync"
"sync/atomic"
)
@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
return t
}
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
up.chunksLock.Lock()
defer up.chunksLock.Unlock()
@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
if !found {
if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
fullestChunkIndex = lci
fullness = chunkFullness
laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
for wci, wc := range up.writableChunks {
activityScore := wc.ActivityScore()
if lowestActivityScore > activityScore {
laziestChunkIndex = wci
lowestActivityScore = activityScore
}
}
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
}
if isSequential &&
len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
}
up.writableChunks[logicChunkIndex] = pageChunk
}
n = pageChunk.WriteDataAt(p, off)
//if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed {
// println("found already sealed chunk", logicChunkIndex)
//}
//if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading {
// println("found active read chunk", logicChunkIndex)
//}
n = pageChunk.WriteDataAt(p, off, tsNs)
up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
up.chunksLock.Lock()
@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
// read from sealed chunks first
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
sealedChunk.referenceCounter++
}
if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off)
maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found {
return
}
writableMaxStop := writableChunk.ReadDataAt(p, off)
writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)

View file

@ -31,14 +31,14 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i, false)
uploadPipeline.SaveDataAt(p, i, false, 0)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
uploadPipeline.MaybeReadDataAt(p, i)
uploadPipeline.MaybeReadDataAt(p, i, 0)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)

View file

@ -20,12 +20,12 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
_, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, inode, entry)
wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
return status
} else {
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry())
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true)
out.Nlink = 0
return fuse.OK
}
@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
fh.entryViewCache = nil
fh.entryChunkGroup.SetChunks(chunks)
}
}
entry.Attributes.Mtime = time.Now().Unix()
@ -114,7 +114,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
}
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
size, includeSize := input.GetSize()
if includeSize {
out.Attr.Size = size
}
wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry, !includeSize)
if fh != nil {
fh.dirtyMetadata = true
@ -139,12 +143,14 @@ func (wfs *WFS) setRootAttr(out *fuse.AttrOut) {
out.Nlink = 1
}
func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) {
func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry, calculateSize bool) {
out.Ino = inode
if entry.Attributes != nil && entry.Attributes.Inode != 0 {
out.Ino = entry.Attributes.Inode
}
out.Size = filer.FileSize(entry)
if calculateSize {
out.Size = filer.FileSize(entry)
}
if entry.FileMode()&os.ModeSymlink != 0 {
out.Size = uint64(len(entry.Attributes.SymlinkTarget))
}
@ -194,7 +200,7 @@ func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.
out.Generation = 1
out.EntryValid = 1
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, inode, entry)
wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
}
func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {

View file

@ -1,8 +1,8 @@
package mount
import (
"context"
"net/http"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
@ -44,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
fhOut.orderedMutex.Acquire(context.Background(), 1)
defer fhOut.orderedMutex.Release(1)
fhOut.Lock()
defer fhOut.Unlock()
fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock()
@ -54,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
if fhIn.fh != fhOut.fh {
fhIn.orderedMutex.Acquire(context.Background(), 1)
defer fhIn.orderedMutex.Release(1)
fhIn.Lock()
defer fhIn.Unlock()
fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock()
}
@ -88,7 +88,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// put data at the specified offset in target file
fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
fhOut.entry.Content = nil
fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode())
fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
fhOut.dirtyMetadata = true
written = uint32(totalRead)

View file

@ -1,7 +1,6 @@
package mount
import (
"context"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@ -36,8 +35,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
fh.Lock()
defer fh.Unlock()
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
@ -56,17 +55,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
// refresh view cache if necessary
if fh.entryViewCache == nil {
var err error
fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize)
if err != nil {
return fuse.EIO
}
}
// search chunks for the offset
found, offset := searchChunks(fh, offset, fileSize, in.Whence)
found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK
@ -82,30 +72,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return fuse.OK
}
// searchChunks goes through all chunks to find the correct offset
func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize)
for _, chunkView := range chunkViews {
if offset < chunkView.LogicOffset {
if whence == SEEK_HOLE {
out = offset
} else {
out = chunkView.LogicOffset
}
return true, out
}
if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA {
out = offset
return true, out
}
offset += int64(chunkView.Size)
}
return
}

View file

@ -1,7 +1,8 @@
package mount
import (
"context"
"bytes"
"fmt"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@ -40,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
fh.Lock()
defer fh.Unlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
@ -50,6 +51,23 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.EIO
}
if IsDebugFileReadWrite {
// print(".")
mirrorData := make([]byte, totalRead)
fh.mirrorFile.ReadAt(mirrorData, offset)
if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
againBuff := make([]byte, len(buff))
againRead, _ := readDataByFileHandle(buff, fh, offset)
againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
fmt.Printf("\ncompare %v [%d,%d) size:%d againSame:%v againCorrect:%v\n", fh.mirrorFile.Name(), offset, offset+totalRead, totalRead, againSame, againCorrect)
//fmt.Printf("read mirrow data: %v\n", mirrorData)
//fmt.Printf("read actual data: %v\n", buff[:totalRead])
}
}
return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
@ -59,9 +77,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size)
n, err := fhIn.readFromChunks(buff, offset)
n, tsNs, err := fhIn.readFromChunks(buff, offset)
if err == nil || err == io.EOF {
maxStop := fhIn.readFromDirtyPages(buff, offset)
maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
n = max(maxStop-offset, n)
}
if err == io.EOF {

View file

@ -89,8 +89,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
fh.Lock()
defer fh.Unlock()
// flush works at fh level
fileFullPath := fh.FullPath()
@ -145,9 +145,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
//for i, chunk := range entry.GetChunks() {
// glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
//}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
@ -158,6 +158,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
entry.Chunks = append(chunks, manifestChunks...)
fh.entryChunkGroup.SetChunks(entry.Chunks)
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
@ -181,5 +182,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
return fuse.EIO
}
if IsDebugFileReadWrite {
fh.mirrorFile.Sync()
}
return fuse.OK
}

View file

@ -1,10 +1,10 @@
package mount
import (
"context"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"syscall"
"time"
)
/**
@ -46,8 +46,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
tsNs := time.Now().UnixNano()
fh.Lock()
defer fh.Unlock()
entry := fh.GetEntry()
if entry == nil {
@ -59,7 +61,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs)
written = uint32(len(data))
@ -70,5 +72,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyMetadata = true
if IsDebugFileReadWrite {
// print("+")
fh.mirrorFile.WriteAt(data, offset)
}
return written, fuse.OK
}

View file

@ -13,7 +13,7 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) {
return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, err, data := operation.UploadWithRetry(
wfs,
@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
wfs.chunkCache.SetChunk(fileId, data)
}
chunk = uploadResult.ToPbFileChunk(fileId, offset)
chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs)
return chunk, nil
}
}

View file

@ -45,13 +45,13 @@ type UploadResult struct {
RetryCount int `json:"-"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(uploadResult.Size),
ModifiedTsNs: time.Now().UnixNano(),
ModifiedTsNs: tsNs,
ETag: uploadResult.ContentMd5,
CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0,

View file

@ -7,9 +7,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
for _, chunk := range chunkViews {
for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
fileUrls, err := filerSource.LookupFileId(chunk.FileId)
if err != nil {
@ -20,7 +21,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {

View file

@ -256,7 +256,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string
var uploadResult *operation.UploadResult
@ -290,7 +290,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
return nil, err
}
return uploadResult.ToPbFileChunk(fileId, offset), nil
return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
}
}

View file

@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
// Save to chunk manifest structure
fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0, time.Now().UnixNano())}
// fmt.Printf("uploaded: %+v\n", uploadResult)

View file

@ -214,5 +214,5 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
if uploadResult.Size == 0 {
return nil, nil
}
return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset, time.Now().UnixNano())}, nil
}

View file

@ -102,14 +102,14 @@ func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
func (fi *FileInfo) Sys() interface{} { return nil }
type WebDavFile struct {
fs *WebDavFileSystem
name string
isDirectory bool
off int64
entry *filer_pb.Entry
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
bufWriter *buffered_writer.BufferedWriteCloser
fs *WebDavFileSystem
name string
isDirectory bool
off int64
entry *filer_pb.Entry
visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
reader io.ReaderAt
bufWriter *buffered_writer.BufferedWriteCloser
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@ -381,7 +381,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
f.fs,
@ -413,7 +413,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
return uploadResult.ToPbFileChunk(fileId, offset), nil
return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
}
func (f *WebDavFile) Write(buf []byte) (int, error) {
@ -439,7 +439,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
var chunk *filer_pb.FileChunk
chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset)
chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
if flushErr != nil {
return fmt.Errorf("%s upload result: %v", f.name, flushErr)
@ -498,7 +498,7 @@ func (f *WebDavFile) Close() error {
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
f.visibleIntervals = nil
}
return err
@ -521,12 +521,12 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if fileSize == 0 {
return 0, io.EOF
}
if f.entryViewCache == nil {
f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
if f.visibleIntervals == nil {
f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil
}
if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}

View file

@ -117,7 +117,7 @@ type ItemEntry struct {
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
timeNowAtSec := time.Now().Unix()
return fileCount, errCount, doTraverseBfsAndSaving(c.env, nil, path, false,
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if c.modifyTimeAgoAtSec > 0 {
if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {

View file

@ -211,7 +211,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
}
}()
return doTraverseBfsAndSaving(c.env, nil, c.getCollectFilerFilePath(), false,
return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if *c.verbose && entry.Entry.IsDirectory {
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))