mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
renaming
This commit is contained in:
parent
8c4edf7b40
commit
21b6b07dd8
|
@ -10,10 +10,10 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
@ -100,7 +100,7 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
|
||||||
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
|
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
|
||||||
}
|
}
|
||||||
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
|
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
|
||||||
messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
|
mq_pb.RegisterSeaweedMessagingServer(grpcS, qs)
|
||||||
reflection.Register(grpcS)
|
reflection.Register(grpcS)
|
||||||
grpcS.Serve(grpcL)
|
grpcS.Serve(grpcL)
|
||||||
|
|
||||||
|
|
|
@ -10,11 +10,11 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
|
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
|
||||||
|
|
||||||
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
|
@ -46,7 +46,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
||||||
|
|
||||||
var assignResult = &operation.AssignResult{}
|
var assignResult = &operation.AssignResult{}
|
||||||
|
|
|
@ -6,15 +6,15 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
|
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
|
func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
|
||||||
resp := &messaging_pb.DeleteTopicResponse{}
|
resp := &mq_pb.DeleteTopicResponse{}
|
||||||
dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
|
dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
|
||||||
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
|
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_p
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
|
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -26,9 +26,9 @@ If one of the pub or sub connects very late, and the system topo changed quite a
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
|
func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
|
||||||
|
|
||||||
t := &messaging_pb.FindBrokerResponse{}
|
t := &mq_pb.FindBrokerResponse{}
|
||||||
var peers []string
|
var peers []string
|
||||||
|
|
||||||
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
|
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
|
|
@ -10,10 +10,10 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
|
func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
|
||||||
|
|
||||||
// process initial request
|
// process initial request
|
||||||
in, err := stream.Recv()
|
in, err := stream.Recv()
|
||||||
|
@ -25,12 +25,12 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO look it up
|
// TODO look it up
|
||||||
topicConfig := &messaging_pb.TopicConfiguration{
|
topicConfig := &mq_pb.TopicConfiguration{
|
||||||
// IsTransient: true,
|
// IsTransient: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
// send init response
|
// send init response
|
||||||
initResponse := &messaging_pb.PublishResponse{
|
initResponse := &mq_pb.PublishResponse{
|
||||||
Config: nil,
|
Config: nil,
|
||||||
Redirect: nil,
|
Redirect: nil,
|
||||||
}
|
}
|
||||||
|
@ -104,7 +104,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||||
|
|
||||||
// send the close ack
|
// send the close ack
|
||||||
// println("server send ack closing")
|
// println("server send ack closing")
|
||||||
if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
|
if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil {
|
||||||
glog.V(0).Infof("err sending close response: %v", err)
|
glog.V(0).Infof("err sending close response: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
|
@ -13,10 +13,10 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
|
func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
|
||||||
|
|
||||||
// process initial request
|
// process initial request
|
||||||
in, err := stream.Recv()
|
in, err := stream.Recv()
|
||||||
|
@ -32,7 +32,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||||
subscriberId := in.Init.SubscriberId
|
subscriberId := in.Init.SubscriberId
|
||||||
|
|
||||||
// TODO look it up
|
// TODO look it up
|
||||||
topicConfig := &messaging_pb.TopicConfiguration{
|
topicConfig := &mq_pb.TopicConfiguration{
|
||||||
// IsTransient: true,
|
// IsTransient: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,17 +63,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||||
|
|
||||||
lastReadTime := time.Now()
|
lastReadTime := time.Now()
|
||||||
switch in.Init.StartPosition {
|
switch in.Init.StartPosition {
|
||||||
case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
|
case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP:
|
||||||
lastReadTime = time.Unix(0, in.Init.TimestampNs)
|
lastReadTime = time.Unix(0, in.Init.TimestampNs)
|
||||||
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
|
case mq_pb.SubscriberMessage_InitMessage_LATEST:
|
||||||
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
|
case mq_pb.SubscriberMessage_InitMessage_EARLIEST:
|
||||||
lastReadTime = time.Unix(0, 0)
|
lastReadTime = time.Unix(0, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// how to process each message
|
// how to process each message
|
||||||
// an error returned will end the subscription
|
// an error returned will end the subscription
|
||||||
eachMessageFn := func(m *messaging_pb.Message) error {
|
eachMessageFn := func(m *mq_pb.Message) error {
|
||||||
err := stream.Send(&messaging_pb.BrokerMessage{
|
err := stream.Send(&mq_pb.BrokerMessage{
|
||||||
Data: m,
|
Data: m,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -83,9 +83,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||||
}
|
}
|
||||||
|
|
||||||
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
|
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
|
||||||
m := &messaging_pb.Message{}
|
m := &mq_pb.Message{}
|
||||||
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
|
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
|
||||||
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
|
glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
|
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
|
|
@ -2,7 +2,7 @@ package broker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
@ -23,7 +23,7 @@ type MessageBrokerOption struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type MessageBroker struct {
|
type MessageBroker struct {
|
||||||
messaging_pb.UnimplementedSeaweedMessagingServer
|
mq_pb.UnimplementedSeaweedMessagingServer
|
||||||
option *MessageBrokerOption
|
option *MessageBrokerOption
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
topicManager *TopicManager
|
topicManager *TopicManager
|
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
|
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer {
|
||||||
|
|
||||||
flushFn := func(startTime, stopTime time.Time, buf []byte) {
|
flushFn := func(startTime, stopTime time.Time, buf []byte) {
|
||||||
|
|
||||||
|
@ -75,7 +75,7 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi
|
||||||
return logBuffer
|
return logBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
|
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl {
|
||||||
tm.Lock()
|
tm.Lock()
|
||||||
defer tm.Unlock()
|
defer tm.Unlock()
|
||||||
|
|
|
@ -8,12 +8,12 @@ import (
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PubChannel struct {
|
type PubChannel struct {
|
||||||
client messaging_pb.SeaweedMessaging_PublishClient
|
client mq_pb.SeaweedMessaging_PublishClient
|
||||||
grpcConnection *grpc.ClientConn
|
grpcConnection *grpc.ClientConn
|
||||||
md5hash hash.Hash
|
md5hash hash.Hash
|
||||||
}
|
}
|
||||||
|
@ -40,8 +40,8 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *PubChannel) Publish(m []byte) error {
|
func (pc *PubChannel) Publish(m []byte) error {
|
||||||
err := pc.client.Send(&messaging_pb.PublishRequest{
|
err := pc.client.Send(&mq_pb.PublishRequest{
|
||||||
Data: &messaging_pb.Message{
|
Data: &mq_pb.Message{
|
||||||
Value: m,
|
Value: m,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -53,8 +53,8 @@ func (pc *PubChannel) Publish(m []byte) error {
|
||||||
func (pc *PubChannel) Close() error {
|
func (pc *PubChannel) Close() error {
|
||||||
|
|
||||||
// println("send closing")
|
// println("send closing")
|
||||||
if err := pc.client.Send(&messaging_pb.PublishRequest{
|
if err := pc.client.Send(&mq_pb.PublishRequest{
|
||||||
Data: &messaging_pb.Message{
|
Data: &mq_pb.Message{
|
||||||
IsClose: true,
|
IsClose: true,
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
|
@ -8,13 +8,13 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SubChannel struct {
|
type SubChannel struct {
|
||||||
ch chan []byte
|
ch chan []byte
|
||||||
stream messaging_pb.SeaweedMessaging_SubscribeClient
|
stream mq_pb.SeaweedMessaging_SubscribeClient
|
||||||
md5hash hash.Hash
|
md5hash hash.Hash
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.Data.IsClose {
|
if resp.Data.IsClose {
|
||||||
t.stream.Send(&messaging_pb.SubscriberMessage{
|
t.stream.Send(&mq_pb.SubscriberMessage{
|
||||||
IsClose: true,
|
IsClose: true,
|
||||||
})
|
})
|
||||||
close(t.ch)
|
close(t.ch)
|
|
@ -7,9 +7,9 @@ import (
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
@ -38,8 +38,8 @@ func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientCon
|
||||||
}
|
}
|
||||||
defer grpcConnection.Close()
|
defer grpcConnection.Close()
|
||||||
|
|
||||||
resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
|
resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
|
||||||
&messaging_pb.FindBrokerRequest{
|
&mq_pb.FindBrokerRequest{
|
||||||
Namespace: tp.Namespace,
|
Namespace: tp.Namespace,
|
||||||
Topic: tp.Topic,
|
Topic: tp.Topic,
|
||||||
Parition: tp.Partition,
|
Parition: tp.Partition,
|
|
@ -4,19 +4,19 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
|
func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
|
||||||
|
|
||||||
return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
|
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
|
||||||
_, err := client.ConfigureTopic(context.Background(),
|
_, err := client.ConfigureTopic(context.Background(),
|
||||||
&messaging_pb.ConfigureTopicRequest{
|
&mq_pb.ConfigureTopicRequest{
|
||||||
Namespace: tp.Namespace,
|
Namespace: tp.Namespace,
|
||||||
Topic: tp.Topic,
|
Topic: tp.Topic,
|
||||||
Configuration: &messaging_pb.TopicConfiguration{
|
Configuration: &mq_pb.TopicConfiguration{
|
||||||
PartitionCount: 0,
|
PartitionCount: 0,
|
||||||
Collection: "",
|
Collection: "",
|
||||||
Replication: "",
|
Replication: "",
|
||||||
|
@ -31,9 +31,9 @@ func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
|
||||||
|
|
||||||
func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
|
func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
|
||||||
|
|
||||||
return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
|
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
|
||||||
_, err := client.DeleteTopic(context.Background(),
|
_, err := client.DeleteTopic(context.Background(),
|
||||||
&messaging_pb.DeleteTopicRequest{
|
&mq_pb.DeleteTopicRequest{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
})
|
})
|
||||||
|
@ -41,7 +41,7 @@ func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
|
func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
|
||||||
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for _, broker := range mc.bootstrapBrokers {
|
for _, broker := range mc.bootstrapBrokers {
|
||||||
|
@ -52,7 +52,7 @@ func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMess
|
||||||
}
|
}
|
||||||
defer grpcConnection.Close()
|
defer grpcConnection.Close()
|
||||||
|
|
||||||
err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
|
err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
|
@ -6,23 +6,23 @@ import (
|
||||||
"github.com/OneOfOne/xxhash"
|
"github.com/OneOfOne/xxhash"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Publisher struct {
|
type Publisher struct {
|
||||||
publishClients []messaging_pb.SeaweedMessaging_PublishClient
|
publishClients []mq_pb.SeaweedMessaging_PublishClient
|
||||||
topicConfiguration *messaging_pb.TopicConfiguration
|
topicConfiguration *mq_pb.TopicConfiguration
|
||||||
messageCount uint64
|
messageCount uint64
|
||||||
publisherId string
|
publisherId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
|
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
|
||||||
// read topic configuration
|
// read topic configuration
|
||||||
topicConfiguration := &messaging_pb.TopicConfiguration{
|
topicConfiguration := &mq_pb.TopicConfiguration{
|
||||||
PartitionCount: 4,
|
PartitionCount: 4,
|
||||||
}
|
}
|
||||||
publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
|
publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
|
||||||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
||||||
tp := broker.TopicPartition{
|
tp := broker.TopicPartition{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
|
@ -45,16 +45,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
|
func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
|
||||||
|
|
||||||
stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
|
stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// send init message
|
// send init message
|
||||||
err = stream.Send(&messaging_pb.PublishRequest{
|
err = stream.Send(&mq_pb.PublishRequest{
|
||||||
Init: &messaging_pb.PublishRequest_InitMessage{
|
Init: &mq_pb.PublishRequest_InitMessage{
|
||||||
Namespace: tp.Namespace,
|
Namespace: tp.Namespace,
|
||||||
Topic: tp.Topic,
|
Topic: tp.Topic,
|
||||||
Partition: tp.Partition,
|
Partition: tp.Partition,
|
||||||
|
@ -95,14 +95,14 @@ func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartit
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) Publish(m *messaging_pb.Message) error {
|
func (p *Publisher) Publish(m *mq_pb.Message) error {
|
||||||
hashValue := p.messageCount
|
hashValue := p.messageCount
|
||||||
p.messageCount++
|
p.messageCount++
|
||||||
if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
|
if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
|
||||||
if m.Key != nil {
|
if m.Key != nil {
|
||||||
hashValue = xxhash.Checksum64(m.Key)
|
hashValue = xxhash.Checksum64(m.Key)
|
||||||
}
|
}
|
||||||
} else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
|
} else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
|
||||||
hashValue = xxhash.Checksum64(m.Key)
|
hashValue = xxhash.Checksum64(m.Key)
|
||||||
} else {
|
} else {
|
||||||
// round robin
|
// round robin
|
||||||
|
@ -112,7 +112,7 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error {
|
||||||
if idx < 0 {
|
if idx < 0 {
|
||||||
idx += len(p.publishClients)
|
idx += len(p.publishClients)
|
||||||
}
|
}
|
||||||
return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
|
return p.publishClients[idx].Send(&mq_pb.PublishRequest{
|
||||||
Data: m,
|
Data: m,
|
||||||
})
|
})
|
||||||
}
|
}
|
|
@ -6,23 +6,23 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
|
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
||||||
subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
|
subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient
|
||||||
subscriberCancels []context.CancelFunc
|
subscriberCancels []context.CancelFunc
|
||||||
subscriberId string
|
subscriberId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
|
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
|
||||||
// read topic configuration
|
// read topic configuration
|
||||||
topicConfiguration := &messaging_pb.TopicConfiguration{
|
topicConfiguration := &mq_pb.TopicConfiguration{
|
||||||
PartitionCount: 4,
|
PartitionCount: 4,
|
||||||
}
|
}
|
||||||
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
|
subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
|
||||||
subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
|
subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
|
||||||
|
|
||||||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
||||||
|
@ -54,19 +54,19 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
|
func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) {
|
||||||
stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
|
stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// send init message
|
// send init message
|
||||||
err = stream.Send(&messaging_pb.SubscriberMessage{
|
err = stream.Send(&mq_pb.SubscriberMessage{
|
||||||
Init: &messaging_pb.SubscriberMessage_InitMessage{
|
Init: &mq_pb.SubscriberMessage_InitMessage{
|
||||||
Namespace: tp.Namespace,
|
Namespace: tp.Namespace,
|
||||||
Topic: tp.Topic,
|
Topic: tp.Topic,
|
||||||
Partition: tp.Partition,
|
Partition: tp.Partition,
|
||||||
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
|
StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP,
|
||||||
TimestampNs: startTime.UnixNano(),
|
TimestampNs: startTime.UnixNano(),
|
||||||
SubscriberId: subscriberId,
|
SubscriberId: subscriberId,
|
||||||
},
|
},
|
||||||
|
@ -78,7 +78,7 @@ func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn,
|
||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
|
func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error {
|
||||||
for {
|
for {
|
||||||
resp, listenErr := subscriberClient.Recv()
|
resp, listenErr := subscriberClient.Recv()
|
||||||
if listenErr == io.EOF {
|
if listenErr == io.EOF {
|
||||||
|
@ -97,12 +97,12 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe starts goroutines to process the messages
|
// Subscribe starts goroutines to process the messages
|
||||||
func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
|
func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < len(s.subscriberClients); i++ {
|
for i := 0; i < len(s.subscriberClients); i++ {
|
||||||
if s.subscriberClients[i] != nil {
|
if s.subscriberClients[i] != nil {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
|
go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
doSubscribe(subscriberClient, processFn)
|
doSubscribe(subscriberClient, processFn)
|
||||||
}(s.subscriberClients[i])
|
}(s.subscriberClients[i])
|
|
@ -10,6 +10,6 @@ gen:
|
||||||
protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||||
protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||||
protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||||
protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
protoc mq.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||||
# protoc filer.proto --java_out=../../other/java/client/src/main/java
|
# protoc filer.proto --java_out=../../other/java/client/src/main/java
|
||||||
cp filer.proto ../../other/java/client/src/main/proto
|
cp filer.proto ../../other/java/client/src/main/proto
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -231,10 +231,10 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
|
func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
|
||||||
|
|
||||||
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
|
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
|
||||||
client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
|
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(client)
|
||||||
}, brokerGrpcAddress, grpcDialOption)
|
}, brokerGrpcAddress, grpcDialOption)
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ syntax = "proto3";
|
||||||
|
|
||||||
package messaging_pb;
|
package messaging_pb;
|
||||||
|
|
||||||
option go_package = "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb";
|
option go_package = "github.com/chrislusf/seaweedfs/weed/pb/mq_pb";
|
||||||
option java_package = "seaweedfs.client";
|
option java_package = "seaweedfs.client";
|
||||||
option java_outer_classname = "MessagingProto";
|
option java_outer_classname = "MessagingProto";
|
||||||
|
|
||||||
|
|
|
@ -143,12 +143,12 @@ func file_mount_proto_rawDescGZIP() []byte {
|
||||||
|
|
||||||
var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
|
var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
|
||||||
var file_mount_proto_goTypes = []interface{}{
|
var file_mount_proto_goTypes = []interface{}{
|
||||||
(*ConfigureRequest)(nil), // 0: messaging_pb.ConfigureRequest
|
(*ConfigureRequest)(nil), // 0: mq_pb.ConfigureRequest
|
||||||
(*ConfigureResponse)(nil), // 1: messaging_pb.ConfigureResponse
|
(*ConfigureResponse)(nil), // 1: mq_pb.ConfigureResponse
|
||||||
}
|
}
|
||||||
var file_mount_proto_depIdxs = []int32{
|
var file_mount_proto_depIdxs = []int32{
|
||||||
0, // 0: messaging_pb.SeaweedMount.Configure:input_type -> messaging_pb.ConfigureRequest
|
0, // 0: mq_pb.SeaweedMount.Configure:input_type -> mq_pb.ConfigureRequest
|
||||||
1, // 1: messaging_pb.SeaweedMount.Configure:output_type -> messaging_pb.ConfigureResponse
|
1, // 1: mq_pb.SeaweedMount.Configure:output_type -> mq_pb.ConfigureResponse
|
||||||
1, // [1:2] is the sub-list for method output_type
|
1, // [1:2] is the sub-list for method output_type
|
||||||
0, // [0:1] is the sub-list for method input_type
|
0, // [0:1] is the sub-list for method input_type
|
||||||
0, // [0:0] is the sub-list for extension type_name
|
0, // [0:0] is the sub-list for extension type_name
|
||||||
|
|
|
@ -31,7 +31,7 @@ func NewSeaweedMountClient(cc grpc.ClientConnInterface) SeaweedMountClient {
|
||||||
|
|
||||||
func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) {
|
func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) {
|
||||||
out := new(ConfigureResponse)
|
out := new(ConfigureResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMount/Configure", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMount/Configure", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedMount/Configure",
|
FullMethod: "/mq_pb.SeaweedMount/Configure",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest))
|
return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest))
|
||||||
|
@ -88,7 +88,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
var SeaweedMount_ServiceDesc = grpc.ServiceDesc{
|
var SeaweedMount_ServiceDesc = grpc.ServiceDesc{
|
||||||
ServiceName: "messaging_pb.SeaweedMount",
|
ServiceName: "mq_pb.SeaweedMount",
|
||||||
HandlerType: (*SeaweedMountServer)(nil),
|
HandlerType: (*SeaweedMountServer)(nil),
|
||||||
Methods: []grpc.MethodDesc{
|
Methods: []grpc.MethodDesc{
|
||||||
{
|
{
|
||||||
|
|
|
@ -2,9 +2,9 @@
|
||||||
// versions:
|
// versions:
|
||||||
// protoc-gen-go v1.26.0
|
// protoc-gen-go v1.26.0
|
||||||
// protoc v3.17.3
|
// protoc v3.17.3
|
||||||
// source: messaging.proto
|
// source: mq.proto
|
||||||
|
|
||||||
package messaging_pb
|
package mq_pb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||||
|
@ -840,7 +840,7 @@ type TopicConfiguration struct {
|
||||||
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
|
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
|
||||||
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
|
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
|
||||||
IsTransient bool `protobuf:"varint,4,opt,name=is_transient,json=isTransient,proto3" json:"is_transient,omitempty"`
|
IsTransient bool `protobuf:"varint,4,opt,name=is_transient,json=isTransient,proto3" json:"is_transient,omitempty"`
|
||||||
Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=messaging_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"`
|
Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=mq_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *TopicConfiguration) Reset() {
|
func (x *TopicConfiguration) Reset() {
|
||||||
|
@ -918,7 +918,7 @@ type SubscriberMessage_InitMessage struct {
|
||||||
Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
||||||
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
|
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
|
||||||
Partition int32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
|
Partition int32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
|
||||||
StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=messaging_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from
|
StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=mq_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from
|
||||||
TimestampNs int64 `protobuf:"varint,5,opt,name=timestampNs,proto3" json:"timestampNs,omitempty"` // timestamp in nano seconds
|
TimestampNs int64 `protobuf:"varint,5,opt,name=timestampNs,proto3" json:"timestampNs,omitempty"` // timestamp in nano seconds
|
||||||
SubscriberId string `protobuf:"bytes,6,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` // uniquely identify a subscriber to track consumption
|
SubscriberId string `protobuf:"bytes,6,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` // uniquely identify a subscriber to track consumption
|
||||||
}
|
}
|
||||||
|
@ -1407,54 +1407,54 @@ func file_messaging_proto_rawDescGZIP() []byte {
|
||||||
var file_messaging_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
|
var file_messaging_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
|
||||||
var file_messaging_proto_msgTypes = make([]protoimpl.MessageInfo, 20)
|
var file_messaging_proto_msgTypes = make([]protoimpl.MessageInfo, 20)
|
||||||
var file_messaging_proto_goTypes = []interface{}{
|
var file_messaging_proto_goTypes = []interface{}{
|
||||||
(SubscriberMessage_InitMessage_StartPosition)(0), // 0: messaging_pb.SubscriberMessage.InitMessage.StartPosition
|
(SubscriberMessage_InitMessage_StartPosition)(0), // 0: mq_pb.SubscriberMessage.InitMessage.StartPosition
|
||||||
(TopicConfiguration_Partitioning)(0), // 1: messaging_pb.TopicConfiguration.Partitioning
|
(TopicConfiguration_Partitioning)(0), // 1: mq_pb.TopicConfiguration.Partitioning
|
||||||
(*SubscriberMessage)(nil), // 2: messaging_pb.SubscriberMessage
|
(*SubscriberMessage)(nil), // 2: mq_pb.SubscriberMessage
|
||||||
(*Message)(nil), // 3: messaging_pb.Message
|
(*Message)(nil), // 3: mq_pb.Message
|
||||||
(*BrokerMessage)(nil), // 4: messaging_pb.BrokerMessage
|
(*BrokerMessage)(nil), // 4: mq_pb.BrokerMessage
|
||||||
(*PublishRequest)(nil), // 5: messaging_pb.PublishRequest
|
(*PublishRequest)(nil), // 5: mq_pb.PublishRequest
|
||||||
(*PublishResponse)(nil), // 6: messaging_pb.PublishResponse
|
(*PublishResponse)(nil), // 6: mq_pb.PublishResponse
|
||||||
(*DeleteTopicRequest)(nil), // 7: messaging_pb.DeleteTopicRequest
|
(*DeleteTopicRequest)(nil), // 7: mq_pb.DeleteTopicRequest
|
||||||
(*DeleteTopicResponse)(nil), // 8: messaging_pb.DeleteTopicResponse
|
(*DeleteTopicResponse)(nil), // 8: mq_pb.DeleteTopicResponse
|
||||||
(*ConfigureTopicRequest)(nil), // 9: messaging_pb.ConfigureTopicRequest
|
(*ConfigureTopicRequest)(nil), // 9: mq_pb.ConfigureTopicRequest
|
||||||
(*ConfigureTopicResponse)(nil), // 10: messaging_pb.ConfigureTopicResponse
|
(*ConfigureTopicResponse)(nil), // 10: mq_pb.ConfigureTopicResponse
|
||||||
(*GetTopicConfigurationRequest)(nil), // 11: messaging_pb.GetTopicConfigurationRequest
|
(*GetTopicConfigurationRequest)(nil), // 11: mq_pb.GetTopicConfigurationRequest
|
||||||
(*GetTopicConfigurationResponse)(nil), // 12: messaging_pb.GetTopicConfigurationResponse
|
(*GetTopicConfigurationResponse)(nil), // 12: mq_pb.GetTopicConfigurationResponse
|
||||||
(*FindBrokerRequest)(nil), // 13: messaging_pb.FindBrokerRequest
|
(*FindBrokerRequest)(nil), // 13: mq_pb.FindBrokerRequest
|
||||||
(*FindBrokerResponse)(nil), // 14: messaging_pb.FindBrokerResponse
|
(*FindBrokerResponse)(nil), // 14: mq_pb.FindBrokerResponse
|
||||||
(*TopicConfiguration)(nil), // 15: messaging_pb.TopicConfiguration
|
(*TopicConfiguration)(nil), // 15: mq_pb.TopicConfiguration
|
||||||
(*SubscriberMessage_InitMessage)(nil), // 16: messaging_pb.SubscriberMessage.InitMessage
|
(*SubscriberMessage_InitMessage)(nil), // 16: mq_pb.SubscriberMessage.InitMessage
|
||||||
(*SubscriberMessage_AckMessage)(nil), // 17: messaging_pb.SubscriberMessage.AckMessage
|
(*SubscriberMessage_AckMessage)(nil), // 17: mq_pb.SubscriberMessage.AckMessage
|
||||||
nil, // 18: messaging_pb.Message.HeadersEntry
|
nil, // 18: mq_pb.Message.HeadersEntry
|
||||||
(*PublishRequest_InitMessage)(nil), // 19: messaging_pb.PublishRequest.InitMessage
|
(*PublishRequest_InitMessage)(nil), // 19: mq_pb.PublishRequest.InitMessage
|
||||||
(*PublishResponse_ConfigMessage)(nil), // 20: messaging_pb.PublishResponse.ConfigMessage
|
(*PublishResponse_ConfigMessage)(nil), // 20: mq_pb.PublishResponse.ConfigMessage
|
||||||
(*PublishResponse_RedirectMessage)(nil), // 21: messaging_pb.PublishResponse.RedirectMessage
|
(*PublishResponse_RedirectMessage)(nil), // 21: mq_pb.PublishResponse.RedirectMessage
|
||||||
}
|
}
|
||||||
var file_messaging_proto_depIdxs = []int32{
|
var file_messaging_proto_depIdxs = []int32{
|
||||||
16, // 0: messaging_pb.SubscriberMessage.init:type_name -> messaging_pb.SubscriberMessage.InitMessage
|
16, // 0: mq_pb.SubscriberMessage.init:type_name -> mq_pb.SubscriberMessage.InitMessage
|
||||||
17, // 1: messaging_pb.SubscriberMessage.ack:type_name -> messaging_pb.SubscriberMessage.AckMessage
|
17, // 1: mq_pb.SubscriberMessage.ack:type_name -> mq_pb.SubscriberMessage.AckMessage
|
||||||
18, // 2: messaging_pb.Message.headers:type_name -> messaging_pb.Message.HeadersEntry
|
18, // 2: mq_pb.Message.headers:type_name -> mq_pb.Message.HeadersEntry
|
||||||
3, // 3: messaging_pb.BrokerMessage.data:type_name -> messaging_pb.Message
|
3, // 3: mq_pb.BrokerMessage.data:type_name -> mq_pb.Message
|
||||||
19, // 4: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
|
19, // 4: mq_pb.PublishRequest.init:type_name -> mq_pb.PublishRequest.InitMessage
|
||||||
3, // 5: messaging_pb.PublishRequest.data:type_name -> messaging_pb.Message
|
3, // 5: mq_pb.PublishRequest.data:type_name -> mq_pb.Message
|
||||||
20, // 6: messaging_pb.PublishResponse.config:type_name -> messaging_pb.PublishResponse.ConfigMessage
|
20, // 6: mq_pb.PublishResponse.config:type_name -> mq_pb.PublishResponse.ConfigMessage
|
||||||
21, // 7: messaging_pb.PublishResponse.redirect:type_name -> messaging_pb.PublishResponse.RedirectMessage
|
21, // 7: mq_pb.PublishResponse.redirect:type_name -> mq_pb.PublishResponse.RedirectMessage
|
||||||
15, // 8: messaging_pb.ConfigureTopicRequest.configuration:type_name -> messaging_pb.TopicConfiguration
|
15, // 8: mq_pb.ConfigureTopicRequest.configuration:type_name -> mq_pb.TopicConfiguration
|
||||||
15, // 9: messaging_pb.GetTopicConfigurationResponse.configuration:type_name -> messaging_pb.TopicConfiguration
|
15, // 9: mq_pb.GetTopicConfigurationResponse.configuration:type_name -> mq_pb.TopicConfiguration
|
||||||
1, // 10: messaging_pb.TopicConfiguration.partitoning:type_name -> messaging_pb.TopicConfiguration.Partitioning
|
1, // 10: mq_pb.TopicConfiguration.partitoning:type_name -> mq_pb.TopicConfiguration.Partitioning
|
||||||
0, // 11: messaging_pb.SubscriberMessage.InitMessage.startPosition:type_name -> messaging_pb.SubscriberMessage.InitMessage.StartPosition
|
0, // 11: mq_pb.SubscriberMessage.InitMessage.startPosition:type_name -> mq_pb.SubscriberMessage.InitMessage.StartPosition
|
||||||
2, // 12: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscriberMessage
|
2, // 12: mq_pb.SeaweedMessaging.Subscribe:input_type -> mq_pb.SubscriberMessage
|
||||||
5, // 13: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
|
5, // 13: mq_pb.SeaweedMessaging.Publish:input_type -> mq_pb.PublishRequest
|
||||||
7, // 14: messaging_pb.SeaweedMessaging.DeleteTopic:input_type -> messaging_pb.DeleteTopicRequest
|
7, // 14: mq_pb.SeaweedMessaging.DeleteTopic:input_type -> mq_pb.DeleteTopicRequest
|
||||||
9, // 15: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
|
9, // 15: mq_pb.SeaweedMessaging.ConfigureTopic:input_type -> mq_pb.ConfigureTopicRequest
|
||||||
11, // 16: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest
|
11, // 16: mq_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> mq_pb.GetTopicConfigurationRequest
|
||||||
13, // 17: messaging_pb.SeaweedMessaging.FindBroker:input_type -> messaging_pb.FindBrokerRequest
|
13, // 17: mq_pb.SeaweedMessaging.FindBroker:input_type -> mq_pb.FindBrokerRequest
|
||||||
4, // 18: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.BrokerMessage
|
4, // 18: mq_pb.SeaweedMessaging.Subscribe:output_type -> mq_pb.BrokerMessage
|
||||||
6, // 19: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
|
6, // 19: mq_pb.SeaweedMessaging.Publish:output_type -> mq_pb.PublishResponse
|
||||||
8, // 20: messaging_pb.SeaweedMessaging.DeleteTopic:output_type -> messaging_pb.DeleteTopicResponse
|
8, // 20: mq_pb.SeaweedMessaging.DeleteTopic:output_type -> mq_pb.DeleteTopicResponse
|
||||||
10, // 21: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
|
10, // 21: mq_pb.SeaweedMessaging.ConfigureTopic:output_type -> mq_pb.ConfigureTopicResponse
|
||||||
12, // 22: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse
|
12, // 22: mq_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> mq_pb.GetTopicConfigurationResponse
|
||||||
14, // 23: messaging_pb.SeaweedMessaging.FindBroker:output_type -> messaging_pb.FindBrokerResponse
|
14, // 23: mq_pb.SeaweedMessaging.FindBroker:output_type -> mq_pb.FindBrokerResponse
|
||||||
18, // [18:24] is the sub-list for method output_type
|
18, // [18:24] is the sub-list for method output_type
|
||||||
12, // [12:18] is the sub-list for method input_type
|
12, // [12:18] is the sub-list for method input_type
|
||||||
12, // [12:12] is the sub-list for extension type_name
|
12, // [12:12] is the sub-list for extension type_name
|
|
@ -1,6 +1,6 @@
|
||||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
|
|
||||||
package messaging_pb
|
package mq_pb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
context "context"
|
context "context"
|
||||||
|
@ -35,7 +35,7 @@ func NewSeaweedMessagingClient(cc grpc.ClientConnInterface) SeaweedMessagingClie
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) {
|
func (c *seaweedMessagingClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) {
|
||||||
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/Subscribe", opts...)
|
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/mq_pb.SeaweedMessaging/Subscribe", opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ func (x *seaweedMessagingSubscribeClient) Recv() (*BrokerMessage, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
|
func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
|
||||||
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/messaging_pb.SeaweedMessaging/Publish", opts...)
|
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/mq_pb.SeaweedMessaging/Publish", opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -98,7 +98,7 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) {
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) {
|
func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) {
|
||||||
out := new(DeleteTopicResponse)
|
out := new(DeleteTopicResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/DeleteTopic", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/DeleteTopic", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,7 @@ func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopi
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
|
func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
|
||||||
out := new(ConfigureTopicResponse)
|
out := new(ConfigureTopicResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *Configu
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) {
|
func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) {
|
||||||
out := new(GetTopicConfigurationResponse)
|
out := new(GetTopicConfigurationResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -125,7 +125,7 @@ func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *
|
||||||
|
|
||||||
func (c *seaweedMessagingClient) FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error) {
|
func (c *seaweedMessagingClient) FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error) {
|
||||||
out := new(FindBrokerResponse)
|
out := new(FindBrokerResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindBroker", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/FindBroker", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -242,7 +242,7 @@ func _SeaweedMessaging_DeleteTopic_Handler(srv interface{}, ctx context.Context,
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedMessaging/DeleteTopic",
|
FullMethod: "/mq_pb.SeaweedMessaging/DeleteTopic",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedMessagingServer).DeleteTopic(ctx, req.(*DeleteTopicRequest))
|
return srv.(SeaweedMessagingServer).DeleteTopic(ctx, req.(*DeleteTopicRequest))
|
||||||
|
@ -260,7 +260,7 @@ func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Conte
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedMessaging/ConfigureTopic",
|
FullMethod: "/mq_pb.SeaweedMessaging/ConfigureTopic",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest))
|
return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest))
|
||||||
|
@ -278,7 +278,7 @@ func _SeaweedMessaging_GetTopicConfiguration_Handler(srv interface{}, ctx contex
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedMessaging/GetTopicConfiguration",
|
FullMethod: "/mq_pb.SeaweedMessaging/GetTopicConfiguration",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, req.(*GetTopicConfigurationRequest))
|
return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, req.(*GetTopicConfigurationRequest))
|
||||||
|
@ -296,7 +296,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context,
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedMessaging/FindBroker",
|
FullMethod: "/mq_pb.SeaweedMessaging/FindBroker",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedMessagingServer).FindBroker(ctx, req.(*FindBrokerRequest))
|
return srv.(SeaweedMessagingServer).FindBroker(ctx, req.(*FindBrokerRequest))
|
||||||
|
@ -308,7 +308,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context,
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
|
var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
|
||||||
ServiceName: "messaging_pb.SeaweedMessaging",
|
ServiceName: "mq_pb.SeaweedMessaging",
|
||||||
HandlerType: (*SeaweedMessagingServer)(nil),
|
HandlerType: (*SeaweedMessagingServer)(nil),
|
||||||
Methods: []grpc.MethodDesc{
|
Methods: []grpc.MethodDesc{
|
||||||
{
|
{
|
||||||
|
@ -342,5 +342,5 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
|
||||||
ClientStreams: true,
|
ClientStreams: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Metadata: "messaging.proto",
|
Metadata: "mq.proto",
|
||||||
}
|
}
|
|
@ -283,20 +283,20 @@ func file_s3_proto_rawDescGZIP() []byte {
|
||||||
|
|
||||||
var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
|
var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
|
||||||
var file_s3_proto_goTypes = []interface{}{
|
var file_s3_proto_goTypes = []interface{}{
|
||||||
(*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest
|
(*S3ConfigureRequest)(nil), // 0: mq_pb.S3ConfigureRequest
|
||||||
(*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse
|
(*S3ConfigureResponse)(nil), // 1: mq_pb.S3ConfigureResponse
|
||||||
(*S3CircuitBreakerConfig)(nil), // 2: messaging_pb.S3CircuitBreakerConfig
|
(*S3CircuitBreakerConfig)(nil), // 2: mq_pb.S3CircuitBreakerConfig
|
||||||
(*S3CircuitBreakerOptions)(nil), // 3: messaging_pb.S3CircuitBreakerOptions
|
(*S3CircuitBreakerOptions)(nil), // 3: mq_pb.S3CircuitBreakerOptions
|
||||||
nil, // 4: messaging_pb.S3CircuitBreakerConfig.BucketsEntry
|
nil, // 4: mq_pb.S3CircuitBreakerConfig.BucketsEntry
|
||||||
nil, // 5: messaging_pb.S3CircuitBreakerOptions.ActionsEntry
|
nil, // 5: mq_pb.S3CircuitBreakerOptions.ActionsEntry
|
||||||
}
|
}
|
||||||
var file_s3_proto_depIdxs = []int32{
|
var file_s3_proto_depIdxs = []int32{
|
||||||
3, // 0: messaging_pb.S3CircuitBreakerConfig.global:type_name -> messaging_pb.S3CircuitBreakerOptions
|
3, // 0: mq_pb.S3CircuitBreakerConfig.global:type_name -> mq_pb.S3CircuitBreakerOptions
|
||||||
4, // 1: messaging_pb.S3CircuitBreakerConfig.buckets:type_name -> messaging_pb.S3CircuitBreakerConfig.BucketsEntry
|
4, // 1: mq_pb.S3CircuitBreakerConfig.buckets:type_name -> mq_pb.S3CircuitBreakerConfig.BucketsEntry
|
||||||
5, // 2: messaging_pb.S3CircuitBreakerOptions.actions:type_name -> messaging_pb.S3CircuitBreakerOptions.ActionsEntry
|
5, // 2: mq_pb.S3CircuitBreakerOptions.actions:type_name -> mq_pb.S3CircuitBreakerOptions.ActionsEntry
|
||||||
3, // 3: messaging_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> messaging_pb.S3CircuitBreakerOptions
|
3, // 3: mq_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> mq_pb.S3CircuitBreakerOptions
|
||||||
0, // 4: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest
|
0, // 4: mq_pb.SeaweedS3.Configure:input_type -> mq_pb.S3ConfigureRequest
|
||||||
1, // 5: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse
|
1, // 5: mq_pb.SeaweedS3.Configure:output_type -> mq_pb.S3ConfigureResponse
|
||||||
5, // [5:6] is the sub-list for method output_type
|
5, // [5:6] is the sub-list for method output_type
|
||||||
4, // [4:5] is the sub-list for method input_type
|
4, // [4:5] is the sub-list for method input_type
|
||||||
4, // [4:4] is the sub-list for extension type_name
|
4, // [4:4] is the sub-list for extension type_name
|
||||||
|
|
|
@ -31,7 +31,7 @@ func NewSeaweedS3Client(cc grpc.ClientConnInterface) SeaweedS3Client {
|
||||||
|
|
||||||
func (c *seaweedS3Client) Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) {
|
func (c *seaweedS3Client) Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) {
|
||||||
out := new(S3ConfigureResponse)
|
out := new(S3ConfigureResponse)
|
||||||
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedS3/Configure", in, out, opts...)
|
err := c.cc.Invoke(ctx, "/mq_pb.SeaweedS3/Configure", in, out, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func
|
||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/messaging_pb.SeaweedS3/Configure",
|
FullMethod: "/mq_pb.SeaweedS3/Configure",
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(SeaweedS3Server).Configure(ctx, req.(*S3ConfigureRequest))
|
return srv.(SeaweedS3Server).Configure(ctx, req.(*S3ConfigureRequest))
|
||||||
|
@ -88,7 +88,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
var SeaweedS3_ServiceDesc = grpc.ServiceDesc{
|
var SeaweedS3_ServiceDesc = grpc.ServiceDesc{
|
||||||
ServiceName: "messaging_pb.SeaweedS3",
|
ServiceName: "mq_pb.SeaweedS3",
|
||||||
HandlerType: (*SeaweedS3Server)(nil),
|
HandlerType: (*SeaweedS3Server)(nil),
|
||||||
Methods: []grpc.MethodDesc{
|
Methods: []grpc.MethodDesc{
|
||||||
{
|
{
|
||||||
|
|
|
@ -68,7 +68,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime
|
||||||
|
|
||||||
logEntry := &filer_pb.LogEntry{}
|
logEntry := &filer_pb.LogEntry{}
|
||||||
if err = proto.Unmarshal(entryData, logEntry); err != nil {
|
if err = proto.Unmarshal(entryData, logEntry); err != nil {
|
||||||
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
|
glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
|
||||||
pos += 4 + int(size)
|
pos += 4 + int(size)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue