seaweedfs/weed/mq/sub_coordinator/coordinator.go
Chris Lu 580940bf82
Merge accumulated changes related to message queue (#5098)
* balance partitions on brokers

* prepare topic partition first and then publish, move partition

* purge unused APIs

* clean up

* adjust logs

* add BalanceTopics() grpc API

* configure topic

* configure topic command

* refactor

* repair missing partitions

* sequence of operations to ensure ordering

* proto to close publishers and consumers

* rename file

* topic partition versioned by unixTimeNs

* create local topic partition

* close publishers

* randomize the client name

* wait until no publishers

* logs

* close stop publisher channel

* send last ack

* comments

* comment

* comments

* support list of brokers

* add cli options

* Update .gitignore

* logs

* return io.eof directly

* refactor

* optionally create topic

* refactoring

* detect consumer disconnection

* sub client wait for more messages

* subscribe by time stamp

* rename

* rename to sub_balancer

* rename

* adjust comments

* rename

* fix compilation

* rename

* rename

* SubscriberToSubCoordinator

* sticky rebalance

* go fmt

* add tests

* balance partitions on brokers

* prepare topic partition first and then publish, move partition

* purge unused APIs

* clean up

* adjust logs

* add BalanceTopics() grpc API

* configure topic

* configure topic command

* refactor

* repair missing partitions

* sequence of operations to ensure ordering

* proto to close publishers and consumers

* rename file

* topic partition versioned by unixTimeNs

* create local topic partition

* close publishers

* randomize the client name

* wait until no publishers

* logs

* close stop publisher channel

* send last ack

* comments

* comment

* comments

* support list of brokers

* add cli options

* Update .gitignore

* logs

* return io.eof directly

* refactor

* optionally create topic

* refactoring

* detect consumer disconnection

* sub client wait for more messages

* subscribe by time stamp

* rename

* rename to sub_balancer

* rename

* adjust comments

* rename

* fix compilation

* rename

* rename

* SubscriberToSubCoordinator

* sticky rebalance

* go fmt

* add tests

* tracking topic=>broker

* merge

* comment
2023-12-11 12:05:54 -08:00

87 lines
2.5 KiB
Go

package sub_coordinator
import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
type TopicConsumerGroups struct {
// map a consumer group name to a consumer group
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
}
// Coordinator coordinates the instances in the consumer group for one topic.
// It is responsible for:
// 1. (Maybe) assigning partitions when a consumer instance is up/down.
type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.Balancer
}
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
return &Coordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,
}
}
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups {
topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName)
if tcg == nil {
tcg = &TopicConsumerGroups{
ConsumerGroups: cmap.New[*ConsumerGroup](),
}
c.TopicSubscribers.Set(topicName, tcg)
}
return tcg
}
func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) {
topicName := toTopicName(topic)
c.TopicSubscribers.Remove(topicName)
}
func toTopicName(topic *mq_pb.Topic) string {
topicName := topic.Namespace + "." + topic.Name
return topicName
}
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance{
tcg := c.GetTopicConsumerGroups(topic)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {
cg = NewConsumerGroup()
tcg.ConsumerGroups.Set(consumerGroup, cg)
}
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance)
cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi)
}
cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
return cgi
}
func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
if tcg == nil {
return
}
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {
return
}
cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
if cg.ConsumerGroupInstances.Count() == 0 {
tcg.ConsumerGroups.Remove(consumerGroup)
}
if tcg.ConsumerGroups.Count() == 0 {
c.RemoveTopic(topic)
}
}