From d74348048ad4259e14e7d948106e282b27c326f7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 24 Sep 2023 15:08:44 -0700 Subject: [PATCH] implement create topic --- weed/mq/balancer/allocate.go | 46 +++++++++++++++++++++++----- weed/mq/balancer/allocate_test.go | 5 ++- weed/mq/balancer/lookup.go | 10 +++++- weed/mq/broker/broker_grpc_lookup.go | 4 +-- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go index d594c60fb..f7b17ab4b 100644 --- a/weed/mq/balancer/allocate.go +++ b/weed/mq/balancer/allocate.go @@ -3,18 +3,50 @@ package balancer import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "math/rand" ) func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { - return []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, + // divide the ring into partitions + rangeSize := MaxPartitionCount / partitionCount + for i := 0; i < partitionCount; i++ { + assignment := &mq_pb.BrokerPartitionAssignment{ Partition: &mq_pb.Partition{ RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, + RangeStart: int32(i * rangeSize), + RangeStop: int32((i + 1) * rangeSize), }, - }, + } + if i == partitionCount-1 { + assignment.Partition.RangeStop = MaxPartitionCount + } + assignments = append(assignments, assignment) } + + // pick the brokers + pickedBrokers := pickBrokers(brokers, partitionCount) + + // assign the partitions to brokers + for i, assignment := range assignments { + assignment.LeaderBroker = pickedBrokers[i] + } + return +} + +// for now: randomly pick brokers +// TODO pick brokers based on the broker stats +func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int) []string { + candidates := make([]string, 0, brokers.Count()) + for brokerStatsItem := range brokers.IterBuffered() { + candidates = append(candidates, brokerStatsItem.Key) + } + pickedBrokers := make([]string, 0, count) + for i := 0; i < count; i++ { + p := rand.Int() % len(candidates) + if p < 0 { + p = -p + } + pickedBrokers = append(pickedBrokers, candidates[p]) + } + return pickedBrokers } diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go index c714788e6..cb87d4ec3 100644 --- a/weed/mq/balancer/allocate_test.go +++ b/weed/mq/balancer/allocate_test.go @@ -24,12 +24,11 @@ func Test_allocateOneBroker(t *testing.T) { name: "test only one broker", args: args{ brokers: brokers, - partitionCount: 6, + partitionCount: 1, }, wantAssignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, + LeaderBroker: "localhost:17777", Partition: &mq_pb.Partition{ RingSize: MaxPartitionCount, RangeStart: 0, diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go index 55ed3e95d..f906a7d74 100644 --- a/weed/mq/balancer/lookup.go +++ b/weed/mq/balancer/lookup.go @@ -1,10 +1,15 @@ package balancer import ( + "errors" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) -func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { +var ( + ErrNoBroker = errors.New("no broker") +) + +func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { // find existing topic partition assignments for brokerStatsItem := range b.Brokers.IterBuffered() { broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val @@ -39,5 +44,8 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b // if the request is_for_subscribe // return error not found // t := topic.FromPbTopic(request.Topic) + if b.Brokers.IsEmpty() { + return nil, ErrNoBroker + } return allocateTopicPartitions(b.Brokers, 6), nil } diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 9dda9b7d1..5bc7068b2 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -23,7 +23,7 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p } ret := &mq_pb.CreateTopicResponse{} - ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true) + ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) return ret, err } @@ -56,7 +56,7 @@ func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, reques ret := &mq_pb.LookupTopicBrokersResponse{} ret.Topic = request.Topic - ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish) + ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6) return ret, err }