From 303bd067b568dc4834753fe54b17365757ef5406 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 31 Jul 2022 22:51:41 -0700 Subject: [PATCH] Revert "rename: delete source entry metadata only, skipping hard links" This reverts commit 03466f955e7907f5e7442dd3e60c45a3ab261ea3. fix https://github.com/seaweedfs/seaweedfs/issues/3386 --- weed/filer/filer_delete_entry.go | 6 +----- weed/filer/filerstore_wrapper.go | 13 ------------- weed/mount/meta_cache/meta_cache.go | 18 +++--------------- weed/mount/meta_cache/meta_cache_subscribe.go | 2 +- weed/mount/weedfs_rename.go | 4 ++-- 5 files changed, 7 insertions(+), 36 deletions(-) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 169d77d7b..2ea20ea64 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -122,11 +122,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) - if !entry.IsDirectory() && !shouldDeleteChunks { - if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil { - return fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr) - } - } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { + if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { return fmt.Errorf("filer store delete: %v", storeDeletionErr) } if !entry.IsDirectory() { diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 4e2f4834f..493ba845a 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -23,7 +23,6 @@ type VirtualFilerStore interface { FilerStore DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error - DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error AddPathSpecificStore(path string, storeId string, store FilerStore) OnBucketCreation(bucket string) OnBucketDeletion(bucket string) @@ -217,18 +216,6 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry return actualStore.DeleteEntry(ctx, existingEntry.FullPath) } -func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) { - actualStore := fsw.getActualStore(fullpath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath) - return actualStore.DeleteEntry(ctx, fullpath) -} - func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { actualStore := fsw.getActualStore(fp + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 9b757afdb..76def715b 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -65,7 +65,7 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro return mc.localStore.InsertEntry(ctx, entry) } -func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry, shouldDeleteChunks bool) error { +func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { //mc.Lock() //defer mc.Unlock() @@ -77,14 +77,8 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti // leave the update to the following InsertEntry operation } else { glog.V(3).Infof("DeleteEntry %s", oldPath) - if shouldDeleteChunks { - if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { - return err - } - } else { - if err := mc.localStore.DeleteOneEntrySkipHardlink(ctx, oldPath); err != nil { - return err - } + if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { + return err } } } @@ -121,12 +115,6 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi return } -func (mc *MetaCache) DeleteEntrySkipHardlink(ctx context.Context, fp util.FullPath) (err error) { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.DeleteOneEntrySkipHardlink(ctx, fp) -} - func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { //mc.Lock() //defer mc.Unlock() diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 8b02058ea..22f02c8c7 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -36,7 +36,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil glog.V(4).Infof("creating %v", key) newEntry = filer.FromPbEntry(dir, message.NewEntry) } - err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry, message.DeleteChunks) + err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) if err == nil { if message.OldEntry != nil && message.NewEntry != nil { oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go index fc2243aac..42dcf348d 100644 --- a/weed/mount/weedfs_rename.go +++ b/weed/mount/weedfs_rename.go @@ -223,7 +223,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR if resp.EventNotification.NewEntry != nil { // with new entry, the old entry name also exists. This is the first step to create new entry newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry) - if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry, false); err != nil { + if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil { return err } @@ -248,7 +248,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR } else if resp.EventNotification.OldEntry != nil { // without new entry, only old entry name exists. This is the second step to delete old entry - if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil, resp.EventNotification.DeleteChunks); err != nil { + if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil { return err } }