This commit is contained in:
Chris Lu 2020-12-10 19:55:28 -08:00
parent 83078ac6ce
commit 4b0c2a846b
2 changed files with 16 additions and 0 deletions

View file

@ -3,6 +3,7 @@ package filer
import (
"context"
"errors"
"github.com/chrislusf/seaweedfs/weed/glog"
"strings"
"time"
@ -85,6 +86,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err
return err
}
glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return fsw.ActualStore.InsertEntry(ctx, entry)
}
@ -104,6 +106,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return err
}
glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return fsw.ActualStore.UpdateEntry(ctx, entry)
}
@ -114,6 +117,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("FindEntry %s", fp)
entry, err = fsw.ActualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
@ -138,11 +142,13 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
}
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteEntry %s", fp)
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
@ -155,11 +161,13 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteHaDeleteEntryrdLink %s", existingEntry.FullPath)
return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
@ -170,6 +178,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("DeleteFolderChildren %s", fp)
return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
}
@ -180,6 +189,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
@ -197,6 +207,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err == ErrUnsupportedListDirectoryPrefixed {
entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)

View file

@ -18,6 +18,7 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
}
// check what is existing entry
glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
@ -25,6 +26,7 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
// remove old hard link
if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
@ -52,6 +54,7 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
}
key := entry.HardLinkId
glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
value, err := fsw.KvGet(ctx, key)
if err != nil {
glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
@ -83,6 +86,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har
entry.HardLinkCounter--
if entry.HardLinkCounter <= 0 {
glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
return fsw.KvDelete(ctx, key)
}
@ -91,6 +95,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har
return encodeErr
}
glog.V(4).Infof("DeleteHardLink KvPut %v", key)
return fsw.KvPut(ctx, key, newBlob)
}