diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index bfde6a512..0803b2c79 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -11,9 +11,14 @@ import (
 func (sub *TopicSubscriber) Subscribe() error {
 	util.RetryUntil("subscribe", func() error {
+		// ask balancer for brokers of the topic
 		if err := sub.doLookup(sub.bootstrapBroker); err != nil {
 			return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
 		}
+		// treat the first broker as the topic leader
+		// connect to the leader broker
+
+		// subscribe to the topic
 		if err := sub.doProcess(); err != nil {
 			return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
 		}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index f744c6fa2..809673de1 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -6,11 +6,13 @@ import (
 )
 
 type SubscriberConfiguration struct {
-	ClientId         string
-	GroupId          string
-	GroupInstanceId  string
-	BootstrapServers []string
-	GrpcDialOption   grpc.DialOption
+	ClientId          string
+	GroupId           string
+	GroupInstanceId   string
+	GroupMinimumPeers int32
+	GroupMaximumPeers int32
+	BootstrapServers  []string
+	GrpcDialOption    grpc.DialOption
 }
 
 type ContentConfiguration struct {
diff --git a/weed/mq/coordinator/consumer_group.go b/weed/mq/coordinator/consumer_group.go
new file mode 100644
index 000000000..e3dec493c
--- /dev/null
+++ b/weed/mq/coordinator/consumer_group.go
@@ -0,0 +1,92 @@
+package coordinator
+
+import (
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"sync"
+)
+
+func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
+	cg.MinimumActiveInstances = min
+	cg.MaximumActiveInstances = max
+}
+
+func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
+	cgi := &ConsumerGroupInstance{
+		ClientId: clientId,
+	}
+	cg.ConsumerGroupInstances.Set(clientId, cgi)
+	return cgi
+}
+
+func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
+	cg.ConsumerGroupInstances.Remove(clientId)
+}
+
+func (cg *ConsumerGroup) CoordinateIfNeeded() {
+	emptyInstanceCount, activeInstanceCount := int32(0), int32(0)
+	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+		if cgi.Val.Partition == nil {
+			// this consumer group instance is not assigned a partition
+			// need to assign one
+			emptyInstanceCount++
+		} else {
+			activeInstanceCount++
+		}
+	}
+
+	var delta int32
+	if emptyInstanceCount > 0 {
+		if cg.MinimumActiveInstances <= 0 {
+			// need to assign more partitions
+			delta = emptyInstanceCount
+		} else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances {
+			// need to assign more partitions
+			delta = cg.MinimumActiveInstances - activeInstanceCount
+		}
+	}
+
+	if cg.MaximumActiveInstances > 0 {
+		if activeInstanceCount > cg.MaximumActiveInstances {
+			// need to remove some partitions
+			delta = cg.MaximumActiveInstances - activeInstanceCount
+		}
+	}
+	if delta == 0 {
+		return
+	}
+	cg.doCoordinate(activeInstanceCount + delta)
+}
+
+func (cg *ConsumerGroup) doCoordinate(target int32) {
+	// stop existing instances from processing
+	var wg sync.WaitGroup
+	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+		if cgi.Val.Partition != nil {
+			wg.Add(1)
+			go func(cgi *ConsumerGroupInstance) {
+				defer wg.Done()
+				// stop processing
+				// flush internal state
+				// wait for all messages to be processed
+				// close the connection
+			}(cgi.Val)
+		}
+	}
+	wg.Wait()
+
+	partitions := topic.SplitPartitions(target)
+
+	// assign partitions to new instances
+	i := 0
+	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+		cgi.Val.Partition = partitions[i]
+		i++
+		wg.Add(1)
+		go func(cgi *ConsumerGroupInstance) {
+			defer wg.Done()
+			// start processing
+			// start consuming from the last offset
+		}(cgi.Val)
+	}
+	wg.Wait()
+}
diff --git a/weed/mq/coordinator/coordinator.go b/weed/mq/coordinator/coordinator.go
new file mode 100644
index 000000000..e94ac3371
--- /dev/null
+++ b/weed/mq/coordinator/coordinator.go
@@ -0,0 +1,36 @@
+package coordinator
+
+import (
+	cmap "github.com/orcaman/concurrent-map/v2"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+type ConsumerGroupInstance struct {
+	ClientId string
+	// the consumer group instance may not have an active partition
+	Partition *topic.Partition
+	// processed message count
+	ProcessedMessageCount int64
+}
+type ConsumerGroup struct {
+	// map a client id to a consumer group instance
+	ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+	MinimumActiveInstances int32
+	MaximumActiveInstances int32
+}
+type TopicConsumerGroups struct {
+	// map a consumer group name to a consumer group
+	ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
+}
+
+// Coordinator coordinates the instances in the consumer group for one topic.
+// It is responsible for:
+// 1. Assigning partitions to consumer instances.
+// 2. Reassigning partitions when a consumer instance is down.
+// 3. Reassigning partitions when a consumer instance is up.
+type Coordinator struct {
+	// map client id to subscriber
+	Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+	// map topic name to consumer groups
+	TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups]
+}
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 285bdcb36..79c830f13 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -30,3 +30,20 @@ func FromPbPartition(partition *mq_pb.Partition) Partition {
 		RingSize: partition.RingSize,
 	}
 }
+
+func SplitPartitions(targetCount int32) []*Partition {
+	partitions := make([]*Partition, 0, targetCount)
+	partitionSize := PartitionCount / targetCount
+	for i := int32(0); i < targetCount; i++ {
+		partitionStop := (i + 1) * partitionSize
+		if i == targetCount-1 {
+			partitionStop = PartitionCount
+		}
+		partitions = append(partitions, &Partition{
+			RangeStart: i * partitionSize,
+			RangeStop:  partitionStop,
+			RingSize:   PartitionCount,
+		})
+	}
+	return partitions
+}
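
The new topic.SplitPartitions carves the partition ring into targetCount contiguous ranges; because partitionSize comes from integer division, the last range is stretched to PartitionCount so no slot of the ring is left unassigned. Below is a minimal standalone sketch of that arithmetic for illustration only; the ringSize value of 4096 is an assumption standing in for the package-level PartitionCount constant, whose actual value is defined elsewhere in weed/mq/topic.

package main

import "fmt"

// partitionRange mirrors the Partition fields that SplitPartitions fills in.
type partitionRange struct {
	RangeStart int32
	RangeStop  int32
	RingSize   int32
}

// splitPartitions reproduces the splitting logic from the diff: integer
// division sizes each range, and the final range is stretched to the end
// of the ring so the remainder is not dropped.
func splitPartitions(ringSize, targetCount int32) []partitionRange {
	partitions := make([]partitionRange, 0, targetCount)
	partitionSize := ringSize / targetCount
	for i := int32(0); i < targetCount; i++ {
		stop := (i + 1) * partitionSize
		if i == targetCount-1 {
			stop = ringSize // last range absorbs the remainder
		}
		partitions = append(partitions, partitionRange{
			RangeStart: i * partitionSize,
			RangeStop:  stop,
			RingSize:   ringSize,
		})
	}
	return partitions
}

func main() {
	// 4096 is an assumed ring size for illustration; the real code uses
	// the PartitionCount constant from weed/mq/topic.
	for _, p := range splitPartitions(4096, 3) {
		fmt.Printf("[%d, %d) of ring %d\n", p.RangeStart, p.RangeStop, p.RingSize)
	}
	// prints:
	// [0, 1365) of ring 4096
	// [1365, 2730) of ring 4096
	// [2730, 4096) of ring 4096
}

Since doCoordinate always calls SplitPartitions with the new target count, a group that grows or shrinks gets a fully recomputed set of ranges rather than incremental moves, which is why existing instances are drained behind the first wait group before the new assignment is applied.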
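
To show how the pieces above are meant to fit together, here is a hedged usage sketch that builds a ConsumerGroup from the new coordinator package, registers two subscriber instances, and lets CoordinateIfNeeded assign them partitions. It compiles only against this PR's branch; the client ids are made-up examples, and the exported ConsumerGroupInstances field is initialized with cmap.New because the zero value of a concurrent map is not usable.

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/seaweedfs/seaweedfs/weed/mq/coordinator"
)

func main() {
	// Build a consumer group by hand; the PR does not add a constructor yet.
	cg := &coordinator.ConsumerGroup{
		ConsumerGroupInstances: cmap.New[*coordinator.ConsumerGroupInstance](),
	}
	// Keep between 2 and 3 instances actively consuming.
	cg.SetMinMaxActiveInstances(2, 3)

	// Two subscriber instances join; neither holds a partition yet.
	cg.AddConsumerGroupInstance("client-a")
	cg.AddConsumerGroupInstance("client-b")

	// Both instances are empty and the minimum is 2, so delta becomes 2 and
	// doCoordinate splits the ring into two ranges, one per instance.
	cg.CoordinateIfNeeded()

	for item := range cg.ConsumerGroupInstances.IterBuffered() {
		p := item.Val.Partition
		fmt.Printf("%s -> [%d, %d)\n", item.Key, p.RangeStart, p.RangeStop)
	}
}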