diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 2ebad4ce6..53e7ffc7d 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -13,7 +13,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -21,7 +21,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) if err != nil { return nil, err } @@ -34,7 +34,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) }, nil } -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { @@ -48,7 +48,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic Topic: topic, Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: time.Now().UnixNano(), + TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, }, })