diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go new file mode 100644 index 000000000..723a14a13 --- /dev/null +++ b/weed/filer/filechunk_group.go @@ -0,0 +1,152 @@ +package filer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "sync" +) + +type ChunkGroup struct { + lookupFn wdclient.LookupFileIdFunctionType + chunkCache chunk_cache.ChunkCache + manifestChunks []*filer_pb.FileChunk + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex +} + +func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { + group := &ChunkGroup{ + lookupFn: lookupFn, + chunkCache: chunkCache, + sections: make(map[SectionIndex]*FileChunkSection), + } + + err := group.SetChunks(chunks) + return group, err +} + +func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error { + + group.sectionsLock.Lock() + defer group.sectionsLock.Unlock() + + sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + if !found { + section = &FileChunkSection{ + sectionIndex: si, + } + group.sections[si] = section + } + section.addChunk(chunk) + } + return nil +} + +func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) { + + group.sectionsLock.RLock() + defer group.sectionsLock.RUnlock() + + sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) + if !found { + for i := rangeStart; i < rangeStop; i++ { + buff[i-offset] = 0 + } + continue + } + xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart) + if xErr != nil { + err = xErr + } + n += xn + tsNs = max(tsNs, xTsNs) + } + return +} + +func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { + var dataChunks []*filer_pb.FileChunk + for _, chunk := range chunks { + + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + continue + } + + resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk) + if err != nil { + return err + } + + group.manifestChunks = append(group.manifestChunks, chunk) + dataChunks = append(dataChunks, resolvedChunks...) + } + + for _, chunk := range dataChunks { + sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + if !found { + section = &FileChunkSection{ + sectionIndex: si, + } + group.sections[si] = section + } + section.chunks = append(section.chunks, chunk) + } + } + return nil +} + +const ( + // see weedfs_file_lseek.go + SEEK_DATA uint32 = 3 // seek to next data after the offset + // SEEK_HOLE uint32 = 4 // seek to next hole after the offset +) + +// FIXME: needa tests +func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) { + group.sectionsLock.RLock() + defer group.sectionsLock.RUnlock() + + return group.doSearchChunks(offset, fileSize, whence) +} + +func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) { + + sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize) + if whence == SEEK_DATA { + for si := sectionIndex; si < maxSectionIndex+1; si++ { + section, foundSection := group.sections[si] + if !foundSection { + continue + } + sectionStart := section.DataStartOffset(group, offset, fileSize) + if sectionStart == -1 { + continue + } + return true, sectionStart + } + return false, 0 + } else { + // whence == SEEK_HOLE + for si := sectionIndex; si < maxSectionIndex; si++ { + section, foundSection := group.sections[si] + if !foundSection { + return true, offset + } + holeStart := section.NextStopOffset(group, offset, fileSize) + if holeStart%SectionSize == 0 { + continue + } + return true, holeStart + } + return true, fileSize + } +} diff --git a/weed/filer/filechunk_group_test.go b/weed/filer/filechunk_group_test.go new file mode 100644 index 000000000..d24d66a49 --- /dev/null +++ b/weed/filer/filechunk_group_test.go @@ -0,0 +1,36 @@ +package filer + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestChunkGroup_doSearchChunks(t *testing.T) { + type fields struct { + sections map[SectionIndex]*FileChunkSection + } + type args struct { + offset int64 + fileSize int64 + whence uint32 + } + tests := []struct { + name string + fields fields + args args + wantFound bool + wantOut int64 + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + group := &ChunkGroup{ + sections: tt.fields.sections, + } + gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence) + assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence) + assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence) + }) + } +} diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go new file mode 100644 index 000000000..ea7d65010 --- /dev/null +++ b/weed/filer/filechunk_section.go @@ -0,0 +1,94 @@ +package filer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "sync" +) + +const SectionSize = 2 * 1024 * 1024 * 128 // 256MiB +type SectionIndex int64 +type FileChunkSection struct { + sectionIndex SectionIndex + chunks []*filer_pb.FileChunk + entryViewCache []VisibleInterval + chunkViews []*ChunkView + reader *ChunkReadAt + lock sync.Mutex +} + +func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error { + section.lock.Lock() + defer section.lock.Unlock() + section.chunks = append(section.chunks, chunk) + // FIXME: this can be improved to an incremental change + section.entryViewCache = nil + return nil +} + +func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + return section.reader.ReadAtWithTime(buff, offset) +} + +func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) { + if section.entryViewCache == nil { + section.entryViewCache = readResolvedChunks(section.chunks) + section.chunks, _ = SeparateGarbageChunks(section.entryViewCache, section.chunks) + if section.reader != nil { + _ = section.reader.Close() + section.reader = nil + } + } + + if section.reader == nil { + chunkViews := ViewFromVisibleIntervals(section.entryViewCache, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize) + section.reader = NewChunkReaderAtFromClient(group.lookupFn, chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize)) + } +} + +func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + for _, visible := range section.entryViewCache { + if visible.stop <= offset { + continue + } + if offset < visible.start { + return offset + } + return offset + } + return -1 +} + +func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + isAfterOffset := false + for _, visible := range section.entryViewCache { + if !isAfterOffset { + if visible.stop <= offset { + continue + } + isAfterOffset = true + } + if offset < visible.start { + return offset + } + // now visible.start <= offset + if offset < visible.stop { + offset = visible.stop + } + } + return offset +} diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 430c40405..56c97549f 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -77,6 +77,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade return } pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryChunkGroup.AddChunk(chunk) glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) } diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index a08b66a00..7709a6921 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -16,21 +16,21 @@ type FileHandleId uint64 var IsDebug = true type FileHandle struct { - fh FileHandleId - counter int64 - entry *LockedEntry - entryLock sync.Mutex - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *LockedEntry + entryLock sync.Mutex + entryChunkGroup *filer.ChunkGroup + inode uint64 + wfs *WFS // cache file has been written to - dirtyMetadata bool - dirtyPages *PageWriter - entryViewCache []filer.VisibleInterval - reader *filer.ChunkReadAt - contentType string - handle uint64 - orderedMutex *semaphore.Weighted + dirtyMetadata bool + dirtyPages *PageWriter + reader *filer.ChunkReadAt + contentType string + handle uint64 + orderedMutex *semaphore.Weighted isDeleted bool @@ -48,12 +48,12 @@ func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_p } // dirtyPages: newContinuousDirtyPages(file, writeOnly), fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit) - if entry != nil { - entry.Attributes.FileSize = filer.FileSize(entry) - } fh.entry = &LockedEntry{ Entry: entry, } + if entry != nil { + fh.SetEntry(entry) + } if IsDebug { var err error @@ -76,6 +76,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry { } func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { + if entry != nil { + fileSize := filer.FileSize(entry) + entry.Attributes.FileSize = fileSize + var resolveManifestErr error + fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks) + if resolveManifestErr != nil { + glog.Warningf("failed to resolve manifest chunks in %+v", entry) + } + } else { + glog.Fatalf("setting file handle entry to nil") + } fh.entry.SetEntry(entry) } @@ -92,14 +103,6 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { } fh.entry.AppendChunks(chunks) - fh.entryViewCache = nil -} - -func (fh *FileHandle) CloseReader() { - if fh.reader != nil { - _ = fh.reader.Close() - fh.reader = nil - } } func (fh *FileHandle) Release() { @@ -109,8 +112,14 @@ func (fh *FileHandle) Release() { glog.V(4).Infof("Release %s fh %d", fh.entry.Name, fh.handle) fh.dirtyPages.Destroy() - fh.CloseReader() if IsDebug { fh.mirrorFile.Close() } } + +func lessThan(a, b *filer_pb.FileChunk) bool { + if a.ModifiedTsNs == b.ModifiedTsNs { + return a.Fid.FileKey < b.Fid.FileKey + } + return a.ModifiedTsNs < b.ModifiedTsNs +} diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index 286f7f0fd..012154225 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -55,26 +55,8 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), 0, nil } - - var chunkResolveErr error - if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize) - if chunkResolveErr != nil { - return 0, 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) - } - fh.CloseReader() - } - - if fh.reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize) - glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews)) - //for _, chunkView := range chunkViews { - // glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) - //} - fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize) - } - - totalRead, ts, err := fh.reader.ReadAtWithTime(buff, offset) + + totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset) if err != nil && err != io.EOF { glog.Errorf("file handle read %s: %v", fileFullPath, err) diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 1d58e0852..ccdc4c234 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse // set the new chunks and reset entry cache entry.Chunks = chunks if fh != nil { - fh.entryViewCache = nil + fh.entryChunkGroup.SetChunks(chunks) } } entry.Attributes.Mtime = time.Now().Unix() diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 9d6402f96..5f0a32894 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -56,17 +56,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO return ENXIO } - // refresh view cache if necessary - if fh.entryViewCache == nil { - var err error - fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize) - if err != nil { - return fuse.EIO - } - } - // search chunks for the offset - found, offset := searchChunks(fh, offset, fileSize, in.Whence) + found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence) if found { out.Offset = uint64(offset) return fuse.OK @@ -82,30 +73,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO return fuse.OK } - -// searchChunks goes through all chunks to find the correct offset -func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize) - - for _, chunkView := range chunkViews { - if offset < chunkView.LogicOffset { - if whence == SEEK_HOLE { - out = offset - } else { - out = chunkView.LogicOffset - } - - return true, out - } - - if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA { - out = offset - - return true, out - } - - offset += int64(chunkView.Size) - } - - return -}