release local topic partition if no publisher and subscribers

This commit is contained in:
chrislu 2024-01-16 08:43:07 -08:00
parent f782165638
commit 3795d8dca8
4 changed files with 24 additions and 1 deletions

View file

@ -48,8 +48,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// TODO check whether current broker should be the leader for the topic partition // TODO check whether current broker should be the leader for the topic partition
ackInterval := 1 ackInterval := 1
initMessage := req.GetInit() initMessage := req.GetInit()
var t topic.Topic
var p topic.Partition
if initMessage != nil { if initMessage != nil {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil { if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
@ -75,6 +77,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
atomic.StoreInt32(&isStopping, 1) atomic.StoreInt32(&isStopping, 1)
close(respChan) close(respChan)
localTopicPartition.Publishers.RemovePublisher(clientName) localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveTopicPartition(t, p)
}
}() }()
go func() { go func() {
ticker := time.NewTicker(1 * time.Second) ticker := time.NewTicker(1 * time.Second)

View file

@ -58,6 +58,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
isConnected = false isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName) localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveTopicPartition(t, partition)
}
}() }()
var startPosition log_buffer.MessagePosition var startPosition log_buffer.MessagePosition

View file

@ -109,3 +109,11 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
time.Sleep(113 * time.Millisecond) time.Sleep(113 * time.Millisecond)
} }
} }
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
p.logBuffer.ShutdownLogBuffer()
hasShutdown = true
}
return
}

View file

@ -47,3 +47,10 @@ func (p *LocalPartitionSubscribers) SignalShutdown() {
Subscriber.SignalShutdown() Subscriber.SignalShutdown()
} }
} }
func (p *LocalPartitionSubscribers) IsEmpty() bool {
p.SubscribersLock.RLock()
defer p.SubscribersLock.RUnlock()
return len(p.Subscribers) == 0
}