diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 45a573633..e0e138ef2 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -48,8 +48,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // TODO check whether current broker should be the leader for the topic partition ackInterval := 1 initMessage := req.GetInit() + var t topic.Topic + var p topic.Partition if initMessage != nil { - t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) if localTopicPartition == nil { response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) @@ -75,6 +77,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis atomic.StoreInt32(&isStopping, 1) close(respChan) localTopicPartition.Publishers.RemovePublisher(clientName) + if localTopicPartition.MaybeShutdownLocalPartition() { + b.localTopicManager.RemoveTopicPartition(t, p) + } }() go func() { ticker := time.NewTicker(1 * time.Second) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 5ab47b61f..d6114ad23 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -58,6 +58,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest isConnected = false localTopicPartition.Subscribers.RemoveSubscriber(clientName) glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + if localTopicPartition.MaybeShutdownLocalPartition() { + b.localTopicManager.RemoveTopicPartition(t, partition) + } }() var startPosition log_buffer.MessagePosition diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index f4a080f38..9b7281b65 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -109,3 +109,11 @@ func (p *LocalPartition) WaitUntilNoPublishers() { time.Sleep(113 * time.Millisecond) } } + +func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { + if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { + p.logBuffer.ShutdownLogBuffer() + hasShutdown = true + } + return +} diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go index e177ec7e8..caadff278 100644 --- a/weed/mq/topic/local_partition_subscribers.go +++ b/weed/mq/topic/local_partition_subscribers.go @@ -47,3 +47,10 @@ func (p *LocalPartitionSubscribers) SignalShutdown() { Subscriber.SignalShutdown() } } + +func (p *LocalPartitionSubscribers) IsEmpty() bool { + p.SubscribersLock.RLock() + defer p.SubscribersLock.RUnlock() + + return len(p.Subscribers) == 0 +}