From 8d42a1263bdbf8d0e6c425ae0aedea476e22420c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 28 Apr 2020 17:30:04 -0700 Subject: [PATCH] meta subscription: update the last read time --- weed/server/filer_grpc_server_listen.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index 2a18f1950..e45068f39 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -22,6 +22,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, defer fs.deleteClient(clientName) lastReadTime := time.Unix(0, req.SinceNs) + glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + var processedTsNs int64 eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { @@ -67,6 +69,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return err } + processedTsNs = logEntry.TsNs + return nil } @@ -74,6 +78,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return fmt.Errorf("reading from persisted logs: %v", err) } + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait()