diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 0172352b0..6d0eb0d43 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -23,7 +23,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { stream, err := client.SubscriberToSubCoordinator(ctx) if err != nil { - glog.V(1).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) + glog.V(0).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return err } waitTime = 1 * time.Second @@ -42,7 +42,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { }, }, }); err != nil { - glog.V(1).Infof("subscriber %s/%s send init: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) + glog.V(0).Infof("subscriber %s/%s send init: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return err } @@ -50,7 +50,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { for { resp, err := stream.Recv() if err != nil { - glog.V(1).Infof("subscriber %s/%s receive: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) + glog.V(0).Infof("subscriber %s/%s receive: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return err } assignment := resp.GetAssignment() @@ -63,7 +63,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { return nil }) } - glog.V(4).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + glog.V(0).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) if waitTime < 10*time.Second { waitTime += 1 * time.Second } @@ -109,11 +109,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s Name: sub.ContentConfig.Topic, }, PartitionOffset: &mq_pb.PartitionOffset{ - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - }, + Partition: partition, StartTsNs: sub.alreadyProcessedTsNs, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, }, @@ -143,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s }() for { - glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { return fmt.Errorf("subscribe recv: %v", err) @@ -163,6 +159,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return nil } case *mq_pb.SubscribeMessageResponse_Ctrl: + // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return io.EOF }