diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go index e5cebb42d..7cc8601eb 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/messaging/broker/topic_manager.go @@ -36,14 +36,14 @@ type TopicCursor struct { type TopicManager struct { sync.Mutex - topicControls map[TopicPartition]*TopicCursor - broker *MessageBroker + topicCursors map[TopicPartition]*TopicCursor + broker *MessageBroker } func NewTopicManager(messageBroker *MessageBroker) *TopicManager { return &TopicManager{ - topicControls: make(map[TopicPartition]*TopicCursor), - broker: messageBroker, + topicCursors: make(map[TopicPartition]*TopicCursor), + broker: messageBroker, } } @@ -79,10 +79,10 @@ func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messa tm.Lock() defer tm.Unlock() - lock, found := tm.topicControls[partition] + lock, found := tm.topicCursors[partition] if !found { lock = &TopicCursor{} - tm.topicControls[partition] = lock + tm.topicCursors[partition] = lock lock.subscriptions = NewTopicPartitionSubscriptions() lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) } @@ -98,7 +98,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) tm.Lock() defer tm.Unlock() - lock, found := tm.topicControls[partition] + lock, found := tm.topicCursors[partition] if !found { return } @@ -108,7 +108,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) lock.subscriberCount-- } if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tm.topicControls, partition) + delete(tm.topicCursors, partition) lock.logBuffer.Shutdown() } } @@ -117,7 +117,7 @@ func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { tm.Lock() defer tm.Unlock() - for k := range tm.topicControls { + for k := range tm.topicCursors { tps = append(tps, k) } return