mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
invalidate filehandle entry view cache
This commit is contained in:
parent
1734017ba1
commit
0ba88596e8
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type MemoryChunkPages struct {
|
type MemoryChunkPages struct {
|
||||||
f *File
|
fh *FileHandle
|
||||||
writeWaitGroup sync.WaitGroup
|
writeWaitGroup sync.WaitGroup
|
||||||
chunkAddLock sync.Mutex
|
chunkAddLock sync.Mutex
|
||||||
lastErr error
|
lastErr error
|
||||||
|
@ -21,14 +21,14 @@ type MemoryChunkPages struct {
|
||||||
hasWrites bool
|
hasWrites bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages {
|
func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages {
|
||||||
|
|
||||||
dirtyPages := &MemoryChunkPages{
|
dirtyPages := &MemoryChunkPages{
|
||||||
f: file,
|
fh: fh,
|
||||||
}
|
}
|
||||||
|
|
||||||
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(
|
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(
|
||||||
file.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage)
|
fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage)
|
||||||
|
|
||||||
return dirtyPages
|
return dirtyPages
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages {
|
||||||
func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) {
|
func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) {
|
||||||
pages.hasWrites = true
|
pages.hasWrites = true
|
||||||
|
|
||||||
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
|
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data)))
|
||||||
pages.uploadPipeline.SaveDataAt(data, offset)
|
pages.uploadPipeline.SaveDataAt(data, offset)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -79,23 +79,24 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader,
|
||||||
defer pages.writeWaitGroup.Done()
|
defer pages.writeWaitGroup.Done()
|
||||||
defer cleanupFn()
|
defer cleanupFn()
|
||||||
|
|
||||||
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
|
chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
|
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
|
||||||
pages.lastErr = err
|
pages.lastErr = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
chunk.Mtime = mtime
|
chunk.Mtime = mtime
|
||||||
pages.collection, pages.replication = collection, replication
|
pages.collection, pages.replication = collection, replication
|
||||||
pages.chunkAddLock.Lock()
|
pages.chunkAddLock.Lock()
|
||||||
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
|
pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
|
||||||
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
|
pages.fh.entryViewCache = nil
|
||||||
|
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
|
||||||
pages.chunkAddLock.Unlock()
|
pages.chunkAddLock.Unlock()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if pages.f.wfs.concurrentWriters != nil {
|
if pages.fh.f.wfs.concurrentWriters != nil {
|
||||||
pages.f.wfs.concurrentWriters.Execute(writer)
|
pages.fh.f.wfs.concurrentWriters.Execute(writer)
|
||||||
} else {
|
} else {
|
||||||
go writer()
|
go writer()
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,12 +36,12 @@ type FileHandle struct {
|
||||||
|
|
||||||
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
|
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
|
||||||
fh := &FileHandle{
|
fh := &FileHandle{
|
||||||
f: file,
|
f: file,
|
||||||
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
Uid: uid,
|
||||||
dirtyPages: newPageWriter(file, file.wfs.option.ChunkSizeLimit),
|
Gid: gid,
|
||||||
Uid: uid,
|
|
||||||
Gid: gid,
|
|
||||||
}
|
}
|
||||||
|
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
||||||
|
fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit)
|
||||||
entry := fh.f.getEntry()
|
entry := fh.f.getEntry()
|
||||||
if entry != nil {
|
if entry != nil {
|
||||||
entry.Attributes.FileSize = filer.FileSize(entry)
|
entry.Attributes.FileSize = filer.FileSize(entry)
|
||||||
|
@ -149,6 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||||
reader := fh.reader
|
reader := fh.reader
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
|
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
|
||||||
|
glog.V(4).Infof("file handle read %s [%d,%d) from %+v", fileFullPath, offset, offset+int64(len(buff)), chunkViews)
|
||||||
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
||||||
}
|
}
|
||||||
fh.reader = reader
|
fh.reader = reader
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type PageWriter struct {
|
type PageWriter struct {
|
||||||
f *File
|
fh *FileHandle
|
||||||
collection string
|
collection string
|
||||||
replication string
|
replication string
|
||||||
chunkSize int64
|
chunkSize int64
|
||||||
|
@ -20,19 +20,19 @@ var (
|
||||||
_ = page_writer.DirtyPages(&PageWriter{})
|
_ = page_writer.DirtyPages(&PageWriter{})
|
||||||
)
|
)
|
||||||
|
|
||||||
func newPageWriter(file *File, chunkSize int64) *PageWriter {
|
func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
|
||||||
pw := &PageWriter{
|
pw := &PageWriter{
|
||||||
f: file,
|
fh: fh,
|
||||||
chunkSize: chunkSize,
|
chunkSize: chunkSize,
|
||||||
writerPattern: NewWriterPattern(chunkSize),
|
writerPattern: NewWriterPattern(chunkSize),
|
||||||
randomWriter: newMemoryChunkPages(file, chunkSize),
|
randomWriter: newMemoryChunkPages(fh, chunkSize),
|
||||||
}
|
}
|
||||||
return pw
|
return pw
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *PageWriter) AddPage(offset int64, data []byte) {
|
func (pw *PageWriter) AddPage(offset int64, data []byte) {
|
||||||
|
|
||||||
glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
|
glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
|
||||||
|
|
||||||
chunkIndex := offset / pw.chunkSize
|
chunkIndex := offset / pw.chunkSize
|
||||||
for i := chunkIndex; len(data) > 0; i++ {
|
for i := chunkIndex; len(data) > 0; i++ {
|
||||||
|
@ -64,7 +64,7 @@ func (pw *PageWriter) FlushData() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
|
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
|
||||||
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data)))
|
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data)))
|
||||||
|
|
||||||
chunkIndex := offset / pw.chunkSize
|
chunkIndex := offset / pw.chunkSize
|
||||||
for i := chunkIndex; len(data) > 0; i++ {
|
for i := chunkIndex; len(data) > 0; i++ {
|
||||||
|
|
Loading…
Reference in a new issue