subscriber can be notified of the assignment change when topic is just configured

Next: Subscriber needs to read by the timestamp offset.
This commit is contained in:
chrislu 2024-01-03 13:30:30 -08:00
parent efb695fd93
commit 35869b5c80
8 changed files with 55 additions and 25 deletions

View file

@ -124,7 +124,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
} }
func (lock *LiveLock) IsLocked() bool { func (lock *LiveLock) IsLocked() bool {
return lock.isLocked return lock!=nil && lock.isLocked
} }
func (lock *LiveLock) StopLock() error { func (lock *LiveLock) StopLock() error {

View file

@ -47,6 +47,7 @@ type MessageQueueBroker struct {
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
pub_broker_balancer := pub_balancer.NewBalancer() pub_broker_balancer := pub_balancer.NewBalancer()
coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer)
mqBroker = &MessageQueueBroker{ mqBroker = &MessageQueueBroker{
option: option, option: option,
@ -55,9 +56,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
filers: make(map[pb.ServerAddress]struct{}), filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(), localTopicManager: topic.NewLocalTopicManager(),
Balancer: pub_broker_balancer, Balancer: pub_broker_balancer,
Coordinator: sub_coordinator.NewCoordinator(pub_broker_balancer), Coordinator: coordinator,
} }
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
go mqBroker.MasterClient.KeepConnectedToMaster() go mqBroker.MasterClient.KeepConnectedToMaster()

View file

@ -30,11 +30,11 @@ func main() {
Namespace: *namespace, Namespace: *namespace,
Topic: *topic, Topic: *topic,
Filter: "", Filter: "",
StartTime: time.Now(), StartTime: time.Unix(0, 0),
} }
processorConfig := sub_client.ProcessorConfiguration{ processorConfig := sub_client.ProcessorConfiguration{
ConcurrentPartitionLimit: 1, ConcurrentPartitionLimit: 6,
} }
brokers := strings.Split(*seedBrokers, ",") brokers := strings.Split(*seedBrokers, ",")

View file

@ -145,7 +145,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv() resp, err := subscribeClient.Recv()
if err != nil { if err != nil {
return fmt.Errorf("subscribe error: %v", err) return fmt.Errorf("subscribe recv: %v", err)
} }
if resp.Message == nil { if resp.Message == nil {
continue continue
@ -156,10 +156,10 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
if processErr != nil { if processErr != nil {
return fmt.Errorf("process error: %v", processErr) return fmt.Errorf("process error: %v", processErr)
} }
sub.alreadyProcessedTsNs = m.Data.TsNs
if !shouldContinue { if !shouldContinue {
return nil return nil
} }
sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl: case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF return io.EOF

View file

@ -32,6 +32,7 @@ type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader // Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
} }
func NewBalancer() *Balancer { func NewBalancer() *Balancer {

View file

@ -33,7 +33,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
} }
} }
} }
if len(assignments) > 0 || !publish { if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
// glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) // glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
return assignments, nil return assignments, nil
} }
@ -48,5 +48,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
if balancer.Brokers.IsEmpty() { if balancer.Brokers.IsEmpty() {
return nil, ErrNoBroker return nil, ErrNoBroker
} }
return allocateTopicPartitions(balancer.Brokers, partitionCount), nil assignments = allocateTopicPartitions(balancer.Brokers, partitionCount)
balancer.OnPartitionChange(topic, assignments)
return
} }

View file

@ -52,32 +52,40 @@ func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
cg.reBalanceTimer = nil cg.reBalanceTimer = nil
} }
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
cg.RebalanceConsumberGroupInstances(reason) cg.RebalanceConsumberGroupInstances(nil, reason)
cg.reBalanceTimer = nil cg.reBalanceTimer = nil
}) })
} }
func (cg *ConsumerGroup) OnPartitionListChange() { func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
if cg.reBalanceTimer != nil { if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop() cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil cg.reBalanceTimer = nil
} }
cg.RebalanceConsumberGroupInstances("partition list change") partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
for _, assignment := range assignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker)
}
cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change")
} }
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) { func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
println("rebalance due to", reason, "...") glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
now := time.Now().UnixNano() now := time.Now().UnixNano()
// collect current topic partitions // collect current topic partitions
partitionSlotToBrokerList, found := cg.pubBalancer.TopicToBrokers.Get(cg.topic.String()) partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if !found { if partitionSlotToBrokerList == nil {
glog.V(0).Infof("topic %s not found in balancer", cg.topic.String()) var found bool
return partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
if !found {
glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
return
}
} }
// collect current consumer group instance ids // collect current consumer group instance ids
consumerInstanceIds := make([]string, 0) var consumerInstanceIds []string
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() { for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId) consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
} }
@ -116,6 +124,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) {
}, },
}, },
} }
println("sending response to", consumerGroupInstance.InstanceId, "...")
consumerGroupInstance.ResponseChan <- response consumerGroupInstance.ResponseChan <- response
} }

View file

@ -28,14 +28,16 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
} }
} }
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups { func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
topicName := toTopicName(topic) topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName) tcg, _ := c.TopicSubscribers.Get(topicName)
if tcg == nil { if tcg == nil && createIfMissing{
tcg = &TopicConsumerGroups{ tcg = &TopicConsumerGroups{
ConsumerGroups: cmap.New[*ConsumerGroup](), ConsumerGroups: cmap.New[*ConsumerGroup](),
} }
c.TopicSubscribers.Set(topicName, tcg) if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
tcg, _ = c.TopicSubscribers.Get(topicName)
}
} }
return tcg return tcg
} }
@ -50,23 +52,27 @@ func toTopicName(topic *mq_pb.Topic) string {
} }
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(topic) tcg := c.GetTopicConsumerGroups(topic, true)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup) cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil { if cg == nil {
cg = NewConsumerGroup(topic, c.balancer) cg = NewConsumerGroup(topic, c.balancer)
tcg.ConsumerGroups.Set(consumerGroup, cg) if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){
cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
}
} }
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance) cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
if cgi == nil { if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance) cgi = NewConsumerGroupInstance(consumerGroupInstance)
cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi) if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){
cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
}
} }
cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic) cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
return cgi return cgi
} }
func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) { func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) tcg := c.GetTopicConsumerGroups(topic, false)
if tcg == nil { if tcg == nil {
return return
} }
@ -83,3 +89,13 @@ func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance stri
c.RemoveTopic(topic) c.RemoveTopic(topic)
} }
} }
func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
if tcg == nil {
return
}
for _, cg := range tcg.ConsumerGroups.Items() {
cg.OnPartitionListChange(assignments)
}
}