mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #2182 from qieqieplus/fix-event-notification
Subscribe meta data: sync empty notification with timestamp
This commit is contained in:
commit
e2da647fa3
|
@ -2,7 +2,6 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -12,6 +11,12 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
|
||||||
|
MaxUnsyncedEvents = 1e3
|
||||||
)
|
)
|
||||||
|
|
||||||
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
|
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
|
||||||
|
@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
|
|
||||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
|
||||||
|
|
||||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||||
|
|
||||||
|
@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
|
|
||||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
|
||||||
|
|
||||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||||
|
|
||||||
|
@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
||||||
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
filtered := 0
|
||||||
|
|
||||||
|
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
||||||
|
defer func() {
|
||||||
|
if filtered > MaxUnsyncedEvents {
|
||||||
|
if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
|
||||||
|
EventNotification: &filer_pb.EventNotification{},
|
||||||
|
TsNs: tsNs,
|
||||||
|
}); err == nil {
|
||||||
|
filtered = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
filtered++
|
||||||
foundSelf := false
|
foundSelf := false
|
||||||
for _, sig := range eventNotification.Signatures {
|
for _, sig := range eventNotification.Signatures {
|
||||||
if sig == clientSignature && clientSignature != 0 {
|
if sig == req.Signature && req.Signature != 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if sig == fs.filer.Signature {
|
if sig == fs.filer.Signature {
|
||||||
|
@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
|
||||||
glog.V(0).Infof("=> client %v: %+v", clientName, err)
|
glog.V(0).Infof("=> client %v: %+v", clientName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
filtered = 0
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue