diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 624069b7e..3fdac1b26 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,7 +2,6 @@ package weed_server import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "strings" "time" @@ -12,6 +11,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +const ( + // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered + MaxUnsyncedEvents = 1e3 ) func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { @@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { - return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + filtered := 0 + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + defer func() { + if filtered > MaxUnsyncedEvents { + if err := stream.Send(&filer_pb.SubscribeMetadataResponse{ + EventNotification: &filer_pb.EventNotification{}, + TsNs: tsNs, + }); err == nil { + filtered = 0 + } + } + }() + + filtered++ foundSelf := false for _, sig := range eventNotification.Signatures { - if sig == clientSignature && clientSignature != 0 { + if sig == req.Signature && req.Signature != 0 { return nil } if sig == fs.filer.Signature { @@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe glog.V(0).Infof("=> client %v: %+v", clientName, err) return err } + filtered = 0 return nil } }