Merge pull request #1794 from kmlebedev/recoveringRabbitMQ

This commit is contained in:
Chris Lu 2021-02-10 04:09:54 -08:00 committed by GitHub
commit 755b524814
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 4 additions and 0 deletions

View file

@ -113,6 +113,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
if err != nil {
var conn *amqp.Connection
if k.sub.As(&conn) && conn.IsClosed() {
glog.Fatalln(err)
}
return
}
onFailureFn = func() {