From db3670a3a5dda3fb512cb45e8d082bcf9358468b Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 16 Jan 2024 08:55:47 -0800 Subject: [PATCH] simplify api --- weed/mq/broker/broker_grpc_configure.go | 2 +- weed/mq/broker/broker_grpc_lookup.go | 2 +- weed/mq/pub_balancer/lookup.go | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 8e36675c1..e8b70a0ce 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } ret := &mq_pb.ConfigureTopicResponse{} - ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) + ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, request.PartitionCount) for _, bpa := range ret.BrokerPartitionAssignments { fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index ea0c8c0b4..74456c6e3 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -35,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret := &mq_pb.LookupTopicBrokersResponse{} ret.Topic = request.Topic - ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, false, -1) + ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, -1) return ret, err } diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 5b9ca5a05..b74909729 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -10,7 +10,7 @@ var ( ErrNoBroker = errors.New("no broker") ) -func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) { +func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) { if partitionCount == 0 { partitionCount = 6 } @@ -35,10 +35,13 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } } } - if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { + if len(assignments) > 0 { glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) return assignments, true, nil } + if partitionCount < 0 { + return nil, false, nil + } // find the topic partitions on the filer // if the topic is not found