diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index d8f33c2a5..79393332f 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -85,6 +85,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p) if localTopicPartition == nil { localTopicPartition = topic.NewLocalPartition(t, p, true, nil) + broker.localTopicManager.AddTopicPartition(t, localTopicPartition) } } else { response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 8a673009d..9a7e53ca1 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -11,20 +11,22 @@ import ( func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { - localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Init.Topic), - topic.FromPbPartition(req.Init.Partition)) + localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic), + topic.FromPbPartition(req.Cursor.Partition)) if localTopicPartition == nil { stream.Send(&mq_pb.SubscribeResponse{ Message: &mq_pb.SubscribeResponse_Ctrl{ Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{ - Error: "not found", + Error: "not initialized", }, }, }) return nil } - localTopicPartition.Subscribe("client", time.Now(), func(logEntry *filer_pb.LogEntry) error { + clientName := fmt.Sprintf("%s/%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId) + + localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error { value := logEntry.GetData() if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{ Data: &mq_pb.DataMessage{ diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index b7ae2fe10..529d09a4d 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -1,50 +1,30 @@ package main import ( - "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" ) func main() { - err := pb.WithBrokerGrpcClient(true, - "localhost:17777", - grpc.WithTransportCredentials(insecure.NewCredentials()), - func(client mq_pb.SeaweedMessagingClient) error { - subClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ - Init: &mq_pb.SubscribeRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: "test", - Name: "test", - }, - }, - }) - if err != nil { - return err - } - for { - resp, err := subClient.Recv() - if err != nil { - return err - } - if resp.GetCtrl() != nil { - if resp.GetCtrl().Error != "" { - return fmt.Errorf("ctrl error: %v", resp.GetCtrl().Error) - } - } - if resp.GetData() != nil { - println(string(resp.GetData().Key), "=>", string(resp.GetData().Value)) - } + subscriber := sub_client.NewTopicSubscriber( + &sub_client.SubscriberConfiguration{ + ConsumerGroup: "test", + ConsumerId: "test", + }, + "test", "test") + if err := subscriber.Connect("localhost:17777"); err != nil { + fmt.Println(err) + return + } - } - return nil - }) - - if err != nil { + if err := subscriber.Subscribe(func(key, value []byte) bool { + println(string(key), "=>", string(value)) + return true + }, func() { + println("done subscribing") + }); err != nil { fmt.Println(err) } + } diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index c54b2687d..5a9376ab1 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -5,14 +5,12 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc" ) -func (p *TopicPublisher) doLookup( - brokerAddress string, grpcDialOption grpc.DialOption) error { +func (p *TopicPublisher) doLookup(brokerAddress string) error { err := pb.WithBrokerGrpcClient(true, brokerAddress, - grpcDialOption, + p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ @@ -36,7 +34,7 @@ func (p *TopicPublisher) doLookup( // send init message // save the publishing client brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", brokerAddress, err) } diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 171b5ebd7..5963838ce 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -15,6 +15,7 @@ type TopicPublisher struct { topic string partition2Broker *interval.SearchTree[string, int32] broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] + grpcDialOption grpc.DialOption } func NewTopicPublisher(namespace, topic string) *TopicPublisher { @@ -25,11 +26,12 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher { return int(a - b) }), broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), + grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), } } func (p *TopicPublisher) Connect(bootstrapBroker string) error { - if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { + if err := p.doLookup(bootstrapBroker); err != nil { return err } return nil diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go index 89d3d2c45..e836c4864 100644 --- a/weed/mq/client/sub_client/lookup.go +++ b/weed/mq/client/sub_client/lookup.go @@ -5,70 +5,30 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc" ) -func (p *TopicSubscriber) doLookup( - brokerAddress string, grpcDialOption grpc.DialOption) error { +func (sub *TopicSubscriber) doLookup(brokerAddress string) error { err := pb.WithBrokerGrpcClient(true, brokerAddress, - grpcDialOption, + sub.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, + Namespace: sub.namespace, + Name: sub.topic, }, - IsForPublish: true, + IsForPublish: false, }) if err != nil { return err } - for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { - // partition => broker - p.partition2Broker.Insert( - brokerPartitionAssignment.Partition.RangeStart, - brokerPartitionAssignment.Partition.RangeStop, - brokerPartitionAssignment.LeaderBroker) - - // broker => publish client - // send init message - // save the publishing client - brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) - if err != nil { - return fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - publishClient, err := brokerClient.Publish(context.Background()) - if err != nil { - return fmt.Errorf("create publish client: %v", err) - } - p.broker2PublishClient.Set(brokerAddress, publishClient) - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - Partition: &mq_pb.Partition{ - RingSize: brokerPartitionAssignment.Partition.RingSize, - RangeStart: brokerPartitionAssignment.Partition.RangeStart, - RangeStop: brokerPartitionAssignment.Partition.RangeStop, - }, - }, - }, - }); err != nil { - return fmt.Errorf("send init message: %v", err) - } - } + sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments return nil }) if err != nil { - return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) + return fmt.Errorf("lookup topic %s/%s: %v", sub.namespace, sub.topic, err) } return nil } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 158c93010..622b88828 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,28 +1,71 @@ package sub_client import ( - cmap "github.com/orcaman/concurrent-map" - "github.com/rdleal/intervalst/interval" + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" ) -type SubscriberConfiguration struct { -} +type EachMessageFunc func(key, value []byte) (shouldContinue bool) +type FinalFunc func() -type TopicSubscriber struct { - namespace string - topic string - partition2Broker *interval.SearchTree[string, int32] - broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] -} - -func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { - return &TopicSubscriber{ - namespace: namespace, - topic: topic, - partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { - return int(a - b) - }), - broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), +func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn FinalFunc) error { + var wg sync.WaitGroup + for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { + brokerAddress := brokerPartitionAssignment.LeaderBroker + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.grpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ + Consumer: &mq_pb.SubscribeRequest_Consumer{ + ConsumerGroup: sub.config.ConsumerGroup, + ConsumerId: sub.config.ConsumerId, + }, + Cursor: &mq_pb.SubscribeRequest_Cursor{ + Topic: &mq_pb.Topic{ + Namespace: sub.namespace, + Name: sub.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: brokerPartitionAssignment.Partition.RingSize, + RangeStart: brokerPartitionAssignment.Partition.RangeStart, + RangeStop: brokerPartitionAssignment.Partition.RangeStop, + }, + }, + }) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + wg.Add(1) + go func() { + defer wg.Done() + if finalFn != nil { + defer finalFn() + } + for { + resp, err := subscribeClient.Recv() + if err != nil { + fmt.Printf("subscribe error: %v\n", err) + return + } + if resp.Message == nil { + continue + } + switch m := resp.Message.(type) { + case *mq_pb.SubscribeResponse_Data: + if !eachMessageFn(m.Data.Key, m.Data.Value) { + return + } + case *mq_pb.SubscribeResponse_Ctrl: + // ignore + } + } + }() } + wg.Wait() + return nil } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go new file mode 100644 index 000000000..a193730b0 --- /dev/null +++ b/weed/mq/client/sub_client/subscriber.go @@ -0,0 +1,36 @@ +package sub_client + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type SubscriberConfiguration struct { + ConsumerGroup string + ConsumerId string +} + +type TopicSubscriber struct { + config *SubscriberConfiguration + namespace string + topic string + brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment + grpcDialOption grpc.DialOption +} + +func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { + return &TopicSubscriber{ + config: config, + namespace: namespace, + topic: topic, + grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } +} + +func (sub *TopicSubscriber) Connect(bootstrapBroker string) error { + if err := sub.doLookup(bootstrapBroker); err != nil { + return err + } + return nil +} diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 168e3d561..6e7db5d08 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -25,6 +25,7 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition Partitions: make([]*LocalPartition, 0), } } + manager.topics.SetIfAbsent(topic.String(), localTopic) if localTopic.findPartition(localPartition.Partition) != nil { return } diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 8b5422596..424dc4b4f 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -162,12 +162,20 @@ message PublishResponse { string redirect_to_broker = 3; } message SubscribeRequest { - message InitMessage { + message Consumer { + string consumer_group = 1; + string consumer_id = 2; + } + message Cursor { Topic topic = 1; Partition partition = 2; + oneof offset { + int64 start_offset = 3; + int64 start_timestamp_ns = 4; + } } - InitMessage init = 1; - int64 sequence = 2; + Consumer consumer = 1; + Cursor cursor = 2; } message SubscribeResponse { message CtrlMessage { diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index 3cec8deaa..1394d01be 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -1379,8 +1379,8 @@ type SubscribeRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Init *SubscribeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"` - Sequence int64 `protobuf:"varint,2,opt,name=sequence,proto3" json:"sequence,omitempty"` + Consumer *SubscribeRequest_Consumer `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"` + Cursor *SubscribeRequest_Cursor `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"` } func (x *SubscribeRequest) Reset() { @@ -1415,18 +1415,18 @@ func (*SubscribeRequest) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{24} } -func (x *SubscribeRequest) GetInit() *SubscribeRequest_InitMessage { +func (x *SubscribeRequest) GetConsumer() *SubscribeRequest_Consumer { if x != nil { - return x.Init + return x.Consumer } return nil } -func (x *SubscribeRequest) GetSequence() int64 { +func (x *SubscribeRequest) GetCursor() *SubscribeRequest_Cursor { if x != nil { - return x.Sequence + return x.Cursor } - return 0 + return nil } type SubscribeResponse struct { @@ -1565,17 +1565,17 @@ func (x *PublishRequest_InitMessage) GetPartition() *Partition { return nil } -type SubscribeRequest_InitMessage struct { +type SubscribeRequest_Consumer struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` + ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"` } -func (x *SubscribeRequest_InitMessage) Reset() { - *x = SubscribeRequest_InitMessage{} +func (x *SubscribeRequest_Consumer) Reset() { + *x = SubscribeRequest_Consumer{} if protoimpl.UnsafeEnabled { mi := &file_mq_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1583,13 +1583,13 @@ func (x *SubscribeRequest_InitMessage) Reset() { } } -func (x *SubscribeRequest_InitMessage) String() string { +func (x *SubscribeRequest_Consumer) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeRequest_InitMessage) ProtoMessage() {} +func (*SubscribeRequest_Consumer) ProtoMessage() {} -func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message { +func (x *SubscribeRequest_Consumer) ProtoReflect() protoreflect.Message { mi := &file_mq_proto_msgTypes[27] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1601,25 +1601,122 @@ func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeRequest_InitMessage.ProtoReflect.Descriptor instead. -func (*SubscribeRequest_InitMessage) Descriptor() ([]byte, []int) { +// Deprecated: Use SubscribeRequest_Consumer.ProtoReflect.Descriptor instead. +func (*SubscribeRequest_Consumer) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{24, 0} } -func (x *SubscribeRequest_InitMessage) GetTopic() *Topic { +func (x *SubscribeRequest_Consumer) GetConsumerGroup() string { + if x != nil { + return x.ConsumerGroup + } + return "" +} + +func (x *SubscribeRequest_Consumer) GetConsumerId() string { + if x != nil { + return x.ConsumerId + } + return "" +} + +type SubscribeRequest_Cursor struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + // Types that are assignable to Offset: + // + // *SubscribeRequest_Cursor_StartOffset + // *SubscribeRequest_Cursor_StartTimestampNs + Offset isSubscribeRequest_Cursor_Offset `protobuf_oneof:"offset"` +} + +func (x *SubscribeRequest_Cursor) Reset() { + *x = SubscribeRequest_Cursor{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[28] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest_Cursor) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest_Cursor) ProtoMessage() {} + +func (x *SubscribeRequest_Cursor) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[28] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest_Cursor.ProtoReflect.Descriptor instead. +func (*SubscribeRequest_Cursor) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{24, 1} +} + +func (x *SubscribeRequest_Cursor) GetTopic() *Topic { if x != nil { return x.Topic } return nil } -func (x *SubscribeRequest_InitMessage) GetPartition() *Partition { +func (x *SubscribeRequest_Cursor) GetPartition() *Partition { if x != nil { return x.Partition } return nil } +func (m *SubscribeRequest_Cursor) GetOffset() isSubscribeRequest_Cursor_Offset { + if m != nil { + return m.Offset + } + return nil +} + +func (x *SubscribeRequest_Cursor) GetStartOffset() int64 { + if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartOffset); ok { + return x.StartOffset + } + return 0 +} + +func (x *SubscribeRequest_Cursor) GetStartTimestampNs() int64 { + if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartTimestampNs); ok { + return x.StartTimestampNs + } + return 0 +} + +type isSubscribeRequest_Cursor_Offset interface { + isSubscribeRequest_Cursor_Offset() +} + +type SubscribeRequest_Cursor_StartOffset struct { + StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3,oneof"` +} + +type SubscribeRequest_Cursor_StartTimestampNs struct { + StartTimestampNs int64 `protobuf:"varint,4,opt,name=start_timestamp_ns,json=startTimestampNs,proto3,oneof"` +} + +func (*SubscribeRequest_Cursor_StartOffset) isSubscribeRequest_Cursor_Offset() {} + +func (*SubscribeRequest_Cursor_StartTimestampNs) isSubscribeRequest_Cursor_Offset() {} + type SubscribeResponse_CtrlMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1632,7 +1729,7 @@ type SubscribeResponse_CtrlMessage struct { func (x *SubscribeResponse_CtrlMessage) Reset() { *x = SubscribeResponse_CtrlMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[28] + mi := &file_mq_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1645,7 +1742,7 @@ func (x *SubscribeResponse_CtrlMessage) String() string { func (*SubscribeResponse_CtrlMessage) ProtoMessage() {} func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[28] + mi := &file_mq_proto_msgTypes[29] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1858,108 +1955,122 @@ var file_mq_proto_rawDesc = []byte{ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, - 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xdf, 0x01, 0x0a, 0x10, + 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xb6, 0x03, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, - 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, - 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x6f, 0x0a, 0x0b, - 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, - 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xe5, 0x01, - 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, - 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, - 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, - 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, - 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, - 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, - 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, - 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, - 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, - 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, - 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, - 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, - 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, - 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, - 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, - 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, - 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, - 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, - 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, - 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x12, 0x43, 0x0a, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, - 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, - 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, - 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, - 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x52, 0x08, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6d, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x06, 0x63, 0x75, + 0x72, 0x73, 0x6f, 0x72, 0x1a, 0x52, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, + 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, + 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75, + 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, + 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x1a, 0xc9, 0x01, 0x0a, 0x06, 0x43, 0x75, 0x72, + 0x73, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, + 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x0b, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x2e, 0x0a, 0x12, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6e, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x10, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x6f, 0x66, + 0x66, 0x73, 0x65, 0x74, 0x22, 0xe5, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74, + 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51, + 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, + 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a, + 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, + 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, + 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, + 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, + 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, + 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, + 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, + 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, + 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e, + 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, + 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, + 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1974,7 +2085,7 @@ func file_mq_proto_rawDescGZIP() []byte { return file_mq_proto_rawDescData } -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 29) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_mq_proto_goTypes = []interface{}{ (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest @@ -2003,8 +2114,9 @@ var file_mq_proto_goTypes = []interface{}{ (*SubscribeRequest)(nil), // 24: messaging_pb.SubscribeRequest (*SubscribeResponse)(nil), // 25: messaging_pb.SubscribeResponse (*PublishRequest_InitMessage)(nil), // 26: messaging_pb.PublishRequest.InitMessage - (*SubscribeRequest_InitMessage)(nil), // 27: messaging_pb.SubscribeRequest.InitMessage - (*SubscribeResponse_CtrlMessage)(nil), // 28: messaging_pb.SubscribeResponse.CtrlMessage + (*SubscribeRequest_Consumer)(nil), // 27: messaging_pb.SubscribeRequest.Consumer + (*SubscribeRequest_Cursor)(nil), // 28: messaging_pb.SubscribeRequest.Cursor + (*SubscribeResponse_CtrlMessage)(nil), // 29: messaging_pb.SubscribeResponse.CtrlMessage } var file_mq_proto_depIdxs = []int32{ 5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment @@ -2023,38 +2135,39 @@ var file_mq_proto_depIdxs = []int32{ 14, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 26, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage 21, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage - 27, // 16: messaging_pb.SubscribeRequest.init:type_name -> messaging_pb.SubscribeRequest.InitMessage - 28, // 17: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage - 21, // 18: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage - 3, // 19: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 20: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition - 3, // 21: messaging_pb.SubscribeRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 22: messaging_pb.SubscribeRequest.InitMessage.partition:type_name -> messaging_pb.Partition - 1, // 23: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 6, // 24: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest - 8, // 25: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest - 10, // 26: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest - 12, // 27: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 15, // 28: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest - 17, // 29: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 19, // 30: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest - 22, // 31: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest - 24, // 32: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest - 2, // 33: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 7, // 34: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse - 9, // 35: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse - 11, // 36: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse - 13, // 37: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 16, // 38: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse - 18, // 39: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 20, // 40: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse - 23, // 41: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse - 25, // 42: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse - 33, // [33:43] is the sub-list for method output_type - 23, // [23:33] is the sub-list for method input_type - 23, // [23:23] is the sub-list for extension type_name - 23, // [23:23] is the sub-list for extension extendee - 0, // [0:23] is the sub-list for field type_name + 27, // 16: messaging_pb.SubscribeRequest.consumer:type_name -> messaging_pb.SubscribeRequest.Consumer + 28, // 17: messaging_pb.SubscribeRequest.cursor:type_name -> messaging_pb.SubscribeRequest.Cursor + 29, // 18: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage + 21, // 19: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage + 3, // 20: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 21: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 22: messaging_pb.SubscribeRequest.Cursor.topic:type_name -> messaging_pb.Topic + 4, // 23: messaging_pb.SubscribeRequest.Cursor.partition:type_name -> messaging_pb.Partition + 1, // 24: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 6, // 25: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest + 8, // 26: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest + 10, // 27: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest + 12, // 28: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 15, // 29: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest + 17, // 30: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 19, // 31: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest + 22, // 32: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest + 24, // 33: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest + 2, // 34: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 7, // 35: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse + 9, // 36: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse + 11, // 37: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse + 13, // 38: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 16, // 39: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse + 18, // 40: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 20, // 41: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse + 23, // 42: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse + 25, // 43: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse + 34, // [34:44] is the sub-list for method output_type + 24, // [24:34] is the sub-list for method input_type + 24, // [24:24] is the sub-list for extension type_name + 24, // [24:24] is the sub-list for extension extendee + 0, // [0:24] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -2388,7 +2501,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeRequest_InitMessage); i { + switch v := v.(*SubscribeRequest_Consumer); i { case 0: return &v.state case 1: @@ -2400,6 +2513,18 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest_Cursor); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeResponse_CtrlMessage); i { case 0: return &v.state @@ -2420,13 +2545,17 @@ func file_mq_proto_init() { (*SubscribeResponse_Ctrl)(nil), (*SubscribeResponse_Data)(nil), } + file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{ + (*SubscribeRequest_Cursor_StartOffset)(nil), + (*SubscribeRequest_Cursor_StartTimestampNs)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 0, - NumMessages: 29, + NumMessages: 30, NumExtensions: 0, NumServices: 1, },