mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Fix data loss: add lock for metacache (#4664)
Co-authored-by: wang wusong <wangwusong@virtaitech.com>
This commit is contained in:
parent
89542db2d0
commit
0cb9ddd8ec
|
@ -3,6 +3,7 @@ package meta_cache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
|
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
|
||||||
|
@ -17,7 +18,7 @@ import (
|
||||||
type MetaCache struct {
|
type MetaCache struct {
|
||||||
root util.FullPath
|
root util.FullPath
|
||||||
localStore filer.VirtualFilerStore
|
localStore filer.VirtualFilerStore
|
||||||
// sync.RWMutex
|
sync.RWMutex
|
||||||
uidGidMapper *UidGidMapper
|
uidGidMapper *UidGidMapper
|
||||||
markCachedFn func(fullpath util.FullPath)
|
markCachedFn func(fullpath util.FullPath)
|
||||||
isCachedFn func(fullpath util.FullPath) bool
|
isCachedFn func(fullpath util.FullPath) bool
|
||||||
|
@ -57,8 +58,8 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
|
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
return mc.doInsertEntry(ctx, entry)
|
return mc.doInsertEntry(ctx, entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,10 +68,10 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
|
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
|
|
||||||
entry, err := mc.FindEntry(ctx, oldPath)
|
entry, err := mc.localStore.FindEntry(ctx, oldPath)
|
||||||
if err != nil && err != filer_pb.ErrNotFound {
|
if err != nil && err != filer_pb.ErrNotFound {
|
||||||
glog.Errorf("Metacache: find entry error: %v", err)
|
glog.Errorf("Metacache: find entry error: %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -105,14 +106,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
|
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
return mc.localStore.UpdateEntry(ctx, entry)
|
return mc.localStore.UpdateEntry(ctx, entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
|
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
|
||||||
//mc.RLock()
|
mc.RLock()
|
||||||
//defer mc.RUnlock()
|
defer mc.RUnlock()
|
||||||
entry, err = mc.localStore.FindEntry(ctx, fp)
|
entry, err = mc.localStore.FindEntry(ctx, fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -122,19 +123,19 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
|
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
return mc.localStore.DeleteEntry(ctx, fp)
|
return mc.localStore.DeleteEntry(ctx, fp)
|
||||||
}
|
}
|
||||||
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
|
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
return mc.localStore.DeleteFolderChildren(ctx, fp)
|
return mc.localStore.DeleteFolderChildren(ctx, fp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
|
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
|
||||||
//mc.RLock()
|
mc.RLock()
|
||||||
//defer mc.RUnlock()
|
defer mc.RUnlock()
|
||||||
|
|
||||||
if !mc.isCachedFn(dirPath) {
|
if !mc.isCachedFn(dirPath) {
|
||||||
// if this request comes after renaming, it should be fine
|
// if this request comes after renaming, it should be fine
|
||||||
|
@ -152,8 +153,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MetaCache) Shutdown() {
|
func (mc *MetaCache) Shutdown() {
|
||||||
//mc.Lock()
|
mc.Lock()
|
||||||
//defer mc.Unlock()
|
defer mc.Unlock()
|
||||||
mc.localStore.Shutdown()
|
mc.localStore.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue