filer: avoid duplicated FindEntry for deletion

This commit is contained in:
Chris Lu 2020-11-26 11:14:56 -08:00
parent 1ae108efca
commit 513bcd6e0d
3 changed files with 21 additions and 3 deletions

View file

@ -269,7 +269,7 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
entry, err = f.Store.FindEntry(ctx, p) entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 { if entry != nil && entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteEntry(ctx, p.Child(entry.Name())) f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound return nil, filer_pb.ErrNotFound
} }
} }
@ -303,7 +303,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
lastFileName = entry.Name() lastFileName = entry.Name()
if entry.TtlSec > 0 { if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteEntry(ctx, p.Child(entry.Name())) f.Store.DeleteOneEntry(ctx, entry)
expiredCount++ expiredCount++
continue continue
} }

View file

@ -122,7 +122,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr) return fmt.Errorf("filer store delete: %v", storeDeletionErr)
} }
if !entry.IsDirectory() { if !entry.IsDirectory() {

View file

@ -45,6 +45,7 @@ type FilerStore interface {
type VirtualFilerStore interface { type VirtualFilerStore interface {
FilerStore FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
} }
type FilerStoreWrapper struct { type FilerStoreWrapper struct {
@ -145,6 +146,23 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
return fsw.ActualStore.DeleteEntry(ctx, fp) return fsw.ActualStore.DeleteEntry(ctx, fp)
} }
func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc() stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now() start := time.Now()