refactor TopicPartition struct

This commit is contained in:
chrislu 2023-10-02 01:01:45 -07:00
parent 2a578b9033
commit 734178093e
6 changed files with 18 additions and 24 deletions

View file

@ -49,10 +49,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for _, topicPartitionStats := range stats.Stats { for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{ tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{ TopicPartition: topic.TopicPartition{
Namespace: topicPartitionStats.Topic.Namespace, Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
Topic: topicPartitionStats.Topic.Name, Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
RangeStart: topicPartitionStats.Partition.RangeStart,
RangeStop: topicPartitionStats.Partition.RangeStop,
}, },
ConsumerCount: topicPartitionStats.ConsumerCount, ConsumerCount: topicPartitionStats.ConsumerCount,
IsLeader: topicPartitionStats.IsLeader, IsLeader: topicPartitionStats.IsLeader,
@ -73,10 +71,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
tps := &TopicPartitionStats{ tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{ TopicPartition: topic.TopicPartition{
Namespace: t.Namespace, Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
Topic: t.Name, Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
}, },
ConsumerCount: 0, ConsumerCount: 0,
IsLeader: true, IsLeader: true,

View file

@ -16,7 +16,7 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val topicPartitionStat := topicPartitionStatsItem.Val
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
topicPartitionStat.TopicPartition.Topic == topic.Name { topicPartitionStat.TopicPartition.Name == topic.Name {
assignment := &mq_pb.BrokerPartitionAssignment{ assignment := &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{ Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount, RingSize: MaxPartitionCount,

View file

@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb
topicPartitionStat := topicPartitionStatsItem.Val topicPartitionStat := topicPartitionStatsItem.Val
topic := &mq_pb.Topic{ topic := &mq_pb.Topic{
Namespace: topicPartitionStat.TopicPartition.Namespace, Namespace: topicPartitionStat.TopicPartition.Namespace,
Name: topicPartitionStat.TopicPartition.Topic, Name: topicPartitionStat.TopicPartition.Name,
} }
topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name) topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
if _, found := knownTopics[topicKey]; found { if _, found := knownTopics[topicKey]; found {

View file

@ -75,10 +75,12 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions { for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{ topicPartition := &TopicPartition{
Namespace: string(localTopic.Namespace), Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
Topic: localTopic.Name, Partition: Partition{
RangeStart: localPartition.RangeStart, RingSize: localPartition.RingSize,
RangeStop: localPartition.RangeStop, RangeStart: localPartition.RangeStart,
RangeStop: localPartition.RangeStop,
},
} }
stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{ stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
Topic: &mq_pb.Topic{ Topic: &mq_pb.Topic{

View file

@ -7,14 +7,12 @@ import (
"time" "time"
) )
type Namespace string
type Topic struct { type Topic struct {
Namespace Namespace Namespace string
Name string Name string
} }
func NewTopic(namespace Namespace, name string) Topic { func NewTopic(namespace string, name string) Topic {
return Topic{ return Topic{
Namespace: namespace, Namespace: namespace,
Name: name, Name: name,
@ -22,7 +20,7 @@ func NewTopic(namespace Namespace, name string) Topic {
} }
func FromPbTopic(topic *mq_pb.Topic) Topic { func FromPbTopic(topic *mq_pb.Topic) Topic {
return Topic{ return Topic{
Namespace: Namespace(topic.Namespace), Namespace: topic.Namespace,
Name: topic.Name, Name: topic.Name,
} }
} }
@ -41,7 +39,7 @@ type Segment struct {
func FromPbSegment(segment *mq_pb.Segment) *Segment { func FromPbSegment(segment *mq_pb.Segment) *Segment {
return &Segment{ return &Segment{
Topic: Topic{ Topic: Topic{
Namespace: Namespace(segment.Namespace), Namespace: segment.Namespace,
Name: segment.Topic, Name: segment.Topic,
}, },
Id: segment.Id, Id: segment.Id,

View file

@ -3,10 +3,8 @@ package topic
import "fmt" import "fmt"
type TopicPartition struct { type TopicPartition struct {
Namespace string Topic
Topic string Partition
RangeStart int32
RangeStop int32
} }
func (tp *TopicPartition) String() string { func (tp *TopicPartition) String() string {