mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Mount concurrent read (#4400)
* fix:mount deadlock * feat: concurrent read * fix * Remove useless code * fix --------- Co-authored-by: zemul <zhouzemiao@ihuman.com>
This commit is contained in:
parent
5614ad0000
commit
0122e022ea
|
@ -13,7 +13,8 @@ type FileChunkSection struct {
|
||||||
visibleIntervals *IntervalList[*VisibleInterval]
|
visibleIntervals *IntervalList[*VisibleInterval]
|
||||||
chunkViews *IntervalList[*ChunkView]
|
chunkViews *IntervalList[*ChunkView]
|
||||||
reader *ChunkReadAt
|
reader *ChunkReadAt
|
||||||
lock sync.Mutex
|
lock sync.RWMutex
|
||||||
|
isPrepared bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
|
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
|
||||||
|
@ -61,6 +62,19 @@ func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]st
|
||||||
}
|
}
|
||||||
|
|
||||||
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
|
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
|
||||||
|
if section.isPrepared {
|
||||||
|
section.reader.fileSize = fileSize
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
section.lock.Lock()
|
||||||
|
defer section.lock.Unlock()
|
||||||
|
|
||||||
|
if section.isPrepared {
|
||||||
|
section.reader.fileSize = fileSize
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if section.visibleIntervals == nil {
|
if section.visibleIntervals == nil {
|
||||||
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
|
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
|
||||||
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
|
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
|
||||||
|
@ -76,23 +90,25 @@ func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64)
|
||||||
if section.reader == nil {
|
if section.reader == nil {
|
||||||
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
|
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
section.isPrepared = true
|
||||||
section.reader.fileSize = fileSize
|
section.reader.fileSize = fileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
|
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
|
||||||
section.lock.Lock()
|
|
||||||
defer section.lock.Unlock()
|
|
||||||
|
|
||||||
section.setupForRead(group, fileSize)
|
section.setupForRead(group, fileSize)
|
||||||
|
section.lock.RLock()
|
||||||
|
defer section.lock.RUnlock()
|
||||||
|
|
||||||
return section.reader.ReadAtWithTime(buff, offset)
|
return section.reader.ReadAtWithTime(buff, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
||||||
section.lock.Lock()
|
|
||||||
defer section.lock.Unlock()
|
|
||||||
|
|
||||||
section.setupForRead(group, fileSize)
|
section.setupForRead(group, fileSize)
|
||||||
|
section.lock.RLock()
|
||||||
|
defer section.lock.RUnlock()
|
||||||
|
|
||||||
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
||||||
visible := x.Value
|
visible := x.Value
|
||||||
|
@ -108,10 +124,10 @@ func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
||||||
section.lock.Lock()
|
|
||||||
defer section.lock.Unlock()
|
|
||||||
|
|
||||||
section.setupForRead(group, fileSize)
|
section.setupForRead(group, fileSize)
|
||||||
|
section.lock.RLock()
|
||||||
|
defer section.lock.RUnlock()
|
||||||
|
|
||||||
isAfterOffset := false
|
isAfterOffset := false
|
||||||
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
||||||
|
|
|
@ -27,7 +27,7 @@ func (interval *Interval[T]) Size() int64 {
|
||||||
type IntervalList[T IntervalValue] struct {
|
type IntervalList[T IntervalValue] struct {
|
||||||
head *Interval[T]
|
head *Interval[T]
|
||||||
tail *Interval[T]
|
tail *Interval[T]
|
||||||
Lock sync.Mutex
|
Lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIntervalList[T IntervalValue]() *IntervalList[T] {
|
func NewIntervalList[T IntervalValue]() *IntervalList[T] {
|
||||||
|
@ -248,8 +248,8 @@ func (list *IntervalList[T]) overlayInterval(interval *Interval[T]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (list *IntervalList[T]) Len() int {
|
func (list *IntervalList[T]) Len() int {
|
||||||
list.Lock.Lock()
|
list.Lock.RLock()
|
||||||
defer list.Lock.Unlock()
|
defer list.Lock.RUnlock()
|
||||||
|
|
||||||
var count int
|
var count int
|
||||||
for t := list.head; t != nil; t = t.Next {
|
for t := list.head; t != nil; t = t.Next {
|
||||||
|
|
|
@ -106,8 +106,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
|
|
||||||
c.readerPattern.MonitorReadAt(offset, len(p))
|
c.readerPattern.MonitorReadAt(offset, len(p))
|
||||||
|
|
||||||
c.chunkViews.Lock.Lock()
|
c.chunkViews.Lock.RLock()
|
||||||
defer c.chunkViews.Lock.Unlock()
|
defer c.chunkViews.Lock.RUnlock()
|
||||||
|
|
||||||
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
||||||
n, _, err = c.doReadAt(p, offset)
|
n, _, err = c.doReadAt(p, offset)
|
||||||
|
@ -118,8 +118,8 @@ func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, e
|
||||||
|
|
||||||
c.readerPattern.MonitorReadAt(offset, len(p))
|
c.readerPattern.MonitorReadAt(offset, len(p))
|
||||||
|
|
||||||
c.chunkViews.Lock.Lock()
|
c.chunkViews.Lock.RLock()
|
||||||
defer c.chunkViews.Lock.Unlock()
|
defer c.chunkViews.Lock.RUnlock()
|
||||||
|
|
||||||
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
|
||||||
return c.doReadAt(p, offset)
|
return c.doReadAt(p, offset)
|
||||||
|
|
|
@ -17,7 +17,7 @@ type FileHandle struct {
|
||||||
fh FileHandleId
|
fh FileHandleId
|
||||||
counter int64
|
counter int64
|
||||||
entry *LockedEntry
|
entry *LockedEntry
|
||||||
entryLock sync.Mutex
|
entryLock sync.RWMutex
|
||||||
entryChunkGroup *filer.ChunkGroup
|
entryChunkGroup *filer.ChunkGroup
|
||||||
inode uint64
|
inode uint64
|
||||||
wfs *WFS
|
wfs *WFS
|
||||||
|
@ -28,7 +28,7 @@ type FileHandle struct {
|
||||||
reader *filer.ChunkReadAt
|
reader *filer.ChunkReadAt
|
||||||
contentType string
|
contentType string
|
||||||
handle uint64
|
handle uint64
|
||||||
sync.Mutex
|
sync.RWMutex
|
||||||
|
|
||||||
isDeleted bool
|
isDeleted bool
|
||||||
|
|
||||||
|
@ -103,6 +103,9 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fh *FileHandle) ReleaseHandle() {
|
func (fh *FileHandle) ReleaseHandle() {
|
||||||
|
fh.Lock()
|
||||||
|
defer fh.Unlock()
|
||||||
|
|
||||||
fh.entryLock.Lock()
|
fh.entryLock.Lock()
|
||||||
defer fh.entryLock.Unlock()
|
defer fh.entryLock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -23,8 +23,8 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs in
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
|
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
|
||||||
fh.entryLock.Lock()
|
fh.entryLock.RLock()
|
||||||
defer fh.entryLock.Unlock()
|
defer fh.entryLock.RUnlock()
|
||||||
|
|
||||||
fileFullPath := fh.FullPath()
|
fileFullPath := fh.FullPath()
|
||||||
|
|
||||||
|
|
|
@ -58,12 +58,12 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
|
||||||
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
|
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
|
||||||
|
|
||||||
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
|
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
|
||||||
fh.entryLock.Lock()
|
fh.entryLock.RLock()
|
||||||
if entry := fh.GetEntry(); entry != nil {
|
if entry := fh.GetEntry(); entry != nil {
|
||||||
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
|
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
|
||||||
localEntry = filer.FromPbEntry(string(dirPath), entry)
|
localEntry = filer.FromPbEntry(string(dirPath), entry)
|
||||||
}
|
}
|
||||||
fh.entryLock.Unlock()
|
fh.entryLock.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.outputFilerEntry(out, inode, localEntry)
|
wfs.outputFilerEntry(out, inode, localEntry)
|
||||||
|
|
|
@ -35,10 +35,10 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock the file until the proper offset was calculated
|
// lock the file until the proper offset was calculated
|
||||||
fh.Lock()
|
fh.RLock()
|
||||||
defer fh.Unlock()
|
defer fh.RUnlock()
|
||||||
fh.entryLock.Lock()
|
fh.entryLock.RLock()
|
||||||
defer fh.entryLock.Unlock()
|
defer fh.entryLock.RUnlock()
|
||||||
|
|
||||||
fileSize := int64(filer.FileSize(fh.GetEntry()))
|
fileSize := int64(filer.FileSize(fh.GetEntry()))
|
||||||
offset := max(int64(in.Offset), 0)
|
offset := max(int64(in.Offset), 0)
|
||||||
|
|
|
@ -41,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
|
||||||
return nil, fuse.ENOENT
|
return nil, fuse.ENOENT
|
||||||
}
|
}
|
||||||
|
|
||||||
fh.Lock()
|
fh.RLock()
|
||||||
defer fh.Unlock()
|
defer fh.RUnlock()
|
||||||
|
|
||||||
offset := int64(in.Offset)
|
offset := int64(in.Offset)
|
||||||
totalRead, err := readDataByFileHandle(buff, fh, offset)
|
totalRead, err := readDataByFileHandle(buff, fh, offset)
|
||||||
|
|
Loading…
Reference in a new issue