diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 383986f23..7aa1016d7 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -253,7 +253,7 @@ func (file *File) Forget() { } func (file *File) maybeLoadEntry(ctx context.Context) error { - if file.isOpen > 0 { + if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 { return nil } entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 81b2d77c3..4b282253d 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -21,13 +21,15 @@ type MetaCache struct { sync.RWMutex visitedBoundary *bounded_tree.BoundedTree uidGidMapper *UidGidMapper + invalidateFunc func(util.FullPath) } -func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache { +func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache { return &MetaCache{ localStore: openMetaStore(dbFolder), visitedBoundary: bounded_tree.NewBoundedTree(baseDir), uidGidMapper: uidGidMapper, + invalidateFunc: invalidateFunc, } } @@ -70,7 +72,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti // skip the unnecessary deletion // leave the update to the following InsertEntry operation } else { - glog.V(3).Infof("DeleteEntry %s/%s", oldPath,oldPath.Name()) + glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name()) if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { return err } @@ -83,7 +85,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti if newEntry != nil { newDir, _ := newEntry.DirAndName() if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { - glog.V(3).Infof("InsertEntry %s/%s", newDir,newEntry.Name()) + glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { return err } diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index 9b72cadcf..f9973f436 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -23,15 +23,15 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } } + dir := resp.Directory var oldPath util.FullPath var newEntry *filer.Entry if message.OldEntry != nil { - oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name) + oldPath = util.NewFullPath(dir, message.OldEntry.Name) glog.V(4).Infof("deleting %v", oldPath) } if message.NewEntry != nil { - dir := resp.Directory if message.NewParentPath != "" { dir = message.NewParentPath } @@ -39,7 +39,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil glog.V(4).Infof("creating %v", key) newEntry = filer.FromPbEntry(dir, message.NewEntry) } - return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) + err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) + if err == nil && message.OldEntry != nil && message.NewEntry != nil { + key := util.NewFullPath(dir, message.NewEntry.Name) + mc.invalidateFunc(key) + } + + return err + } for { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 912be3c05..759e21b15 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -92,7 +92,14 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) } - wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper) + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { + fsNode := wfs.fsNodeCache.GetFsNode(filePath) + if fsNode != nil { + if file, ok := fsNode.(*File); ok { + file.entry = nil + } + } + }) startTime := time.Now() go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() {