diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index bab1083ab..b4b5855ff 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -20,10 +20,10 @@ import ( ) var ( - messageBrokerStandaloneOptions QueueOptions + messageBrokerStandaloneOptions MessageBrokerOptions ) -type QueueOptions struct { +type MessageBrokerOptions struct { filer *string ip *string port *int @@ -41,8 +41,8 @@ func init() { } var cmdMsgBroker = &Command{ - UsageLine: "msg.broker [-port=17777] [-filer=]", - Short: " start a message queue broker", + UsageLine: "msgBroker [-port=17777] [-filer=]", + Short: "start a message queue broker", Long: `start a message queue broker The broker can accept gRPC calls to write or read messages. The messages are stored via filer. @@ -59,7 +59,7 @@ func runMsgBroker(cmd *Command, args []string) bool { } -func (msgBrokerOpt *QueueOptions) startQueueServer() bool { +func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) diff --git a/weed/command/server.go b/weed/command/server.go index 0d54004f8..c006f00eb 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -18,10 +18,11 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + msgBrokerOptions MessageBrokerOptions ) func init() { @@ -57,6 +58,7 @@ var ( pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") + isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string ) @@ -98,6 +100,8 @@ func init() { s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") + msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") + } func runServer(cmd *Command, args []string) bool { @@ -117,6 +121,9 @@ func runServer(cmd *Command, args []string) bool { if *isStartingS3 { *isStartingFiler = true } + if *isStartingMsgBroker { + *isStartingFiler = true + } _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) peers := strings.Join(peerList, ",") @@ -133,6 +140,7 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack + msgBrokerOptions.ip = serverIp serverOptions.v.pulseSeconds = pulseSeconds masterOptions.pulseSeconds = pulseSeconds @@ -145,6 +153,7 @@ func runServer(cmd *Command, args []string) bool { filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) s3Options.filer = &filerAddress + msgBrokerOptions.filer = &filerAddress if *filerOptions.defaultReplicaPlacement == "" { *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication @@ -188,6 +197,13 @@ func runServer(cmd *Command, args []string) bool { }() } + if *isStartingMsgBroker { + go func() { + time.Sleep(2 * time.Second) + msgBrokerOptions.startQueueServer() + }() + } + // start volume server { go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)