From fda2fc47b1bed30b606539c8caad5e289376382f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Aug 2021 12:37:35 -0700 Subject: [PATCH] add RetryForever --- .../meta_cache/meta_cache_subscribe.go | 5 ++++- weed/util/retry.go | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index c650b8024..409120060 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -59,8 +59,11 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } - return util.Retry("followMetaUpdates", func() error { + util.RetryForever("followMetaUpdates", func() error { return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true) + }, func(err error) bool { + glog.Errorf("follow metadata updates: %v", err) + return true }) } diff --git a/weed/util/retry.go b/weed/util/retry.go index 7b0f2d3c3..4b34d5129 100644 --- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -32,6 +32,26 @@ func Retry(name string, job func() error) (err error) { return err } +func RetryForever(name string, job func() error, onErrFn func(err error) bool) { + waitTime := time.Second + for { + err = job() + if err == nil { + break + } + if onErrFn(err) { + if strings.Contains(err.Error(), "transport") { + glog.V(0).Infof("retry %s: err: %v", name, err) + time.Sleep(waitTime) + if waitTime < RetryWaitTime { + waitTime += waitTime / 2 + } + } + continue + } + } +} + // return the first non empty string func Nvl(values ...string) string { for _, s := range values {