seaweedfs/weed/mq/coordinator/consumer_group.go

93 lines
2.3 KiB
Go
Raw Normal View History

2023-10-02 08:02:27 +00:00
package coordinator
import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"sync"
)
// SetMinMaxActiveInstances stores the lower and upper bounds on the number of
// active instances for this consumer group.
//
// CoordinateIfNeeded treats a non-positive minimum as "no minimum" and only
// enforces the maximum when it is greater than zero.
func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
	cg.MinimumActiveInstances, cg.MaximumActiveInstances = min, max
}
// AddConsumerGroupInstance creates a ConsumerGroupInstance for clientId,
// stores it in cg.ConsumerGroupInstances under that id, and returns it.
//
// Only ClientId is populated, so the new instance starts without a partition.
// NOTE(review): presumably Set overwrites an existing entry with the same
// clientId — confirm against the map implementation.
func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
	instance := new(ConsumerGroupInstance)
	instance.ClientId = clientId
	cg.ConsumerGroupInstances.Set(clientId, instance)
	return instance
}
// RemoveConsumerGroupInstance removes the entry stored under clientId from
// cg.ConsumerGroupInstances. It does not trigger a rebalance by itself.
func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
cg.ConsumerGroupInstances.Remove(clientId)
}
// CoordinateIfNeeded counts the instances with and without an assigned
// partition and calls doCoordinate when the configured limits call for a
// different number of active instances.
//
// The desired change is computed as follows:
//   - If there are idle instances and no minimum is configured
//     (MinimumActiveInstances <= 0), all idle instances are activated.
//   - If there are idle instances, the minimum is not yet met, and activating
//     idle instances can meet it, the active count is raised exactly to the
//     minimum.
//   - If a maximum is configured (MaximumActiveInstances > 0), the resulting
//     target is capped at that maximum. This both scales down a group that is
//     already over the limit and prevents idle instances from being activated
//     beyond it.
//
// Nothing happens when the target equals the current active count.
func (cg *ConsumerGroup) CoordinateIfNeeded() {
	var idle, active int32
	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
		if cgi.Val.Partition == nil {
			// this consumer group instance is not assigned a partition yet
			idle++
		} else {
			active++
		}
	}

	var delta int32
	if idle > 0 {
		switch {
		case cg.MinimumActiveInstances <= 0:
			// no minimum to wait for: activate every idle instance
			delta = idle
		case active < cg.MinimumActiveInstances && active+idle >= cg.MinimumActiveInstances:
			// enough instances have joined to reach the minimum
			delta = cg.MinimumActiveInstances - active
		}
	}

	// Cap the target at the maximum. Checking active+delta (rather than only
	// active) keeps a growth step from overshooting the limit, which would
	// otherwise force a scale-down on the very next call.
	if cg.MaximumActiveInstances > 0 && active+delta > cg.MaximumActiveInstances {
		delta = cg.MaximumActiveInstances - active
	}

	if delta == 0 {
		return
	}
	cg.doCoordinate(active + delta)
}
// doCoordinate rebalances the group so that at most target instances hold a
// partition.
//
// It runs in two phases, each waiting for its goroutines to finish:
//  1. every instance that currently has a partition is asked to stop
//     (the per-instance work is still a placeholder), and
//  2. the partitions returned by topic.SplitPartitions(target) are handed out,
//     one per instance, and each receiving instance is started (placeholder).
//
// Instances left over once the partitions run out have their Partition
// cleared so they are counted as idle by CoordinateIfNeeded.
func (cg *ConsumerGroup) doCoordinate(target int32) {
	// stop existing instances from processing
	var wg sync.WaitGroup
	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
		if cgi.Val.Partition != nil {
			wg.Add(1)
			go func(cgi *ConsumerGroupInstance) {
				defer wg.Done()
				// stop processing
				// flush internal state
				// wait for all messages to be processed
				// close the connection
			}(cgi.Val)
		}
	}
	wg.Wait()

	partitions := topic.SplitPartitions(target)

	// assign partitions to instances
	next := 0
	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
		if next >= len(partitions) {
			// The group holds more instances than there are partitions to
			// hand out (scale-down, or only part of the idle instances are
			// being activated). Indexing partitions here would panic, so the
			// surplus instance is left idle instead. The loop keeps going to
			// clear every remaining instance.
			cgi.Val.Partition = nil
			continue
		}
		cgi.Val.Partition = partitions[next]
		next++
		wg.Add(1)
		go func(cgi *ConsumerGroupInstance) {
			defer wg.Done()
			// start processing
			// start consuming from the last offset
		}(cgi.Val)
	}
	wg.Wait()
}