can pub and sub

This commit is contained in:
chrislu 2023-09-01 00:36:51 -07:00
parent 1eb2da46d5
commit cb470d44df
11 changed files with 435 additions and 275 deletions

View file

@ -85,6 +85,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p) localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil { if localTopicPartition == nil {
localTopicPartition = topic.NewLocalPartition(t, p, true, nil) localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
} }
} else { } else {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)

View file

@ -11,20 +11,22 @@ import (
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Init.Topic), localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic),
topic.FromPbPartition(req.Init.Partition)) topic.FromPbPartition(req.Cursor.Partition))
if localTopicPartition == nil { if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{ stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{ Message: &mq_pb.SubscribeResponse_Ctrl{
Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{ Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
Error: "not found", Error: "not initialized",
}, },
}, },
}) })
return nil return nil
} }
localTopicPartition.Subscribe("client", time.Now(), func(logEntry *filer_pb.LogEntry) error { clientName := fmt.Sprintf("%s/%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId)
localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData() value := logEntry.GetData()
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{ if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
Data: &mq_pb.DataMessage{ Data: &mq_pb.DataMessage{

View file

@ -1,50 +1,30 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )
func main() { func main() {
err := pb.WithBrokerGrpcClient(true,
"localhost:17777", subscriber := sub_client.NewTopicSubscriber(
grpc.WithTransportCredentials(insecure.NewCredentials()), &sub_client.SubscriberConfiguration{
func(client mq_pb.SeaweedMessagingClient) error { ConsumerGroup: "test",
subClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ ConsumerId: "test",
Init: &mq_pb.SubscribeRequest_InitMessage{
Topic: &mq_pb.Topic{
Namespace: "test",
Name: "test",
}, },
}, "test", "test")
}) if err := subscriber.Connect("localhost:17777"); err != nil {
if err != nil { fmt.Println(err)
return err return
} }
for { if err := subscriber.Subscribe(func(key, value []byte) bool {
resp, err := subClient.Recv() println(string(key), "=>", string(value))
if err != nil { return true
return err }, func() {
} println("done subscribing")
if resp.GetCtrl() != nil { }); err != nil {
if resp.GetCtrl().Error != "" {
return fmt.Errorf("ctrl error: %v", resp.GetCtrl().Error)
}
}
if resp.GetData() != nil {
println(string(resp.GetData().Key), "=>", string(resp.GetData().Value))
}
}
return nil
})
if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }

View file

@ -5,14 +5,12 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
) )
func (p *TopicPublisher) doLookup( func (p *TopicPublisher) doLookup(brokerAddress string) error {
brokerAddress string, grpcDialOption grpc.DialOption) error {
err := pb.WithBrokerGrpcClient(true, err := pb.WithBrokerGrpcClient(true,
brokerAddress, brokerAddress,
grpcDialOption, p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error { func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(), lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{ &mq_pb.LookupTopicBrokersRequest{
@ -36,7 +34,7 @@ func (p *TopicPublisher) doLookup(
// send init message // send init message
// save the publishing client // save the publishing client
brokerAddress := brokerPartitionAssignment.LeaderBroker brokerAddress := brokerPartitionAssignment.LeaderBroker
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
if err != nil { if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err) return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
} }

View file

@ -15,6 +15,7 @@ type TopicPublisher struct {
topic string topic string
partition2Broker *interval.SearchTree[string, int32] partition2Broker *interval.SearchTree[string, int32]
broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
grpcDialOption grpc.DialOption
} }
func NewTopicPublisher(namespace, topic string) *TopicPublisher { func NewTopicPublisher(namespace, topic string) *TopicPublisher {
@ -25,11 +26,12 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return int(a - b) return int(a - b)
}), }),
broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
} }
} }
func (p *TopicPublisher) Connect(bootstrapBroker string) error { func (p *TopicPublisher) Connect(bootstrapBroker string) error {
if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { if err := p.doLookup(bootstrapBroker); err != nil {
return err return err
} }
return nil return nil

View file

@ -5,70 +5,30 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
) )
func (p *TopicSubscriber) doLookup( func (sub *TopicSubscriber) doLookup(brokerAddress string) error {
brokerAddress string, grpcDialOption grpc.DialOption) error {
err := pb.WithBrokerGrpcClient(true, err := pb.WithBrokerGrpcClient(true,
brokerAddress, brokerAddress,
grpcDialOption, sub.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error { func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(), lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{ &mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{ Topic: &mq_pb.Topic{
Namespace: p.namespace, Namespace: sub.namespace,
Name: p.topic, Name: sub.topic,
}, },
IsForPublish: true, IsForPublish: false,
}) })
if err != nil { if err != nil {
return err return err
} }
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments
// partition => broker
p.partition2Broker.Insert(
brokerPartitionAssignment.Partition.RangeStart,
brokerPartitionAssignment.Partition.RangeStop,
brokerPartitionAssignment.LeaderBroker)
// broker => publish client
// send init message
// save the publishing client
brokerAddress := brokerPartitionAssignment.LeaderBroker
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption)
if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
publishClient, err := brokerClient.Publish(context.Background())
if err != nil {
return fmt.Errorf("create publish client: %v", err)
}
p.broker2PublishClient.Set(brokerAddress, publishClient)
if err = publishClient.Send(&mq_pb.PublishRequest{
Message: &mq_pb.PublishRequest_Init{
Init: &mq_pb.PublishRequest_InitMessage{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
}
return nil return nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) return fmt.Errorf("lookup topic %s/%s: %v", sub.namespace, sub.topic, err)
} }
return nil return nil
} }

View file

@ -1,28 +1,71 @@
package sub_client package sub_client
import ( import (
cmap "github.com/orcaman/concurrent-map" "context"
"github.com/rdleal/intervalst/interval" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
) )
type SubscriberConfiguration struct { type EachMessageFunc func(key, value []byte) (shouldContinue bool)
} type FinalFunc func()
type TopicSubscriber struct { func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn FinalFunc) error {
namespace string var wg sync.WaitGroup
topic string for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
partition2Broker *interval.SearchTree[string, int32] brokerAddress := brokerPartitionAssignment.LeaderBroker
broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.grpcDialOption)
} if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
return &TopicSubscriber{
namespace: namespace,
topic: topic,
partition2Broker: interval.NewSearchTree[string](func(a, b int32) int {
return int(a - b)
}),
broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
} }
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Consumer: &mq_pb.SubscribeRequest_Consumer{
ConsumerGroup: sub.config.ConsumerGroup,
ConsumerId: sub.config.ConsumerId,
},
Cursor: &mq_pb.SubscribeRequest_Cursor{
Topic: &mq_pb.Topic{
Namespace: sub.namespace,
Name: sub.topic,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
},
})
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
if finalFn != nil {
defer finalFn()
}
for {
resp, err := subscribeClient.Recv()
if err != nil {
fmt.Printf("subscribe error: %v\n", err)
return
}
if resp.Message == nil {
continue
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeResponse_Data:
if !eachMessageFn(m.Data.Key, m.Data.Value) {
return
}
case *mq_pb.SubscribeResponse_Ctrl:
// ignore
}
}
}()
}
wg.Wait()
return nil
} }

