diff --git a/weed/mq/balancer/balance.go b/weed/mq/balancer/balance.go new file mode 100644 index 000000000..5be968399 --- /dev/null +++ b/weed/mq/balancer/balance.go @@ -0,0 +1,26 @@ +package balancer + +import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + +func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) { + // TODO lock the topic + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + return []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + }, nil +} diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index eb452cce0..1aa47831e 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -14,29 +14,6 @@ type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] } -func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) { - // TODO lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - return []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, - }, nil -} - type BrokerStats struct { TopicPartitionCount int32 ConsumerCount int32