passing timestamp

This commit is contained in:
chrislu 2024-01-11 23:03:55 -08:00
parent 45994641e9
commit f750a5e03b
2 changed files with 4 additions and 6 deletions

View file

@ -71,8 +71,6 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
now := time.Now().UnixNano()
// collect current topic partitions // collect current topic partitions
partitionSlotToBrokerList := knownPartitionSlotToBrokerList partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if partitionSlotToBrokerList == nil { if partitionSlotToBrokerList == nil {
@ -104,7 +102,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
if !found { if !found {
partitionSlots = make([]*PartitionSlotToConsumerInstance, 0) partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
} }
consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots, now) consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots)) assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots))
for i, partitionSlot := range partitionSlots { for i, partitionSlot := range partitionSlots {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
@ -112,7 +110,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
RangeStop: partitionSlot.RangeStop, RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart, RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize, RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: now, UnixTimeNs: partitionSlot.UnixTimeNs,
}, },
Broker: partitionSlot.Broker, Broker: partitionSlot.Broker,
} }

View file

@ -23,10 +23,10 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part
} }
} }
func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition { func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition {
partitions := make([]*topic.Partition, 0, len(slots)) partitions := make([]*topic.Partition, 0, len(slots))
for _, slot := range slots { for _, slot := range slots {
partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, unixTimeNs)) partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs))
} }
return partitions return partitions
} }