mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
d4566d4aaa
* compare chunks by timestamp
* fix slab clearing error
* fix test compilation
* move oldest chunk to sealed, instead of by fullness
* lock on fh.entryViewCache
* remove verbose logs
* revert slab clearing
* less logs
* less logs
* track write and read by timestamp
* remove useless logic
* add entry lock on file handle release
* use mem chunk only, swap file chunk has problems
* comment out code that may be used later
* add debug mode to compare data read and write
* more efficient readResolvedChunks with linked list
* small optimization
* fix test compilation
* minor fix on writer
* add SeparateGarbageChunks
* group chunks into sections
* turn off debug mode
* fix tests
* fix tests
* tmp enable swap file chunk
* Revert "tmp enable swap file chunk"
This reverts commit 985137ec47
.
* simple refactoring
* simple refactoring
* do not re-use swap file chunk. Sealed chunks should not be re-used.
* comment out debugging facilities
* either mem chunk or swap file chunk is fine now
* remove orderedMutex as *semaphore.Weighted
not found impactful
* optimize size calculation for changing large files
* optimize performance to avoid going through the long list of chunks
* still problems with swap file chunk
* rename
* tiny optimization
* swap file chunk save only successfully read data
* fix
* enable both mem and swap file chunk
* resolve chunks with range
* rename
* fix chunk interval list
* also change file handle chunk group when adding chunks
* pick in-active chunk with time-decayed counter
* fix compilation
* avoid nil with empty fh.entry
* refactoring
* rename
* rename
* refactor visible intervals to *list.List
* refactor chunkViews to *list.List
* add IntervalList for generic interval list
* change visible interval to use IntervalList in generics
* change chunkViews to *IntervalList[*ChunkView]
* use NewFileChunkSection to create
* rename variables
* refactor
* fix renaming leftover
* renaming
* renaming
* add insert interval
* interval list adds lock
* incrementally add chunks to readers
Fixes:
1. set start and stop offset for the value object
2. clone the value object
3. use pointer instead of copy-by-value when passing to interval.Value
4. use insert interval since adding chunk could be out of order
* fix tests compilation
* fix tests compilation
218 lines
4.9 KiB
Go
218 lines
4.9 KiB
Go
package filer
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
)
|
|
|
|
type ReaderCache struct {
|
|
chunkCache chunk_cache.ChunkCache
|
|
lookupFileIdFn wdclient.LookupFileIdFunctionType
|
|
sync.Mutex
|
|
downloaders map[string]*SingleChunkCacher
|
|
limit int
|
|
}
|
|
|
|
type SingleChunkCacher struct {
|
|
sync.Mutex
|
|
parent *ReaderCache
|
|
chunkFileId string
|
|
data []byte
|
|
err error
|
|
cipherKey []byte
|
|
isGzipped bool
|
|
chunkSize int
|
|
shouldCache bool
|
|
wg sync.WaitGroup
|
|
cacheStartedCh chan struct{}
|
|
completedTimeNew int64
|
|
}
|
|
|
|
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
|
|
return &ReaderCache{
|
|
limit: limit,
|
|
chunkCache: chunkCache,
|
|
lookupFileIdFn: lookupFileIdFn,
|
|
downloaders: make(map[string]*SingleChunkCacher),
|
|
}
|
|
}
|
|
|
|
func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
|
|
if rc.lookupFileIdFn == nil {
|
|
return
|
|
}
|
|
|
|
rc.Lock()
|
|
defer rc.Unlock()
|
|
|
|
if len(rc.downloaders) >= rc.limit {
|
|
return
|
|
}
|
|
|
|
for x := chunkViews; x != nil; x = x.Next {
|
|
chunkView := x.Value
|
|
if _, found := rc.downloaders[chunkView.FileId]; found {
|
|
continue
|
|
}
|
|
|
|
if len(rc.downloaders) >= rc.limit {
|
|
// abort when slots are filled
|
|
return
|
|
}
|
|
|
|
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
|
|
// cache this chunk if not yet
|
|
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
|
|
go cacher.startCaching()
|
|
<-cacher.cacheStartedCh
|
|
rc.downloaders[chunkView.FileId] = cacher
|
|
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
|
|
rc.Lock()
|
|
|
|
if cacher, found := rc.downloaders[fileId]; found {
|
|
if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
|
|
rc.Unlock()
|
|
return n, err
|
|
}
|
|
}
|
|
if shouldCache || rc.lookupFileIdFn == nil {
|
|
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
|
|
if n > 0 {
|
|
rc.Unlock()
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
// clean up old downloaders
|
|
if len(rc.downloaders) >= rc.limit {
|
|
oldestFid, oldestTime := "", time.Now().UnixNano()
|
|
for fid, downloader := range rc.downloaders {
|
|
completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
|
|
if completedTime > 0 && completedTime < oldestTime {
|
|
oldestFid, oldestTime = fid, completedTime
|
|
}
|
|
}
|
|
if oldestFid != "" {
|
|
oldDownloader := rc.downloaders[oldestFid]
|
|
delete(rc.downloaders, oldestFid)
|
|
oldDownloader.destroy()
|
|
}
|
|
}
|
|
|
|
// glog.V(4).Infof("cache1 %s", fileId)
|
|
|
|
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
|
|
go cacher.startCaching()
|
|
<-cacher.cacheStartedCh
|
|
rc.downloaders[fileId] = cacher
|
|
rc.Unlock()
|
|
|
|
return cacher.readChunkAt(buffer, offset)
|
|
}
|
|
|
|
func (rc *ReaderCache) UnCache(fileId string) {
|
|
rc.Lock()
|
|
defer rc.Unlock()
|
|
// glog.V(4).Infof("uncache %s", fileId)
|
|
if downloader, found := rc.downloaders[fileId]; found {
|
|
downloader.destroy()
|
|
delete(rc.downloaders, fileId)
|
|
}
|
|
}
|
|
|
|
func (rc *ReaderCache) destroy() {
|
|
rc.Lock()
|
|
defer rc.Unlock()
|
|
|
|
for _, downloader := range rc.downloaders {
|
|
downloader.destroy()
|
|
}
|
|
|
|
}
|
|
|
|
func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
|
|
return &SingleChunkCacher{
|
|
parent: parent,
|
|
chunkFileId: fileId,
|
|
cipherKey: cipherKey,
|
|
isGzipped: isGzipped,
|
|
chunkSize: chunkSize,
|
|
shouldCache: shouldCache,
|
|
cacheStartedCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (s *SingleChunkCacher) startCaching() {
|
|
s.wg.Add(1)
|
|
defer s.wg.Done()
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.cacheStartedCh <- struct{}{} // means this has been started
|
|
|
|
urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
|
|
if err != nil {
|
|
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
|
|
return
|
|
}
|
|
|
|
s.data = mem.Allocate(s.chunkSize)
|
|
|
|
_, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
|
|
if s.err != nil {
|
|
mem.Free(s.data)
|
|
s.data = nil
|
|
return
|
|
}
|
|
|
|
if s.shouldCache {
|
|
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
|
|
}
|
|
atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
|
|
|
|
return
|
|
}
|
|
|
|
func (s *SingleChunkCacher) destroy() {
|
|
// wait for all reads to finish before destroying the data
|
|
s.wg.Wait()
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
if s.data != nil {
|
|
mem.Free(s.data)
|
|
s.data = nil
|
|
close(s.cacheStartedCh)
|
|
}
|
|
}
|
|
|
|
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
|
|
s.wg.Add(1)
|
|
defer s.wg.Done()
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
if s.err != nil {
|
|
return 0, s.err
|
|
}
|
|
|
|
if len(s.data) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
return copy(buf, s.data[offset:]), nil
|
|
|
|
}
|