package meta_cache import ( "context" "os" "sync" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" ) // need to have logic similar to FilerStoreWrapper // e.g. fill fileId field for chunks type MetaCache struct { actualStore filer2.FilerStore sync.RWMutex visitedBoundary *bounded_tree.BoundedTree } func NewMetaCache(dbFolder string) *MetaCache { return &MetaCache{ actualStore: openMetaStore(dbFolder), visitedBoundary: bounded_tree.NewBoundedTree(), } } func openMetaStore(dbFolder string) filer2.FilerStore { os.RemoveAll(dbFolder) os.MkdirAll(dbFolder, 0755) store := &leveldb.LevelDBStore{} config := &cacheConfig{ dir: dbFolder, } if err := store.Initialize(config, ""); err != nil { glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) } return store } func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { mc.Lock() defer mc.Unlock() filer_pb.BeforeEntrySerialization(entry.Chunks) return mc.actualStore.InsertEntry(ctx, entry) } func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error { mc.Lock() defer mc.Unlock() oldDir, _ := oldPath.DirAndName() if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { if oldPath != "" { if newEntry != nil && oldPath == newEntry.FullPath { // skip the unnecessary deletion // leave the update to the following InsertEntry operation } else { if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { return err } } } } else { // println("unknown old directory:", oldDir) } if newEntry != nil { newDir, _ := newEntry.DirAndName() if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { return err } } } return nil } func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error { mc.Lock() defer mc.Unlock() filer_pb.BeforeEntrySerialization(entry.Chunks) return mc.actualStore.UpdateEntry(ctx, entry) } func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) { mc.RLock() defer mc.RUnlock() entry, err = mc.actualStore.FindEntry(ctx, fp) if err != nil { return nil, err } filer_pb.AfterEntryDeserialization(entry.Chunks) return } func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { mc.Lock() defer mc.Unlock() return mc.actualStore.DeleteEntry(ctx, fp) } func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) { mc.RLock() defer mc.RUnlock() entries, err := mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) if err != nil { return nil, err } for _, entry := range entries { filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, err } func (mc *MetaCache) Shutdown() { mc.Lock() defer mc.Unlock() mc.actualStore.Shutdown() }