diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index f3a795ad0..c461a82b8 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry if notification.Queue != nil { glog.V(3).Infof("notifying entry update %v", fullpath) - notification.Queue.SendMessage(fullpath, eventNotification) + if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil { + // throw message + glog.Error(err) + } } f.logMetaEvent(ctx, fullpath, eventNotification) diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index 1ae102509..01c4d901f 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -17,10 +17,14 @@ package gocdk_pub_sub import ( "context" "fmt" - "github.com/golang/protobuf/proto" + "github.com/streadway/amqp" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + "gocloud.dev/pubsub/rabbitpubsub" + "net/url" + "path" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" @@ -29,12 +33,18 @@ import ( _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" + "os" ) func init() { notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{}) } +func getPath(rawUrl string) string { + parsedUrl, _ := url.Parse(rawUrl) + return path.Join(parsedUrl.Host, parsedUrl.Path) +} + type GoCDKPubSub struct { topicURL string topic *pubsub.Topic @@ -44,6 +54,28 @@ func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } +func (k *GoCDKPubSub) doReconnect() { + var conn *amqp.Connection + if k.topic.As(&conn) { + go func() { + <-conn.NotifyClose(make(chan *amqp.Error)) + conn.Close() + k.topic.Shutdown(context.Background()) + for { + glog.Info("Try reconnect") + conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) + if err == nil { + k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil) + k.doReconnect() + break + } + glog.Error(err) + time.Sleep(time.Second) + } + }() + } +} + func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { k.topicURL = configuration.GetString(prefix + "topic_url") glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) @@ -52,6 +84,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string glog.Fatalf("Failed to open topic: %v", err) } k.topic = topic + k.doReconnect() return nil } diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 1d7fe520b..b16eec2e1 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -9,9 +9,12 @@ import ( "github.com/streadway/amqp" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + "gocloud.dev/pubsub/rabbitpubsub" "net/url" + "os" "path" "strings" + "time" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" @@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str } type GoCDKPubSubInput struct { - sub *pubsub.Subscription + sub *pubsub.Subscription + subURL string } func (k *GoCDKPubSubInput) GetName() string { @@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { topicUrl := configuration.GetString(prefix + "topic_url") - subURL := configuration.GetString(prefix + "sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) - sub, err := pubsub.OpenSubscription(context.Background(), subURL) + k.subURL = configuration.GetString(prefix + "sub_url") + glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL) + sub, err := pubsub.OpenSubscription(context.Background(), k.subURL) if err != nil { return err } @@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s return err } defer ch.Close() - _, err = ch.QueueInspect(getPath(subURL)) + _, err = ch.QueueInspect(getPath(k.subURL)) if err != nil { if strings.HasPrefix(err.Error(), "Exception (404) Reason") { - if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil { + if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil { return err } } else { @@ -111,13 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s } func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { - msg, err := k.sub.Receive(context.Background()) + ctx := context.Background() + msg, err := k.sub.Receive(ctx) if err != nil { var conn *amqp.Connection if k.sub.As(&conn) && conn.IsClosed() { - glog.Fatalln(err) + conn.Close() + k.sub.Shutdown(ctx) + conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) + if err != nil { + glog.Error(err) + time.Sleep(time.Second) + return + } + k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil) + return } - return + // This is permanent cached sub err + glog.Fatal(err) } onFailureFn = func() { if msg.Nackable() {