diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 3ae4f1e2f..4780e9a3b 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -36,6 +36,7 @@ var cmdFilerReplicate = &Command{ func runFilerReplicate(cmd *Command, args []string) bool { weed_server.LoadConfiguration("replication", true) + weed_server.LoadConfiguration("notification", true) config := viper.GetViper() var notificationInput sub.NotificationInput @@ -54,7 +55,8 @@ func runFilerReplicate(cmd *Command, args []string) bool { } if notificationInput == nil { - println("Please follow 'weed scaffold -config=repliaction' to see example notification configurations.") + println("No notification is defined in notification.toml file.") + println("Please follow 'weed scaffold -config=notification' to see example notification configurations.") return true } @@ -85,7 +87,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } if dataSink == nil { - println("no data sink configured:") + println("no data sink configured in replication.toml:") for _, sk := range sink.Sinks { println(" " + sk.GetName()) } diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index cc6e5d6ef..4d836d9bc 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -19,7 +19,7 @@ var cmdScaffold = &Command{ var ( outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory") - config = cmdScaffold.Flag.String("config", "filer", "[filer|replication] the configuration file to generate") + config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication] the configuration file to generate") ) func runScaffold(cmd *Command, args []string) bool { @@ -28,6 +28,8 @@ func runScaffold(cmd *Command, args []string) bool { switch *config { case "filer": content = FILER_TOML_EXAMPLE + case "notification": + content = NOTIFICATION_TOML_EXAMPLE case "replication": content = REPLICATION_TOML_EXAMPLE } @@ -37,7 +39,7 @@ func runScaffold(cmd *Command, args []string) bool { } if *outputPath != "" { - ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0x755) + ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644) } else { println(content) } @@ -131,20 +133,34 @@ addresses = [ "localhost:30006", ] +` + + NOTIFICATION_TOML_EXAMPLE = ` +# A sample TOML config file for SeaweedFS filer store +# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate" +# Put this file to one of the location, with descending priority +# ./notification.toml +# $HOME/.seaweedfs/notification.toml +# /etc/seaweedfs/notification.toml #################################################### # notification -# sends filer updates for each file to an external message queue +# send and receive filer updates for each file to an external message queue #################################################### [notification.log] +# this is only for debugging perpose and does not work with "weed filer.replicate" enabled = false + [notification.kafka] enabled = false hosts = [ "localhost:9092" ] topic = "seaweedfs_filer" +offsetFile = "./last.offset" +offsetSaveIntervalSeconds = 10 + [notification.aws_sqs] # experimental, let me know if it works @@ -154,7 +170,16 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials region = "us-east-2" sqs_queue_name = "my_filer_queue" # an existing queue name + +[notification.google_pub_sub] +# read credentials doc at https://cloud.google.com/docs/authentication/getting-started +enabled = false +google_application_credentials = "/path/to/x.json" # path to json credential file +project_id = "" # an existing project id +topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists + ` + REPLICATION_TOML_EXAMPLE = ` # A sample TOML config file for replicating SeaweedFS filer # Used with "weed filer.replicate" @@ -168,22 +193,6 @@ enabled = true grpcAddress = "localhost:18888" directory = "/buckets" # all files under this directory tree are replicated -[notification.kafka] -enabled = false -hosts = [ - "localhost:9092" -] -topic = "seaweedfs_filer1_to_filer2" -offsetFile = "./last.offset" -offsetSaveIntervalSeconds = 10 - -[notification.aws_sqs] -enabled = false -aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). -aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). -region = "us-east-2" -sqs_queue_name = "my_filer_queue" # an existing queue name - [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index e0ba61d58..68c31af77 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -35,7 +35,7 @@ func LoadConfiguration(config *viper.Viper) { store.GetName(), err) } Queue = store - glog.V(0).Infof("Configure message queue for %s", store.GetName()) + glog.V(0).Infof("Configure notification message queue for %s", store.GetName()) return } } diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go new file mode 100644 index 000000000..419fb26a4 --- /dev/null +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -0,0 +1,89 @@ +package google_pub_sub + +import ( + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/api/option" + "context" + "cloud.google.com/go/pubsub" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &GooglePubSub{}) +} + +type GooglePubSub struct { + topic *pubsub.Topic +} + +func (k *GooglePubSub) GetName() string { + return "google_pub_sub" +} + +func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) + return k.initialize( + configuration.GetString("google_application_credentials"), + configuration.GetString("project_id"), + configuration.GetString("topic"), + ) +} + +func (k *GooglePubSub) initialize(google_application_credentials, projectId, topicName string) (err error) { + + ctx := context.Background() + // Creates a client. + if google_application_credentials == "" { + var found bool + google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") + if !found { + glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") + } + } + + client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) + if err != nil { + glog.Fatalf("Failed to create client: %v", err) + } + + k.topic = client.Topic(topicName) + if exists, err := k.topic.Exists(ctx); err == nil { + if !exists { + k.topic, err = client.CreateTopic(ctx, topicName) + if err != nil { + glog.Fatalf("Failed to create topic %s: %v", topicName, err) + } + } + } else { + glog.Fatalf("Failed to check topic %s: %v", topicName, err) + } + + return nil +} + +func (k *GooglePubSub) SendMessage(key string, message proto.Message) (err error) { + + bytes, err := proto.Marshal(message) + if err != nil { + return + } + + ctx := context.Background() + result := k.topic.Publish(ctx, &pubsub.Message{ + Data: bytes, + Attributes: map[string]string{"key": key}, + }) + + _, err = result.Get(ctx) + if err != nil { + return fmt.Errorf("send message to google pub sub %s: %v", k.topic.String(), err) + } + + return nil +} diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index ae7751c61..c1beefc33 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -3,11 +3,11 @@ package gcssink import ( "context" "fmt" - "log" "os" "cloud.google.com/go/storage" "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -56,12 +56,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") if !found { - log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") + glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") } } client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials)) if err != nil { - log.Fatalf("Failed to create client: %v", err) + glog.Fatalf("Failed to create client: %v", err) } g.client = client diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go new file mode 100644 index 000000000..49b0c56de --- /dev/null +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -0,0 +1,109 @@ +package sub + +import ( + "context" + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" +) + +func init() { + NotificationInputs = append(NotificationInputs, &GooglePubSubInput{}) +} + +type GooglePubSubInput struct { + sub *pubsub.Subscription + topicName string + messageChan chan *pubsub.Message +} + +func (k *GooglePubSubInput) GetName() string { + return "google_pub_sub" +} + +func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) + return k.initialize( + configuration.GetString("google_application_credentials"), + configuration.GetString("project_id"), + configuration.GetString("topic"), + ) +} + +func (k *GooglePubSubInput) initialize(google_application_credentials, projectId, topicName string) (err error) { + + ctx := context.Background() + // Creates a client. + if google_application_credentials == "" { + var found bool + google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") + if !found { + glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") + } + } + + client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) + if err != nil { + glog.Fatalf("Failed to create client: %v", err) + } + + k.topicName = topicName + topic := client.Topic(topicName) + if exists, err := topic.Exists(ctx); err == nil { + if !exists { + topic, err = client.CreateTopic(ctx, topicName) + if err != nil { + glog.Fatalf("Failed to create topic %s: %v", topicName, err) + } + } + } else { + glog.Fatalf("Failed to check topic %s: %v", topicName, err) + } + + subscriptionName := "seaweedfs_sub" + + k.sub = client.Subscription(subscriptionName) + if exists, err := k.sub.Exists(ctx); err == nil { + if !exists { + k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic}) + if err != nil { + glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) + } + } + } else { + glog.Fatalf("Failed to check subscription %s: %v", topicName, err) + } + + k.messageChan = make(chan *pubsub.Message, 1) + + go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + k.messageChan <- m + m.Ack() + }) + + return err +} + +func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + m := <-k.messageChan + + // process the message + key = m.Attributes["key"] + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(m.Data, message) + + if err != nil { + err = fmt.Errorf("unmarshal message from google pubsub %s: %v", k.topicName, err) + return + } + + return +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 65fa26987..f4100e5b2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" "github.com/spf13/viper" @@ -52,6 +53,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go fs.filer.KeepConnectedToMaster() LoadConfiguration("filer", true) + LoadConfiguration("notification", false) v := viper.GetViper() fs.filer.LoadConfiguration(v) @@ -81,15 +83,15 @@ func LoadConfiguration(configFileName string, required bool) { glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file + if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) if required { - glog.Errorf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ + glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ "\n\nPlease follow this example and add a filer.toml file to "+ "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\n\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n", + "\nOr use this command to generate the default toml file\n"+ + " weed scaffold -config=%s -output=.\n\n\n", configFileName, configFileName, configFileName) } }