mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #1801 from kmlebedev/recoveringRabbitMQ
Do reconnect to RabbitMQ
This commit is contained in:
commit
0f426ce34d
|
@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
|
||||||
|
|
||||||
if notification.Queue != nil {
|
if notification.Queue != nil {
|
||||||
glog.V(3).Infof("notifying entry update %v", fullpath)
|
glog.V(3).Infof("notifying entry update %v", fullpath)
|
||||||
notification.Queue.SendMessage(fullpath, eventNotification)
|
if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil {
|
||||||
|
// throw message
|
||||||
|
glog.Error(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
f.logMetaEvent(ctx, fullpath, eventNotification)
|
f.logMetaEvent(ctx, fullpath, eventNotification)
|
||||||
|
|
|
@ -17,10 +17,14 @@ package gocdk_pub_sub
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
"gocloud.dev/pubsub"
|
"gocloud.dev/pubsub"
|
||||||
_ "gocloud.dev/pubsub/awssnssqs"
|
_ "gocloud.dev/pubsub/awssnssqs"
|
||||||
|
"gocloud.dev/pubsub/rabbitpubsub"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/notification"
|
"github.com/chrislusf/seaweedfs/weed/notification"
|
||||||
|
@ -29,12 +33,18 @@ import (
|
||||||
_ "gocloud.dev/pubsub/gcppubsub"
|
_ "gocloud.dev/pubsub/gcppubsub"
|
||||||
_ "gocloud.dev/pubsub/natspubsub"
|
_ "gocloud.dev/pubsub/natspubsub"
|
||||||
_ "gocloud.dev/pubsub/rabbitpubsub"
|
_ "gocloud.dev/pubsub/rabbitpubsub"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
|
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPath(rawUrl string) string {
|
||||||
|
parsedUrl, _ := url.Parse(rawUrl)
|
||||||
|
return path.Join(parsedUrl.Host, parsedUrl.Path)
|
||||||
|
}
|
||||||
|
|
||||||
type GoCDKPubSub struct {
|
type GoCDKPubSub struct {
|
||||||
topicURL string
|
topicURL string
|
||||||
topic *pubsub.Topic
|
topic *pubsub.Topic
|
||||||
|
@ -44,6 +54,28 @@ func (k *GoCDKPubSub) GetName() string {
|
||||||
return "gocdk_pub_sub"
|
return "gocdk_pub_sub"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *GoCDKPubSub) doReconnect() {
|
||||||
|
var conn *amqp.Connection
|
||||||
|
if k.topic.As(&conn) {
|
||||||
|
go func() {
|
||||||
|
<-conn.NotifyClose(make(chan *amqp.Error))
|
||||||
|
conn.Close()
|
||||||
|
k.topic.Shutdown(context.Background())
|
||||||
|
for {
|
||||||
|
glog.Info("Try reconnect")
|
||||||
|
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
|
||||||
|
if err == nil {
|
||||||
|
k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
|
||||||
|
k.doReconnect()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
glog.Error(err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
|
func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
|
||||||
k.topicURL = configuration.GetString(prefix + "topic_url")
|
k.topicURL = configuration.GetString(prefix + "topic_url")
|
||||||
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
|
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
|
||||||
|
@ -52,6 +84,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
|
||||||
glog.Fatalf("Failed to open topic: %v", err)
|
glog.Fatalf("Failed to open topic: %v", err)
|
||||||
}
|
}
|
||||||
k.topic = topic
|
k.topic = topic
|
||||||
|
k.doReconnect()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,9 +9,12 @@ import (
|
||||||
"github.com/streadway/amqp"
|
"github.com/streadway/amqp"
|
||||||
"gocloud.dev/pubsub"
|
"gocloud.dev/pubsub"
|
||||||
_ "gocloud.dev/pubsub/awssnssqs"
|
_ "gocloud.dev/pubsub/awssnssqs"
|
||||||
|
"gocloud.dev/pubsub/rabbitpubsub"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
// _ "gocloud.dev/pubsub/azuresb"
|
// _ "gocloud.dev/pubsub/azuresb"
|
||||||
_ "gocloud.dev/pubsub/gcppubsub"
|
_ "gocloud.dev/pubsub/gcppubsub"
|
||||||
|
@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
|
||||||
}
|
}
|
||||||
|
|
||||||
type GoCDKPubSubInput struct {
|
type GoCDKPubSubInput struct {
|
||||||
sub *pubsub.Subscription
|
sub *pubsub.Subscription
|
||||||
|
subURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *GoCDKPubSubInput) GetName() string {
|
func (k *GoCDKPubSubInput) GetName() string {
|
||||||
|
@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string {
|
||||||
|
|
||||||
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
|
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
|
||||||
topicUrl := configuration.GetString(prefix + "topic_url")
|
topicUrl := configuration.GetString(prefix + "topic_url")
|
||||||
subURL := configuration.GetString(prefix + "sub_url")
|
k.subURL = configuration.GetString(prefix + "sub_url")
|
||||||
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
|
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
|
||||||
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
|
sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
defer ch.Close()
|
||||||
_, err = ch.QueueInspect(getPath(subURL))
|
_, err = ch.QueueInspect(getPath(k.subURL))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
|
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
|
||||||
if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
|
if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -111,13 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
||||||
msg, err := k.sub.Receive(context.Background())
|
ctx := context.Background()
|
||||||
|
msg, err := k.sub.Receive(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var conn *amqp.Connection
|
var conn *amqp.Connection
|
||||||
if k.sub.As(&conn) && conn.IsClosed() {
|
if k.sub.As(&conn) && conn.IsClosed() {
|
||||||
glog.Fatalln(err)
|
conn.Close()
|
||||||
|
k.sub.Shutdown(ctx)
|
||||||
|
conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
// This is permanent cached sub err
|
||||||
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
onFailureFn = func() {
|
onFailureFn = func() {
|
||||||
if msg.Nackable() {
|
if msg.Nackable() {
|
||||||
|
|
Loading…
Reference in a new issue