From c9caf3311995290c36ed369b48fbbab23d6bc7b5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 27 Aug 2023 18:33:46 -0700 Subject: [PATCH] move functions --- weed/mq/broker/broker_grpc_admin.go | 31 ----------------------- weed/mq/broker/broker_grpc_lookup.go | 37 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 31 deletions(-) create mode 100644 weed/mq/broker/broker_grpc_lookup.go diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index a417a5041..5c9dc726e 100644 --- a/weed/mq/broker/broker_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -88,37 +88,6 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq return ret, nil } -// FindTopicBrokers returns the brokers that are serving the topic -// -// 1. lock the topic -// -// 2. find the topic partitions on the filer -// 2.1 if the topic is not found, return error -// 2.2 if the request is_for_publish, create the topic -// 2.2.1 if the request is_for_subscribe, return error not found -// 2.2.2 if the request is_for_publish, create the topic -// 2.2 if the topic is found, return the brokers -// -// 3. unlock the topic -func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) { - ret := &mq_pb.FindTopicBrokersResponse{} - // lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - return ret, nil -} - -// CheckTopicPartitionsStatus check the topic partitions on the broker -func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { - ret := &mq_pb.CheckTopicPartitionsStatusResponse{} - return ret, nil -} - // createOrUpdateTopicPartitions creates the topic partitions on the broker // 1. check func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) { diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go new file mode 100644 index 000000000..30a3ff1ce --- /dev/null +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -0,0 +1,37 @@ +package broker + +import ( + "context" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +// FindTopicBrokers returns the brokers that are serving the topic +// +// 1. lock the topic +// +// 2. find the topic partitions on the filer +// 2.1 if the topic is not found, return error +// 2.2 if the request is_for_publish, create the topic +// 2.2.1 if the request is_for_subscribe, return error not found +// 2.2.2 if the request is_for_publish, create the topic +// 2.2 if the topic is found, return the brokers +// +// 3. unlock the topic +func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) { + ret := &mq_pb.FindTopicBrokersResponse{} + // TODO lock the topic + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + return ret, nil +} + +// CheckTopicPartitionsStatus check the topic partitions on the broker +func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { + ret := &mq_pb.CheckTopicPartitionsStatusResponse{} + return ret, nil +}