diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 96448be83..9fc9c9fe4 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -3,6 +3,8 @@ package broker import ( "context" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // FindTopicBrokers returns the brokers that are serving the topic @@ -17,30 +19,25 @@ import ( // 2.2 if the topic is found, return the brokers // // 3. unlock the topic -func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) { - ret := &mq_pb.LookupTopicBrokersResponse{} - // TODO lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - ret.Topic = request.Topic - ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, +func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { + if broker.currentBalancer == "" { + return nil, status.Errorf(codes.Unavailable, "no balancer") } - return ret, nil + if !broker.lockAsBalancer.IsLocked() { + proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + resp, err = client.LookupTopicBrokers(ctx, request) + return nil + }) + if proxyErr != nil { + return nil, proxyErr + } + return resp, err + } + + ret := &mq_pb.LookupTopicBrokersResponse{} + ret.Topic = request.Topic + ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish) + return ret, err } // CheckTopicPartitionsStatus check the topic partitions on the broker diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index db8329989..2ab20ac52 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -39,6 +39,7 @@ type MessageQueueBroker struct { localTopicManager *topic.LocalTopicManager Balancer *balancer.Balancer lockAsBalancer *cluster.LiveLock + currentBalancer pb.ServerAddress } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {