diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 258b29f2f..d93cc8de8 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -49,10 +49,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Namespace: topicPartitionStats.Topic.Namespace, - Topic: topicPartitionStats.Topic.Name, - RangeStart: topicPartitionStats.Partition.RangeStart, - RangeStop: topicPartitionStats.Partition.RangeStop, + Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, + Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize}, }, ConsumerCount: topicPartitionStats.ConsumerCount, IsLeader: topicPartitionStats.IsLeader, @@ -73,10 +71,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Namespace: t.Namespace, - Topic: t.Name, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, + Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop}, }, ConsumerCount: 0, IsLeader: true, diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go index 7362fbab7..d5b78fc45 100644 --- a/weed/mq/balancer/lookup.go +++ b/weed/mq/balancer/lookup.go @@ -16,7 +16,7 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { topicPartitionStat := topicPartitionStatsItem.Val if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && - topicPartitionStat.TopicPartition.Topic == topic.Name { + topicPartitionStat.TopicPartition.Name == topic.Name { assignment := &mq_pb.BrokerPartitionAssignment{ Partition: &mq_pb.Partition{ RingSize: MaxPartitionCount, diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 93586e22d..74a3a9822 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb topicPartitionStat := topicPartitionStatsItem.Val topic := &mq_pb.Topic{ Namespace: topicPartitionStat.TopicPartition.Namespace, - Name: topicPartitionStat.TopicPartition.Topic, + Name: topicPartitionStat.TopicPartition.Name, } topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name) if _, found := knownTopics[topicKey]; found { diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index e3ee46a1e..0c54f2bb1 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -75,10 +75,12 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { for _, localPartition := range localTopic.Partitions { topicPartition := &TopicPartition{ - Namespace: string(localTopic.Namespace), - Topic: localTopic.Name, - RangeStart: localPartition.RangeStart, - RangeStop: localPartition.RangeStop, + Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name}, + Partition: Partition{ + RingSize: localPartition.RingSize, + RangeStart: localPartition.RangeStart, + RangeStop: localPartition.RangeStop, + }, } stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{ Topic: &mq_pb.Topic{ diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go index 430999179..3d457e6f1 100644 --- a/weed/mq/topic/topic.go +++ b/weed/mq/topic/topic.go @@ -7,14 +7,12 @@ import ( "time" ) -type Namespace string - type Topic struct { - Namespace Namespace + Namespace string Name string } -func NewTopic(namespace Namespace, name string) Topic { +func NewTopic(namespace string, name string) Topic { return Topic{ Namespace: namespace, Name: name, @@ -22,7 +20,7 @@ func NewTopic(namespace Namespace, name string) Topic { } func FromPbTopic(topic *mq_pb.Topic) Topic { return Topic{ - Namespace: Namespace(topic.Namespace), + Namespace: topic.Namespace, Name: topic.Name, } } @@ -41,7 +39,7 @@ type Segment struct { func FromPbSegment(segment *mq_pb.Segment) *Segment { return &Segment{ Topic: Topic{ - Namespace: Namespace(segment.Namespace), + Namespace: segment.Namespace, Name: segment.Topic, }, Id: segment.Id, diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go index 3d927b1d8..20b33a7e4 100644 --- a/weed/mq/topic/topic_partition.go +++ b/weed/mq/topic/topic_partition.go @@ -3,10 +3,8 @@ package topic import "fmt" type TopicPartition struct { - Namespace string - Topic string - RangeStart int32 - RangeStop int32 + Topic + Partition } func (tp *TopicPartition) String() string {