Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

Commit 8dae81c5ed
weed/filer/reader_cache.go
@@ -3,6 +3,7 @@ package filer
 import (
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
@@ -20,17 +21,17 @@ type ReaderCache struct {
 
 type SingleChunkCacher struct {
 	sync.Mutex
 	parent         *ReaderCache
 	chunkFileId    string
 	data           []byte
 	err            error
 	cipherKey      []byte
 	isGzipped      bool
 	chunkSize      int
 	shouldCache    bool
 	wg             sync.WaitGroup
 	cacheStartedCh chan struct{}
-	completedTime  time.Time
+	completedTimeNew int64
 }
 
 func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -50,13 +51,17 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
 	rc.Lock()
 	defer rc.Unlock()
 
+	if len(rc.downloaders) >= rc.limit {
+		return
+	}
+
 	for _, chunkView := range chunkViews {
 		if _, found := rc.downloaders[chunkView.FileId]; found {
 			continue
 		}
 
 		if len(rc.downloaders) >= rc.limit {
-			// if still no slots, return
+			// abort when slots are filled
 			return
 		}
 
@@ -74,27 +79,28 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
 
 func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
 	rc.Lock()
-	defer rc.Unlock()
 	if cacher, found := rc.downloaders[fileId]; found {
 		if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
+			rc.Unlock()
 			return n, err
 		}
 	}
 	if shouldCache || rc.lookupFileIdFn == nil {
 		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
 		if n > 0 {
+			rc.Unlock()
 			return n, err
 		}
 	}
 
 	// clean up old downloaders
 	if len(rc.downloaders) >= rc.limit {
-		oldestFid, oldestTime := "", time.Now()
+		oldestFid, oldestTime := "", time.Now().Unix()
 		for fid, downloader := range rc.downloaders {
-			if !downloader.completedTime.IsZero() {
-				if downloader.completedTime.Before(oldestTime) {
-					oldestFid, oldestTime = fid, downloader.completedTime
-				}
+			completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
+			if completedTime > 0 && completedTime < oldestTime {
+				oldestFid, oldestTime = fid, completedTime
 			}
 		}
 		if oldestFid != "" {
@@ -110,6 +116,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 	go cacher.startCaching()
 	<-cacher.cacheStartedCh
 	rc.downloaders[fileId] = cacher
+	rc.Unlock()
 
 	return cacher.readChunkAt(buffer, offset)
 }
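
The two hunks above replace a single `defer rc.Unlock()` with explicit unlocks: each fast path releases the mutex before returning, and the final path releases it once the new downloader is registered, so the blocking `cacher.readChunkAt` call no longer runs while the ReaderCache lock is held. A minimal, runnable sketch of that handoff pattern, with stand-in names (`cache`, `fetch`) rather than the SeaweedFS API:

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	sync.Mutex
	entries map[string][]byte
}

func (c *cache) get(key string, fetch func() []byte) []byte {
	c.Lock()
	if v, ok := c.entries[key]; ok {
		c.Unlock() // fast path: release before returning
		return v
	}
	c.entries[key] = nil // reserve the slot while still holding the lock
	c.Unlock()           // release before the slow, blocking fetch

	v := fetch()
	c.Lock()
	c.entries[key] = v
	c.Unlock()
	return v
}

func main() {
	c := &cache{entries: map[string][]byte{}}
	fmt.Println(string(c.get("a", func() []byte { return []byte("payload") })))
}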
@@ -172,7 +179,7 @@ func (s *SingleChunkCacher) startCaching() {
 	if s.shouldCache {
 		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
 	}
-	s.completedTime = time.Now()
+	atomic.StoreInt64(&s.completedTimeNew, time.Now().Unix())
 
 	return
 }
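
Both sides of the timestamp change are now visible: `startCaching` publishes the completion time with `atomic.StoreInt64`, and the eviction scan in `ReadChunkAt` reads it with `atomic.LoadInt64`, using zero the way the old code used `time.Time.IsZero()`. A `time.Time` field written by one goroutine and read by another is a data race; an `int64` Unix timestamp accessed only through `sync/atomic` is not. A self-contained sketch of the pattern (type and method names here are illustrative only):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type task struct {
	completedAtUnix int64 // 0 while still running
}

func (t *task) markDone() {
	atomic.StoreInt64(&t.completedAtUnix, time.Now().Unix())
}

func (t *task) doneBefore(cutoff int64) bool {
	ts := atomic.LoadInt64(&t.completedAtUnix)
	return ts > 0 && ts < cutoff // ts == 0 means still in flight
}

func main() {
	t := &task{}
	go t.markDone()
	time.Sleep(10 * time.Millisecond)
	fmt.Println(t.doneBefore(time.Now().Unix() + 1))
}

On Go 1.19+ an `atomic.Int64` field would express the same thing with less ceremony; the diff uses the older `sync/atomic` function style.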
weed/filer/reader_pattern.go
@@ -1,5 +1,9 @@
 package filer
 
+import (
+	"sync/atomic"
+)
+
 type ReaderPattern struct {
 	isSequentialCounter int64
 	lastReadStopOffset  int64
@@ -18,18 +22,20 @@ func NewReaderPattern() *ReaderPattern {
 }
 
 func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
-	if rp.lastReadStopOffset == offset {
-		if rp.isSequentialCounter < ModeChangeLimit {
-			rp.isSequentialCounter++
+	lastOffset := atomic.SwapInt64(&rp.lastReadStopOffset, offset+int64(size))
+	counter := atomic.LoadInt64(&rp.isSequentialCounter)
+
+	if lastOffset == offset {
+		if counter < ModeChangeLimit {
+			atomic.AddInt64(&rp.isSequentialCounter, 1)
 		}
 	} else {
-		if rp.isSequentialCounter > -ModeChangeLimit {
-			rp.isSequentialCounter--
+		if counter > -ModeChangeLimit {
+			atomic.AddInt64(&rp.isSequentialCounter, -1)
 		}
 	}
-	rp.lastReadStopOffset = offset + int64(size)
 }
 
 func (rp *ReaderPattern) IsRandomMode() bool {
-	return rp.isSequentialCounter < 0
+	return atomic.LoadInt64(&rp.isSequentialCounter) < 0
 }
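
With these edits `MonitorReadAt` is safe to call from concurrent readers: `atomic.SwapInt64` both publishes the new stop offset and returns the previous one in a single step, and the counter is nudged with `atomic.AddInt64`. Note that the load-then-add pair is not one atomic unit, so the counter can briefly overshoot `ModeChangeLimit` under contention; for a read-pattern heuristic that is acceptable. A condensed, runnable version of the new code (the `ModeChangeLimit` value is assumed here):

package main

import (
	"fmt"
	"sync/atomic"
)

const ModeChangeLimit = 3 // assumed constant; defined elsewhere in the original file

type ReaderPattern struct {
	isSequentialCounter int64
	lastReadStopOffset  int64
}

func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
	// Swap publishes the new stop offset and returns the old one atomically.
	lastOffset := atomic.SwapInt64(&rp.lastReadStopOffset, offset+int64(size))
	counter := atomic.LoadInt64(&rp.isSequentialCounter)
	if lastOffset == offset {
		if counter < ModeChangeLimit {
			atomic.AddInt64(&rp.isSequentialCounter, 1)
		}
	} else {
		if counter > -ModeChangeLimit {
			atomic.AddInt64(&rp.isSequentialCounter, -1)
		}
	}
}

func (rp *ReaderPattern) IsRandomMode() bool {
	return atomic.LoadInt64(&rp.isSequentialCounter) < 0
}

func main() {
	rp := &ReaderPattern{}
	rp.MonitorReadAt(0, 4096)      // first read; counter moves toward sequential
	rp.MonitorReadAt(4096, 4096)   // continues where the last read stopped
	fmt.Println(rp.IsRandomMode()) // false: the counter is positive
}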
weed/mount/filehandle.go
@@ -1,12 +1,14 @@
 package mount
 
 import (
+	"sync"
+
+	"golang.org/x/exp/slices"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"golang.org/x/exp/slices"
-	"sync"
 )
 
 type FileHandleId uint64
@@ -57,12 +59,20 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
 	defer fh.entryLock.Unlock()
 	return fh.entry
 }
 
 func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
 	fh.entryLock.Lock()
 	defer fh.entryLock.Unlock()
 	fh.entry = entry
 }
 
+func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
+	fh.entryLock.Lock()
+	defer fh.entryLock.Unlock()
+	fn(fh.entry)
+	return fh.entry
+}
+
 func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
 	fh.entryLock.Lock()
 	defer fh.entryLock.Unlock()
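
The new `UpdateEntry` closes a read-modify-write gap: a caller that previously paired `GetEntry` with a later mutation left a window in which another goroutine could swap or mutate the entry, whereas the callback now runs entirely under `entryLock`. A sketch of the lock-scoped-mutator shape with stand-in types (`Entry` stands in for `filer_pb.Entry`):

package main

import (
	"fmt"
	"sync"
)

type Entry struct{ Attributes map[string]string }

type FileHandle struct {
	entryLock sync.Mutex
	entry     *Entry
}

// UpdateEntry applies fn while the lock is held and returns the result,
// so the read, the mutation, and the publication cannot interleave with
// another writer.
func (fh *FileHandle) UpdateEntry(fn func(entry *Entry)) *Entry {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	fn(fh.entry)
	return fh.entry
}

func main() {
	fh := &FileHandle{entry: &Entry{}}
	e := fh.UpdateEntry(func(entry *Entry) {
		if entry != nil && entry.Attributes == nil {
			entry.Attributes = map[string]string{} // lazily initialize under the lock
		}
	})
	fmt.Println(e.Attributes != nil)
}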
weed/mount/filehandle_map.go
@@ -1,8 +1,9 @@
 package mount
 
 import (
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"sync"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 )
 
 type FileHandleToInode struct {
@@ -49,7 +50,9 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
 	} else {
 		fh.counter++
 	}
-	fh.entry = entry
+	if fh.entry != entry {
+		fh.SetEntry(entry)
+	}
 	return fh
 }
 
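
The direct `fh.entry = entry` assignment wrote the shared field without taking `entryLock` while readers such as `GetEntry` held it; routing the write through `SetEntry` puts both sides under the same mutex, and the pointer comparison skips a redundant write. The race in miniature, with stand-in types:

package main

import "sync"

type Entry struct{ name string }

type FileHandle struct {
	entryLock sync.Mutex
	entry     *Entry
}

func (fh *FileHandle) SetEntry(e *Entry) {
	fh.entryLock.Lock() // writer takes the same lock readers use
	defer fh.entryLock.Unlock()
	fh.entry = e
}

func (fh *FileHandle) GetEntry() *Entry {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	return fh.entry
}

func main() {
	fh := &FileHandle{}
	done := make(chan struct{})
	go func() { fh.SetEntry(&Entry{name: "a"}); close(done) }()
	_ = fh.GetEntry() // without the lock in SetEntry, this read would race
	<-done
}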
weed/mount/weedfs.go
@@ -135,10 +135,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
 	}
 	var found bool
 	if fh, found = wfs.fhmap.FindFileHandle(inode); found {
-		entry = fh.GetEntry()
-		if entry != nil && fh.entry.Attributes == nil {
-			entry.Attributes = &filer_pb.FuseAttributes{}
-		}
+		entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
+			if entry != nil && fh.entry.Attributes == nil {
+				entry.Attributes = &filer_pb.FuseAttributes{}
+			}
+		})
 	} else {
 		entry, status = wfs.maybeLoadEntry(path)
 	}
weed/mount/weedfs_dir_lookup.go
@@ -2,7 +2,9 @@ package mount
 
 import (
 	"context"
+
 	"github.com/hanwen/go-fuse/v2/fuse"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
@@ -55,9 +57,13 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
 
 	inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
 
-	if fh, found := wfs.fhmap.FindFileHandle(inode); found && fh.entry != nil {
-		glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
-		localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
+	if fh, found := wfs.fhmap.FindFileHandle(inode); found {
+		fh.entryLock.Lock()
+		if fh.entry != nil {
+			glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
+			localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
+		}
+		fh.entryLock.Unlock()
 	}
 
 	wfs.outputFilerEntry(out, inode, localEntry)
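
A common thread runs through all of these hunks: state shared between FUSE request handlers and background downloaders is now accessed only under a mutex or via sync/atomic. Go's race detector is the natural way to validate a change like this, assuming the affected packages have test coverage:

	go test -race ./weed/filer/... ./weed/mount/...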