notification add ack and nack

This commit is contained in:
Chris Lu 2021-01-26 11:08:44 -08:00
parent 3a1d3d3413
commit ad2a20c8a5
6 changed files with 38 additions and 9 deletions

View file

@ -97,13 +97,19 @@ func runFilerReplicate(cmd *Command, args []string) bool {
replicator := replication.NewReplicator(config, "source.filer.", dataSink) replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for { for {
key, m, err := notificationInput.ReceiveMessage() key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
if err != nil { if err != nil {
glog.Errorf("receive %s: %+v", key, err) glog.Errorf("receive %s: %+v", key, err)
if onFailureFn != nil {
onFailureFn()
}
continue continue
} }
if key == "" { if key == "" {
// long poll received no messages // long poll received no messages
if onSuccessFn != nil {
onSuccessFn()
}
continue continue
} }
if m.OldEntry != nil && m.NewEntry == nil { if m.OldEntry != nil && m.NewEntry == nil {
@ -115,8 +121,14 @@ func runFilerReplicate(cmd *Command, args []string) bool {
} }
if err = replicator.Replicate(context.Background(), key, m); err != nil { if err = replicator.Replicate(context.Background(), key, m); err != nil {
glog.Errorf("replicate %s: %+v", key, err) glog.Errorf("replicate %s: %+v", key, err)
if onFailureFn != nil {
onFailureFn()
}
} else { } else {
glog.V(1).Infof("replicated %s", key) glog.V(1).Infof("replicated %s", key)
if onSuccessFn != nil {
onSuccessFn()
}
} }
} }

View file

@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
return nil return nil
} }
func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
// receive message // receive message
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{

View file

@ -38,13 +38,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return nil return nil
} }
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background()) msg, err := k.sub.Receive(context.Background())
if err != nil {
return
}
onSuccessFn = func() {
msg.Ack()
}
onFailureFn = func() {
if msg.Nackable() {
msg.Nack()
}
}
key = msg.Metadata["key"] key = msg.Metadata["key"]
message = &filer_pb.EventNotification{} message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message) err = proto.Unmarshal(msg.Body, message)
if err != nil { if err != nil {
return "", nil, err return "", nil, onSuccessFn, onFailureFn, err
} }
return key, message, nil return key, message, onSuccessFn, onFailureFn, nil
} }

View file

@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
k.messageChan <- m k.messageChan <- m
m.Ack()
}) })
return err return err
} }
func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
m := <-k.messageChan m := <-k.messageChan
onSuccessFn = func() {
m.Ack()
}
onFailureFn = func() {
m.Nack()
}
// process the message // process the message
key = m.Attributes["key"] key = m.Attributes["key"]
message = &filer_pb.EventNotification{} message = &filer_pb.EventNotification{}

View file

@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
return nil return nil
} }
func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg := <-k.messageChan msg := <-k.messageChan

View file

@ -10,7 +10,7 @@ type NotificationInput interface {
GetName() string GetName() string
// Initialize initializes the file store // Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error Initialize(configuration util.Configuration, prefix string) error
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
} }
var ( var (