From 905911853dd5103496e8fc9b47934fa3a48da214 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 26 Aug 2023 13:39:21 -0700 Subject: [PATCH] adjust proto --- weed/mq/broker/brokder_grpc_pub.go | 8 +- weed/mq/topic/local_partition.go | 2 +- weed/pb/mq.proto | 34 +- weed/pb/mq_pb/mq.pb.go | 715 ++++++++++++++++++++++------- weed/pb/mq_pb/mq_grpc.pb.go | 68 +++ 5 files changed, 658 insertions(+), 169 deletions(-) diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go index 58ab6e5d2..8b2e685ec 100644 --- a/weed/mq/broker/brokder_grpc_pub.go +++ b/weed/mq/broker/brokder_grpc_pub.go @@ -93,12 +93,12 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS } } else if initMessage := req.GetInit(); initMessage != nil { localTopicPartition = broker.localTopicManager.GetTopicPartition( - topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic), - topic.FromPbPartition(initMessage.Segment.Partition), + topic.FromPbTopic(initMessage.Topic), + topic.FromPbPartition(initMessage.Partition), ) if localTopicPartition == nil { - response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment) - glog.Errorf("topic partition %v not found", initMessage.Segment) + response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) } } if err := stream.Send(response); err != nil { diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index e26b7afd1..3fc41f65a 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -14,7 +14,7 @@ type LocalPartition struct { logBuffer *log_buffer.LogBuffer } -func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) { +func (p LocalPartition) Publish(message *mq_pb.DataMessage) { p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) } diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 47440a46e..5ba859774 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -34,6 +34,8 @@ service SeaweedMessaging { // data plane rpc Publish (stream PublishRequest) returns (stream PublishResponse) { } + rpc Subscribe (stream SubscribeRequest) returns (stream SubscribeResponse) { + } } ////////////////////////////////////////////////// @@ -144,13 +146,14 @@ message CheckTopicPartitionsStatusResponse { } ////////////////////////////////////////////////// +message DataMessage { + bytes key = 1; + bytes value = 2; +} message PublishRequest { message InitMessage { - Segment segment = 1; - } - message DataMessage { - bytes key = 1; - bytes value = 2; + Topic topic = 1; + Partition partition = 2; } oneof message { InitMessage init = 1; @@ -163,3 +166,24 @@ message PublishResponse { string error = 2; bool is_closed = 3; } +message SubscribeRequest { + message InitMessage { + Topic topic = 1; + Partition partition = 2; + } + oneof message { + InitMessage init = 1; + DataMessage data = 2; + } + int64 sequence = 3; +} +message SubscribeResponse { + message CtrlMessage { + string error = 1; + string redirect_to_broker = 2; + } + oneof message { + CtrlMessage ctrl = 1; + DataMessage data = 2; + } +} diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index 818acc111..6d37a6c1a 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -1230,6 +1230,61 @@ func (x *CheckTopicPartitionsStatusResponse) GetTopicPartitionsAssignment() *Top } // //////////////////////////////////////////////// +type DataMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *DataMessage) Reset() { + *x = DataMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DataMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DataMessage) ProtoMessage() {} + +func (x *DataMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[22] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DataMessage.ProtoReflect.Descriptor instead. +func (*DataMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{22} +} + +func (x *DataMessage) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +func (x *DataMessage) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + type PublishRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1246,7 +1301,7 @@ type PublishRequest struct { func (x *PublishRequest) Reset() { *x = PublishRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[22] + mi := &file_mq_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1259,7 +1314,7 @@ func (x *PublishRequest) String() string { func (*PublishRequest) ProtoMessage() {} func (x *PublishRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[22] + mi := &file_mq_proto_msgTypes[23] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1272,7 +1327,7 @@ func (x *PublishRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. func (*PublishRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{22} + return file_mq_proto_rawDescGZIP(), []int{23} } func (m *PublishRequest) GetMessage() isPublishRequest_Message { @@ -1289,7 +1344,7 @@ func (x *PublishRequest) GetInit() *PublishRequest_InitMessage { return nil } -func (x *PublishRequest) GetData() *PublishRequest_DataMessage { +func (x *PublishRequest) GetData() *DataMessage { if x, ok := x.GetMessage().(*PublishRequest_Data); ok { return x.Data } @@ -1312,7 +1367,7 @@ type PublishRequest_Init struct { } type PublishRequest_Data struct { - Data *PublishRequest_DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` } func (*PublishRequest_Init) isPublishRequest_Message() {} @@ -1332,7 +1387,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[23] + mi := &file_mq_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1345,7 +1400,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[23] + mi := &file_mq_proto_msgTypes[24] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1358,7 +1413,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{23} + return file_mq_proto_rawDescGZIP(), []int{24} } func (x *PublishResponse) GetAckSequence() int64 { @@ -1382,18 +1437,189 @@ func (x *PublishResponse) GetIsClosed() bool { return false } +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // + // *SubscribeRequest_Init + // *SubscribeRequest_Data + Message isSubscribeRequest_Message `protobuf_oneof:"message"` + Sequence int64 `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[25] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{25} +} + +func (m *SubscribeRequest) GetMessage() isSubscribeRequest_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *SubscribeRequest) GetInit() *SubscribeRequest_InitMessage { + if x, ok := x.GetMessage().(*SubscribeRequest_Init); ok { + return x.Init + } + return nil +} + +func (x *SubscribeRequest) GetData() *DataMessage { + if x, ok := x.GetMessage().(*SubscribeRequest_Data); ok { + return x.Data + } + return nil +} + +func (x *SubscribeRequest) GetSequence() int64 { + if x != nil { + return x.Sequence + } + return 0 +} + +type isSubscribeRequest_Message interface { + isSubscribeRequest_Message() +} + +type SubscribeRequest_Init struct { + Init *SubscribeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"` +} + +type SubscribeRequest_Data struct { + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` +} + +func (*SubscribeRequest_Init) isSubscribeRequest_Message() {} + +func (*SubscribeRequest_Data) isSubscribeRequest_Message() {} + +type SubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // + // *SubscribeResponse_Ctrl + // *SubscribeResponse_Data + Message isSubscribeResponse_Message `protobuf_oneof:"message"` +} + +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse) ProtoMessage() {} + +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[26] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeResponse) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{26} +} + +func (m *SubscribeResponse) GetMessage() isSubscribeResponse_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *SubscribeResponse) GetCtrl() *SubscribeResponse_CtrlMessage { + if x, ok := x.GetMessage().(*SubscribeResponse_Ctrl); ok { + return x.Ctrl + } + return nil +} + +func (x *SubscribeResponse) GetData() *DataMessage { + if x, ok := x.GetMessage().(*SubscribeResponse_Data); ok { + return x.Data + } + return nil +} + +type isSubscribeResponse_Message interface { + isSubscribeResponse_Message() +} + +type SubscribeResponse_Ctrl struct { + Ctrl *SubscribeResponse_CtrlMessage `protobuf:"bytes,1,opt,name=ctrl,proto3,oneof"` +} + +type SubscribeResponse_Data struct { + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` +} + +func (*SubscribeResponse_Ctrl) isSubscribeResponse_Message() {} + +func (*SubscribeResponse_Data) isSubscribeResponse_Message() {} + type PublishRequest_InitMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Segment *Segment `protobuf:"bytes,1,opt,name=segment,proto3" json:"segment,omitempty"` + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` } func (x *PublishRequest_InitMessage) Reset() { *x = PublishRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[24] + mi := &file_mq_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1406,7 +1632,7 @@ func (x *PublishRequest_InitMessage) String() string { func (*PublishRequest_InitMessage) ProtoMessage() {} func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[24] + mi := &file_mq_proto_msgTypes[27] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1419,42 +1645,49 @@ func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishRequest_InitMessage.ProtoReflect.Descriptor instead. func (*PublishRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{22, 0} + return file_mq_proto_rawDescGZIP(), []int{23, 0} } -func (x *PublishRequest_InitMessage) GetSegment() *Segment { +func (x *PublishRequest_InitMessage) GetTopic() *Topic { if x != nil { - return x.Segment + return x.Topic } return nil } -type PublishRequest_DataMessage struct { +func (x *PublishRequest_InitMessage) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +type SubscribeRequest_InitMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` } -func (x *PublishRequest_DataMessage) Reset() { - *x = PublishRequest_DataMessage{} +func (x *SubscribeRequest_InitMessage) Reset() { + *x = SubscribeRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[25] + mi := &file_mq_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *PublishRequest_DataMessage) String() string { +func (x *SubscribeRequest_InitMessage) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PublishRequest_DataMessage) ProtoMessage() {} +func (*SubscribeRequest_InitMessage) ProtoMessage() {} -func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[25] +func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[28] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1465,25 +1698,80 @@ func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PublishRequest_DataMessage.ProtoReflect.Descriptor instead. -func (*PublishRequest_DataMessage) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{22, 1} +// Deprecated: Use SubscribeRequest_InitMessage.ProtoReflect.Descriptor instead. +func (*SubscribeRequest_InitMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{25, 0} } -func (x *PublishRequest_DataMessage) GetKey() []byte { +func (x *SubscribeRequest_InitMessage) GetTopic() *Topic { if x != nil { - return x.Key + return x.Topic } return nil } -func (x *PublishRequest_DataMessage) GetValue() []byte { +func (x *SubscribeRequest_InitMessage) GetPartition() *Partition { if x != nil { - return x.Value + return x.Partition } return nil } +type SubscribeResponse_CtrlMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + RedirectToBroker string `protobuf:"bytes,2,opt,name=redirect_to_broker,json=redirectToBroker,proto3" json:"redirect_to_broker,omitempty"` +} + +func (x *SubscribeResponse_CtrlMessage) Reset() { + *x = SubscribeResponse_CtrlMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[29] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse_CtrlMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse_CtrlMessage) ProtoMessage() {} + +func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[29] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse_CtrlMessage.ProtoReflect.Descriptor instead. +func (*SubscribeResponse_CtrlMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{26, 0} +} + +func (x *SubscribeResponse_CtrlMessage) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *SubscribeResponse_CtrlMessage) GetRedirectToBroker() string { + if x != nil { + return x.RedirectToBroker + } + return "" +} + var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ @@ -1649,100 +1937,140 @@ var file_mq_proto_rawDesc = []byte{ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xae, 0x02, - 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, + 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x35, 0x0a, + 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, + 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x22, 0x99, 0x02, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, + 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x6f, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, + 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x67, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, + 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, + 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, + 0x69, 0x73, 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x08, 0x69, 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x22, 0x9d, 0x02, 0x0a, 0x10, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x40, + 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, - 0x12, 0x3e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, - 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x3e, 0x0a, 0x0b, - 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x73, - 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, - 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x35, 0x0a, 0x0b, - 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, - 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x67, - 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, - 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, - 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, - 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x32, 0xd7, 0x07, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, - 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, - 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, - 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, - 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, - 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, + 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x6f, 0x0a, + 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, + 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x09, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xe5, 0x01, 0x0a, 0x11, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x41, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, + 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, + 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x1a, 0x51, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, + 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x54, 0x6f, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x32, 0xab, 0x08, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, + 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, + 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, - 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, - 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, + 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, + 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, + 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, + 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, + 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, - 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, - 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, - 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, - 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, + 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, + 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x09, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, + 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, + 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, + 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, + 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1757,7 +2085,7 @@ func file_mq_proto_rawDescGZIP() []byte { return file_mq_proto_rawDescData } -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 26) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_mq_proto_goTypes = []interface{}{ (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest @@ -1781,10 +2109,14 @@ var file_mq_proto_goTypes = []interface{}{ (*AssignTopicPartitionsResponse)(nil), // 19: messaging_pb.AssignTopicPartitionsResponse (*CheckTopicPartitionsStatusRequest)(nil), // 20: messaging_pb.CheckTopicPartitionsStatusRequest (*CheckTopicPartitionsStatusResponse)(nil), // 21: messaging_pb.CheckTopicPartitionsStatusResponse - (*PublishRequest)(nil), // 22: messaging_pb.PublishRequest - (*PublishResponse)(nil), // 23: messaging_pb.PublishResponse - (*PublishRequest_InitMessage)(nil), // 24: messaging_pb.PublishRequest.InitMessage - (*PublishRequest_DataMessage)(nil), // 25: messaging_pb.PublishRequest.DataMessage + (*DataMessage)(nil), // 22: messaging_pb.DataMessage + (*PublishRequest)(nil), // 23: messaging_pb.PublishRequest + (*PublishResponse)(nil), // 24: messaging_pb.PublishResponse + (*SubscribeRequest)(nil), // 25: messaging_pb.SubscribeRequest + (*SubscribeResponse)(nil), // 26: messaging_pb.SubscribeResponse + (*PublishRequest_InitMessage)(nil), // 27: messaging_pb.PublishRequest.InitMessage + (*SubscribeRequest_InitMessage)(nil), // 28: messaging_pb.SubscribeRequest.InitMessage + (*SubscribeResponse_CtrlMessage)(nil), // 29: messaging_pb.SubscribeResponse.CtrlMessage } var file_mq_proto_depIdxs = []int32{ 5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment @@ -1801,32 +2133,41 @@ var file_mq_proto_depIdxs = []int32{ 15, // 11: messaging_pb.AssignTopicPartitionsRequest.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment 14, // 12: messaging_pb.CheckTopicPartitionsStatusRequest.broker_partitions_assignment:type_name -> messaging_pb.BrokerPartitionsAssignment 15, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment - 24, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage - 25, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.PublishRequest.DataMessage - 5, // 16: messaging_pb.PublishRequest.InitMessage.segment:type_name -> messaging_pb.Segment - 1, // 17: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 6, // 18: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest - 8, // 19: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest - 10, // 20: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest - 12, // 21: messaging_pb.SeaweedMessaging.FindTopicBrokers:input_type -> messaging_pb.FindTopicBrokersRequest - 16, // 22: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest - 18, // 23: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 20, // 24: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest - 22, // 25: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest - 2, // 26: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 7, // 27: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse - 9, // 28: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse - 11, // 29: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse - 13, // 30: messaging_pb.SeaweedMessaging.FindTopicBrokers:output_type -> messaging_pb.FindTopicBrokersResponse - 17, // 31: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse - 19, // 32: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 21, // 33: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse - 23, // 34: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse - 26, // [26:35] is the sub-list for method output_type - 17, // [17:26] is the sub-list for method input_type - 17, // [17:17] is the sub-list for extension type_name - 17, // [17:17] is the sub-list for extension extendee - 0, // [0:17] is the sub-list for field type_name + 27, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage + 22, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage + 28, // 16: messaging_pb.SubscribeRequest.init:type_name -> messaging_pb.SubscribeRequest.InitMessage + 22, // 17: messaging_pb.SubscribeRequest.data:type_name -> messaging_pb.DataMessage + 29, // 18: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage + 22, // 19: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage + 3, // 20: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 21: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 22: messaging_pb.SubscribeRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 23: messaging_pb.SubscribeRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 1, // 24: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 6, // 25: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest + 8, // 26: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest + 10, // 27: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest + 12, // 28: messaging_pb.SeaweedMessaging.FindTopicBrokers:input_type -> messaging_pb.FindTopicBrokersRequest + 16, // 29: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest + 18, // 30: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 20, // 31: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest + 23, // 32: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest + 25, // 33: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest + 2, // 34: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 7, // 35: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse + 9, // 36: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse + 11, // 37: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse + 13, // 38: messaging_pb.SeaweedMessaging.FindTopicBrokers:output_type -> messaging_pb.FindTopicBrokersResponse + 17, // 39: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse + 19, // 40: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 21, // 41: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse + 24, // 42: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse + 26, // 43: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse + 34, // [34:44] is the sub-list for method output_type + 24, // [24:34] is the sub-list for method input_type + 24, // [24:24] is the sub-list for extension type_name + 24, // [24:24] is the sub-list for extension extendee + 0, // [0:24] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -2100,7 +2441,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[22].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest); i { + switch v := v.(*DataMessage); i { case 0: return &v.state case 1: @@ -2112,7 +2453,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishResponse); i { + switch v := v.(*PublishRequest); i { case 0: return &v.state case 1: @@ -2124,7 +2465,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest_InitMessage); i { + switch v := v.(*PublishResponse); i { case 0: return &v.state case 1: @@ -2136,7 +2477,55 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest_DataMessage); i { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishRequest_InitMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest_InitMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse_CtrlMessage); i { case 0: return &v.state case 1: @@ -2148,17 +2537,25 @@ func file_mq_proto_init() { } } } - file_mq_proto_msgTypes[22].OneofWrappers = []interface{}{ + file_mq_proto_msgTypes[23].OneofWrappers = []interface{}{ (*PublishRequest_Init)(nil), (*PublishRequest_Data)(nil), } + file_mq_proto_msgTypes[25].OneofWrappers = []interface{}{ + (*SubscribeRequest_Init)(nil), + (*SubscribeRequest_Data)(nil), + } + file_mq_proto_msgTypes[26].OneofWrappers = []interface{}{ + (*SubscribeResponse_Ctrl)(nil), + (*SubscribeResponse_Data)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 0, - NumMessages: 26, + NumMessages: 30, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go index c60ecbe5b..f655c4e5b 100644 --- a/weed/pb/mq_pb/mq_grpc.pb.go +++ b/weed/pb/mq_pb/mq_grpc.pb.go @@ -35,6 +35,7 @@ type SeaweedMessagingClient interface { CheckTopicPartitionsStatus(ctx context.Context, in *CheckTopicPartitionsStatusRequest, opts ...grpc.CallOption) (*CheckTopicPartitionsStatusResponse, error) // data plane Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) + Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) } type seaweedMessagingClient struct { @@ -148,6 +149,37 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) { return m, nil } +func (c *seaweedMessagingClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/messaging_pb.SeaweedMessaging/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &seaweedMessagingSubscribeClient{stream} + return x, nil +} + +type SeaweedMessaging_SubscribeClient interface { + Send(*SubscribeRequest) error + Recv() (*SubscribeResponse, error) + grpc.ClientStream +} + +type seaweedMessagingSubscribeClient struct { + grpc.ClientStream +} + +func (x *seaweedMessagingSubscribeClient) Send(m *SubscribeRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *seaweedMessagingSubscribeClient) Recv() (*SubscribeResponse, error) { + m := new(SubscribeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SeaweedMessagingServer is the server API for SeaweedMessaging service. // All implementations must embed UnimplementedSeaweedMessagingServer // for forward compatibility @@ -165,6 +197,7 @@ type SeaweedMessagingServer interface { CheckTopicPartitionsStatus(context.Context, *CheckTopicPartitionsStatusRequest) (*CheckTopicPartitionsStatusResponse, error) // data plane Publish(SeaweedMessaging_PublishServer) error + Subscribe(SeaweedMessaging_SubscribeServer) error mustEmbedUnimplementedSeaweedMessagingServer() } @@ -199,6 +232,9 @@ func (UnimplementedSeaweedMessagingServer) CheckTopicPartitionsStatus(context.Co func (UnimplementedSeaweedMessagingServer) Publish(SeaweedMessaging_PublishServer) error { return status.Errorf(codes.Unimplemented, "method Publish not implemented") } +func (UnimplementedSeaweedMessagingServer) Subscribe(SeaweedMessaging_SubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") +} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} // UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service. @@ -382,6 +418,32 @@ func (x *seaweedMessagingPublishServer) Recv() (*PublishRequest, error) { return m, nil } +func _SeaweedMessaging_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SeaweedMessagingServer).Subscribe(&seaweedMessagingSubscribeServer{stream}) +} + +type SeaweedMessaging_SubscribeServer interface { + Send(*SubscribeResponse) error + Recv() (*SubscribeRequest, error) + grpc.ServerStream +} + +type seaweedMessagingSubscribeServer struct { + grpc.ServerStream +} + +func (x *seaweedMessagingSubscribeServer) Send(m *SubscribeResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *seaweedMessagingSubscribeServer) Recv() (*SubscribeRequest, error) { + m := new(SubscribeRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -429,6 +491,12 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "Subscribe", + Handler: _SeaweedMessaging_Subscribe_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "mq.proto", }