diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go index c23c6a64a..f83bcd08b 100644 --- a/weed/mq/client/publish_stream_processor.go +++ b/weed/mq/client/publish_stream_processor.go @@ -3,6 +3,7 @@ package client import ( "context" flatbuffers "github.com/google/flatbuffers/go" + "github.com/seaweedfs/seaweedfs/weed/mq/messages" "github.com/seaweedfs/seaweedfs/weed/mq/segment" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -31,7 +32,7 @@ type PublishStreamProcessor struct { timeout time.Duration // convert into bytes - messagesChan chan *Message + messagesChan chan *messages.Message builders chan *flatbuffers.Builder batchMessageCountLimit int @@ -51,7 +52,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), batchMessageCountLimit: batchMessageCountLimit, builders: make(chan *flatbuffers.Builder, batchCountLimit), - messagesChan: make(chan *Message, 1024), + messagesChan: make(chan *messages.Message, 1024), doneChan: make(chan struct{}), timeout: timeout, } @@ -62,7 +63,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration return t } -func (p *PublishStreamProcessor) AddMessage(m *Message) error { +func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error { p.messagesChan <- m return nil } @@ -72,7 +73,7 @@ func (p *PublishStreamProcessor) Shutdown() error { return nil } -func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error { +func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error { if len(messages) == 0 { return nil @@ -102,7 +103,7 @@ func (p *PublishStreamProcessor) doLoopUpload() { brokerGrpcAddress := "localhost:17777" // TOOD parallelize the uploading with separate uploader - messages := make([]*Message, 0, p.batchMessageCountLimit) + messages := make([]*messages.Message, 0, p.batchMessageCountLimit) util.RetryForever("publish message", func() error { return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { diff --git a/weed/mq/client/publisher.go b/weed/mq/client/publisher.go index 30de47665..826947721 100644 --- a/weed/mq/client/publisher.go +++ b/weed/mq/client/publisher.go @@ -1,12 +1,13 @@ package client import ( + "github.com/seaweedfs/seaweedfs/weed/mq/messages" "github.com/seaweedfs/seaweedfs/weed/pb" "time" ) type PublishProcessor interface { - AddMessage(m *Message) error + AddMessage(m *messages.Message) error Shutdown() error } @@ -30,14 +31,7 @@ func NewPublisher(option *PublisherOption) *Publisher { return p } -type Message struct { - Key []byte - Content []byte - Properties map[string]string - Ts time.Time -} - -func (p Publisher) Publish(m *Message) error { +func (p Publisher) Publish(m *messages.Message) error { return p.processor.AddMessage(m) } diff --git a/weed/mq/cmd/qsend/qsend.go b/weed/mq/cmd/qsend/qsend.go index 34f7e6dc5..c80b220b8 100644 --- a/weed/mq/cmd/qsend/qsend.go +++ b/weed/mq/cmd/qsend/qsend.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client" + "github.com/seaweedfs/seaweedfs/weed/mq/messages" "os" "time" ) @@ -24,7 +25,7 @@ func main() { err := eachLineStdin(func(line string) error { if len(line) > 0 { - if err := publisher.Publish(&client.Message{ + if err := publisher.Publish(&messages.Message{ Key: nil, Content: []byte(line), Properties: nil, diff --git a/weed/mq/messages/messages.go b/weed/mq/messages/messages.go new file mode 100644 index 000000000..b3bd66f52 --- /dev/null +++ b/weed/mq/messages/messages.go @@ -0,0 +1,10 @@ +package messages + +import "time" + +type Message struct { + Key []byte + Content []byte + Properties map[string]string + Ts time.Time +}