diff --git a/weed/mq/broker/broker_append.go b/weed/mq/broker/broker_append.go index 4f3af0ff8..c8e0da93c 100644 --- a/weed/mq/broker/broker_append.go +++ b/weed/mq/broker/broker_append.go @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error { +func (broker *MessageQueueBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error { assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) if err2 != nil { @@ -46,7 +46,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb. return nil } -func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { +func (broker *MessageQueueBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { var assignResult = &operation.AssignResult{} @@ -106,9 +106,9 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfigurati return assignResult, uploadResult, nil } -var _ = filer_pb.FilerClient(&MessageBroker{}) +var _ = filer_pb.FilerClient(&MessageQueueBroker{}) -func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { for _, filer := range broker.option.Filers { if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { @@ -125,6 +125,6 @@ func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_p } -func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { +func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string { return location.Url } diff --git a/weed/mq/broker/broker_grpc_server_subscribe.go b/weed/mq/broker/broker_grpc_server_subscribe.go index 1a9c62d75..3743218b1 100644 --- a/weed/mq/broker/broker_grpc_server_subscribe.go +++ b/weed/mq/broker/broker_grpc_server_subscribe.go @@ -16,7 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error { +func (broker *MessageQueueBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error { // process initial request in, err := stream.Recv() @@ -138,7 +138,7 @@ func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeSe } -func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { +func (broker *MessageQueueBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 5aa5285c9..3fd01fb53 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) -type MessageBrokerOption struct { +type MessageQueueBrokerOption struct { Filers []pb.ServerAddress DefaultReplication string MaxMB int @@ -22,16 +22,16 @@ type MessageBrokerOption struct { Cipher bool } -type MessageBroker struct { +type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer - option *MessageBrokerOption + option *MessageQueueBrokerOption grpcDialOption grpc.DialOption topicManager *TopicManager } -func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { - messageBroker = &MessageBroker{ + messageBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, } @@ -45,7 +45,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio return messageBroker, nil } -func (broker *MessageBroker) keepConnectedToOneFiler() { +func (broker *MessageQueueBroker) keepConnectedToOneFiler() { for { for _, filer := range broker.option.Filers { @@ -101,13 +101,13 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { } -func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) } -func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { +func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) diff --git a/weed/mq/broker/topic_manager.go b/weed/mq/broker/topic_manager.go index 1acf085fa..34f063d9a 100644 --- a/weed/mq/broker/topic_manager.go +++ b/weed/mq/broker/topic_manager.go @@ -36,10 +36,10 @@ type TopicControl struct { type TopicManager struct { sync.Mutex topicControls map[TopicPartition]*TopicControl - broker *MessageBroker + broker *MessageQueueBroker } -func NewTopicManager(messageBroker *MessageBroker) *TopicManager { +func NewTopicManager(messageBroker *MessageQueueBroker) *TopicManager { return &TopicManager{ topicControls: make(map[TopicPartition]*TopicControl), broker: messageBroker,