View file

@ -0,0 +1,36 @@
package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type SubscriberConfiguration struct {
ConsumerGroup string
ConsumerId string
}
type TopicSubscriber struct {
config *SubscriberConfiguration
namespace string
topic string
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
grpcDialOption grpc.DialOption
}
func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
return &TopicSubscriber{
config: config,
namespace: namespace,
topic: topic,
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}
func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
if err := sub.doLookup(bootstrapBroker); err != nil {
return err
}
return nil
}

View file

@ -25,6 +25,7 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
Partitions: make([]*LocalPartition, 0), Partitions: make([]*LocalPartition, 0),
} }
} }
manager.topics.SetIfAbsent(topic.String(), localTopic)
if localTopic.findPartition(localPartition.Partition) != nil { if localTopic.findPartition(localPartition.Partition) != nil {
return return
} }

View file

@ -162,12 +162,20 @@ message PublishResponse {
string redirect_to_broker = 3; string redirect_to_broker = 3;
} }
message SubscribeRequest { message SubscribeRequest {
message InitMessage { message Consumer {
string consumer_group = 1;
string consumer_id = 2;
}
message Cursor {
Topic topic = 1; Topic topic = 1;
Partition partition = 2; Partition partition = 2;
oneof offset {
int64 start_offset = 3;
int64 start_timestamp_ns = 4;
} }
InitMessage init = 1; }
int64 sequence = 2; Consumer consumer = 1;
Cursor cursor = 2;
} }
message SubscribeResponse { message SubscribeResponse {
message CtrlMessage { message CtrlMessage {

View file

@ -1379,8 +1379,8 @@ type SubscribeRequest struct {
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
Init *SubscribeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"` Consumer *SubscribeRequest_Consumer `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"`
Sequence int64 `protobuf:"varint,2,opt,name=sequence,proto3" json:"sequence,omitempty"` Cursor *SubscribeRequest_Cursor `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"`
} }
func (x *SubscribeRequest) Reset() { func (x *SubscribeRequest) Reset() {
@ -1415,18 +1415,18 @@ func (*SubscribeRequest) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{24} return file_mq_proto_rawDescGZIP(), []int{24}
} }
func (x *SubscribeRequest) GetInit() *SubscribeRequest_InitMessage { func (x *SubscribeRequest) GetConsumer() *SubscribeRequest_Consumer {
if x != nil { if x != nil {
return x.Init return x.Consumer
} }
return nil return nil
} }
func (x *SubscribeRequest) GetSequence() int64 { func (x *SubscribeRequest) GetCursor() *SubscribeRequest_Cursor {
if x != nil { if x != nil {
return x.Sequence return x.Cursor
} }
return 0 return nil
} }
type SubscribeResponse struct { type SubscribeResponse struct {
@ -1565,17 +1565,17 @@ func (x *PublishRequest_InitMessage) GetPartition() *Partition {
return nil return nil
} }
type SubscribeRequest_InitMessage struct { type SubscribeRequest_Consumer struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"`
} }
func (x *SubscribeRequest_InitMessage) Reset() { func (x *SubscribeRequest_Consumer) Reset() {
*x = SubscribeRequest_InitMessage{} *x = SubscribeRequest_Consumer{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_mq_proto_msgTypes[27] mi := &file_mq_proto_msgTypes[27]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -1583,13 +1583,13 @@ func (x *SubscribeRequest_InitMessage) Reset() {
} }
} }
func (x *SubscribeRequest_InitMessage) String() string { func (x *SubscribeRequest_Consumer) String() string {
return protoimpl.X.MessageStringOf(x) return protoimpl.X.MessageStringOf(x)
} }
func (*SubscribeRequest_InitMessage) ProtoMessage() {} func (*SubscribeRequest_Consumer) ProtoMessage() {}
func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message { func (x *SubscribeRequest_Consumer) ProtoReflect() protoreflect.Message {
mi := &file_mq_proto_msgTypes[27] mi := &file_mq_proto_msgTypes[27]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -1601,25 +1601,122 @@ func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x) return mi.MessageOf(x)
} }
// Deprecated: Use SubscribeRequest_InitMessage.ProtoReflect.Descriptor instead. // Deprecated: Use SubscribeRequest_Consumer.ProtoReflect.Descriptor instead.
func (*SubscribeRequest_InitMessage) Descriptor() ([]byte, []int) { func (*SubscribeRequest_Consumer) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{24, 0} return file_mq_proto_rawDescGZIP(), []int{24, 0}
} }
func (x *SubscribeRequest_InitMessage) GetTopic() *Topic { func (x *SubscribeRequest_Consumer) GetConsumerGroup() string {
if x != nil {
return x.ConsumerGroup
}
return ""
}
func (x *SubscribeRequest_Consumer) GetConsumerId() string {
if x != nil {
return x.ConsumerId
}
return ""
}
type SubscribeRequest_Cursor struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
// Types that are assignable to Offset:
//
// *SubscribeRequest_Cursor_StartOffset
// *SubscribeRequest_Cursor_StartTimestampNs
Offset isSubscribeRequest_Cursor_Offset `protobuf_oneof:"offset"`
}
func (x *SubscribeRequest_Cursor) Reset() {
*x = SubscribeRequest_Cursor{}
if protoimpl.UnsafeEnabled {
mi := &file_mq_proto_msgTypes[28]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SubscribeRequest_Cursor) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeRequest_Cursor) ProtoMessage() {}
func (x *SubscribeRequest_Cursor) ProtoReflect() protoreflect.Message {
mi := &file_mq_proto_msgTypes[28]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeRequest_Cursor.ProtoReflect.Descriptor instead.
func (*SubscribeRequest_Cursor) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{24, 1}
}
func (x *SubscribeRequest_Cursor) GetTopic() *Topic {
if x != nil { if x != nil {
return x.Topic return x.Topic
} }
return nil return nil
} }
func (x *SubscribeRequest_InitMessage) GetPartition() *Partition { func (x *SubscribeRequest_Cursor) GetPartition() *Partition {
if x != nil { if x != nil {
return x.Partition return x.Partition
} }
return nil return nil
} }
func (m *SubscribeRequest_Cursor) GetOffset() isSubscribeRequest_Cursor_Offset {
if m != nil {
return m.Offset
}
return nil
}
func (x *SubscribeRequest_Cursor) GetStartOffset() int64 {
if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartOffset); ok {
return x.StartOffset
}
return 0
}
func (x *SubscribeRequest_Cursor) GetStartTimestampNs() int64 {
if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartTimestampNs); ok {
return x.StartTimestampNs
}
return 0
}
type isSubscribeRequest_Cursor_Offset interface {
isSubscribeRequest_Cursor_Offset()
}
type SubscribeRequest_Cursor_StartOffset struct {
StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3,oneof"`
}
type SubscribeRequest_Cursor_StartTimestampNs struct {
StartTimestampNs int64 `protobuf:"varint,4,opt,name=start_timestamp_ns,json=startTimestampNs,proto3,oneof"`
}
func (*SubscribeRequest_Cursor_StartOffset) isSubscribeRequest_Cursor_Offset() {}
func (*SubscribeRequest_Cursor_StartTimestampNs) isSubscribeRequest_Cursor_Offset() {}
type SubscribeResponse_CtrlMessage struct { type SubscribeResponse_CtrlMessage struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -1632,7 +1729,7 @@ type SubscribeResponse_CtrlMessage struct {
func (x *SubscribeResponse_CtrlMessage) Reset() { func (x *SubscribeResponse_CtrlMessage) Reset() {
*x = SubscribeResponse_CtrlMessage{} *x = SubscribeResponse_CtrlMessage{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_mq_proto_msgTypes[28] mi := &file_mq_proto_msgTypes[29]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -1645,7 +1742,7 @@ func (x *SubscribeResponse_CtrlMessage) String() string {
func (*SubscribeResponse_CtrlMessage) ProtoMessage() {} func (*SubscribeResponse_CtrlMessage) ProtoMessage() {}
func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message { func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_proto_msgTypes[28] mi := &file_mq_proto_msgTypes[29]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -1858,108 +1955,122 @@ var file_mq_proto_rawDesc = []byte{
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a,
0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f,
0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72,
0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xdf, 0x01, 0x0a, 0x10, 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xb6, 0x03, 0x0a, 0x10,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x12, 0x43, 0x0a, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49,
0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74,
0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01,
0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x6f, 0x0a, 0x0b,
0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74,
0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52,
0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xe5, 0x01,
0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00,
0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48,
0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12,
0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b,
0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65,
0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65,
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69,
0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69,
0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67,
0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42,
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65,
0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43,
0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75,
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43,
0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68,
0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a,
0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b,
0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72,
0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b,
0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50,
0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70,
0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74,
0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72,
0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69,
0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x52, 0x08, 0x63, 0x6f, 0x6e,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65,
0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x06, 0x63, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x72, 0x73, 0x6f, 0x72, 0x1a, 0x52, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72,
0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f,
0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75,
0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f,
0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x1a, 0xc9, 0x01, 0x0a, 0x06, 0x43, 0x75, 0x72,
0x73, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35,
0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74,
0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6f,
0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x0b, 0x73,
0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x2e, 0x0a, 0x12, 0x73, 0x74,
0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6e, 0x73,
0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x10, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x6f, 0x66,
0x66, 0x73, 0x65, 0x74, 0x22, 0xe5, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74,
0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51,
0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f,
0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65,
0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a,
0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64,
0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73,
0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65,
0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53,
0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65,
0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65,
0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65,
0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70,
0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72,
0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50,
0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70,
0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41,
0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67,
0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43,
0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54,
0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c,
0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e,
0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f,
0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61,
0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73,
0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -1974,7 +2085,7 @@ func file_mq_proto_rawDescGZIP() []byte {
return file_mq_proto_rawDescData return file_mq_proto_rawDescData
} }
var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 29) var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 30)
var file_mq_proto_goTypes = []interface{}{ var file_mq_proto_goTypes = []interface{}{
(*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo
(*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
@ -2003,8 +2114,9 @@ var file_mq_proto_goTypes = []interface{}{
(*SubscribeRequest)(nil), // 24: messaging_pb.SubscribeRequest (*SubscribeRequest)(nil), // 24: messaging_pb.SubscribeRequest
(*SubscribeResponse)(nil), // 25: messaging_pb.SubscribeResponse (*SubscribeResponse)(nil), // 25: messaging_pb.SubscribeResponse
(*PublishRequest_InitMessage)(nil), // 26: messaging_pb.PublishRequest.InitMessage (*PublishRequest_InitMessage)(nil), // 26: messaging_pb.PublishRequest.InitMessage
(*SubscribeRequest_InitMessage)(nil), // 27: messaging_pb.SubscribeRequest.InitMessage (*SubscribeRequest_Consumer)(nil), // 27: messaging_pb.SubscribeRequest.Consumer
(*SubscribeResponse_CtrlMessage)(nil), // 28: messaging_pb.SubscribeResponse.CtrlMessage (*SubscribeRequest_Cursor)(nil), // 28: messaging_pb.SubscribeRequest.Cursor
(*SubscribeResponse_CtrlMessage)(nil), // 29: messaging_pb.SubscribeResponse.CtrlMessage
} }
var file_mq_proto_depIdxs = []int32{ var file_mq_proto_depIdxs = []int32{
5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment 5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment
@ -2023,38 +2135,39 @@ var file_mq_proto_depIdxs = []int32{
14, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 14, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
26, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage 26, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
21, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage 21, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage
27, // 16: messaging_pb.SubscribeRequest.init:type_name -> messaging_pb.SubscribeRequest.InitMessage 27, // 16: messaging_pb.SubscribeRequest.consumer:type_name -> messaging_pb.SubscribeRequest.Consumer
28, // 17: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage 28, // 17: messaging_pb.SubscribeRequest.cursor:type_name -> messaging_pb.SubscribeRequest.Cursor
21, // 18: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage 29, // 18: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage
3, // 19: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic 21, // 19: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage
4, // 20: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition 3, // 20: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic
3, // 21: messaging_pb.SubscribeRequest.InitMessage.topic:type_name -> messaging_pb.Topic 4, // 21: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition
4, // 22: messaging_pb.SubscribeRequest.InitMessage.partition:type_name -> messaging_pb.Partition 3, // 22: messaging_pb.SubscribeRequest.Cursor.topic:type_name -> messaging_pb.Topic
1, // 23: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest 4, // 23: messaging_pb.SubscribeRequest.Cursor.partition:type_name -> messaging_pb.Partition
6, // 24: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest 1, // 24: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
8, // 25: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest 6, // 25: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
10, // 26: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest 8, // 26: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
12, // 27: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest 10, // 27: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
15, // 28: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest 12, // 28: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
17, // 29: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest 15, // 29: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest
19, // 30: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest 17, // 30: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
22, // 31: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest 19, // 31: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest
24, // 32: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest 22, // 32: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
2, // 33: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse 24, // 33: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest
7, // 34: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse 2, // 34: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
9, // 35: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse 7, // 35: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
11, // 36: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse 9, // 36: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
13, // 37: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse 11, // 37: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
16, // 38: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse 13, // 38: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
18, // 39: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse 16, // 39: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse
20, // 40: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse 18, // 40: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
23, // 41: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse 20, // 41: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse
25, // 42: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse 23, // 42: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
33, // [33:43] is the sub-list for method output_type 25, // 43: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse
23, // [23:33] is the sub-list for method input_type 34, // [34:44] is the sub-list for method output_type
23, // [23:23] is the sub-list for extension type_name 24, // [24:34] is the sub-list for method input_type
23, // [23:23] is the sub-list for extension extendee 24, // [24:24] is the sub-list for extension type_name
0, // [0:23] is the sub-list for field type_name 24, // [24:24] is the sub-list for extension extendee
0, // [0:24] is the sub-list for field type_name
} }
func init() { file_mq_proto_init() } func init() { file_mq_proto_init() }
@ -2388,7 +2501,7 @@ func file_mq_proto_init() {
} }
} }
file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest_InitMessage); i { switch v := v.(*SubscribeRequest_Consumer); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -2400,6 +2513,18 @@ func file_mq_proto_init() {
} }
} }
file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest_Cursor); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mq_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse_CtrlMessage); i { switch v := v.(*SubscribeResponse_CtrlMessage); i {
case 0: case 0:
return &v.state return &v.state
@ -2420,13 +2545,17 @@ func file_mq_proto_init() {
(*SubscribeResponse_Ctrl)(nil), (*SubscribeResponse_Ctrl)(nil),
(*SubscribeResponse_Data)(nil), (*SubscribeResponse_Data)(nil),
} }
file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{
(*SubscribeRequest_Cursor_StartOffset)(nil),
(*SubscribeRequest_Cursor_StartTimestampNs)(nil),
}
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mq_proto_rawDesc, RawDescriptor: file_mq_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 29, NumMessages: 30,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },