mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
mount: invalide file cache when metadata is changed
This commit is contained in:
parent
e219c57849
commit
e71463a9eb
|
@ -253,7 +253,7 @@ func (file *File) Forget() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
||||||
if file.isOpen > 0 {
|
if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
|
entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
|
||||||
|
|
|
@ -21,13 +21,15 @@ type MetaCache struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
visitedBoundary *bounded_tree.BoundedTree
|
visitedBoundary *bounded_tree.BoundedTree
|
||||||
uidGidMapper *UidGidMapper
|
uidGidMapper *UidGidMapper
|
||||||
|
invalidateFunc func(util.FullPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
|
func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
|
||||||
return &MetaCache{
|
return &MetaCache{
|
||||||
localStore: openMetaStore(dbFolder),
|
localStore: openMetaStore(dbFolder),
|
||||||
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
|
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
|
||||||
uidGidMapper: uidGidMapper,
|
uidGidMapper: uidGidMapper,
|
||||||
|
invalidateFunc: invalidateFunc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +72,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
|
||||||
// skip the unnecessary deletion
|
// skip the unnecessary deletion
|
||||||
// leave the update to the following InsertEntry operation
|
// leave the update to the following InsertEntry operation
|
||||||
} else {
|
} else {
|
||||||
glog.V(3).Infof("DeleteEntry %s/%s", oldPath,oldPath.Name())
|
glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
|
||||||
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
|
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -83,7 +85,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
|
||||||
if newEntry != nil {
|
if newEntry != nil {
|
||||||
newDir, _ := newEntry.DirAndName()
|
newDir, _ := newEntry.DirAndName()
|
||||||
if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
|
if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
|
||||||
glog.V(3).Infof("InsertEntry %s/%s", newDir,newEntry.Name())
|
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
|
||||||
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
|
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,15 +23,15 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dir := resp.Directory
|
||||||
var oldPath util.FullPath
|
var oldPath util.FullPath
|
||||||
var newEntry *filer.Entry
|
var newEntry *filer.Entry
|
||||||
if message.OldEntry != nil {
|
if message.OldEntry != nil {
|
||||||
oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
oldPath = util.NewFullPath(dir, message.OldEntry.Name)
|
||||||
glog.V(4).Infof("deleting %v", oldPath)
|
glog.V(4).Infof("deleting %v", oldPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.NewEntry != nil {
|
if message.NewEntry != nil {
|
||||||
dir := resp.Directory
|
|
||||||
if message.NewParentPath != "" {
|
if message.NewParentPath != "" {
|
||||||
dir = message.NewParentPath
|
dir = message.NewParentPath
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
||||||
glog.V(4).Infof("creating %v", key)
|
glog.V(4).Infof("creating %v", key)
|
||||||
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
||||||
}
|
}
|
||||||
return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
||||||
|
if err == nil && message.OldEntry != nil && message.NewEntry != nil {
|
||||||
|
key := util.NewFullPath(dir, message.NewEntry.Name)
|
||||||
|
mc.invalidateFunc(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -92,7 +92,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
|
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
|
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
|
||||||
|
fsNode := wfs.fsNodeCache.GetFsNode(filePath)
|
||||||
|
if fsNode != nil {
|
||||||
|
if file, ok := fsNode.(*File); ok {
|
||||||
|
file.entry = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
||||||
grace.OnInterrupt(func() {
|
grace.OnInterrupt(func() {
|
||||||
|
|
Loading…
Reference in a new issue