diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index e7cbb6441..9e951ce65 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -50,6 +50,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lock := broker.topicManager.RequestLock(tp, topicConfig, false) defer broker.topicManager.ReleaseLock(tp, false) + isConnected := true + go func() { + for isConnected { + time.Sleep(1737 * time.Millisecond) + if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil { + isConnected = false + lock.cond.Signal() + } + } + }() + lastReadTime := time.Now() switch in.Init.StartPosition { case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: @@ -93,8 +104,10 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { - // println("stopping from persisted logs") - return err + if err != io.EOF { + println("stopping from persisted logs", err.Error()) + return err + } } if processedTsNs != 0 { @@ -105,7 +118,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() - return true + return isConnected }, eachLogEntryFn) return err diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go index 5465c5913..3eabc6210 100644 --- a/weed/messaging/msgclient/sub_chan.go +++ b/weed/messaging/msgclient/sub_chan.go @@ -27,7 +27,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha if err != nil { return nil, err } - sc, err := setupSubscriberClient(grpcConnection, subscriberId, "chan", chanName, 0, time.Unix(0, 0)) + sc, err := setupSubscriberClient(grpcConnection, tp, subscriberId, time.Unix(0, 0)) if err != nil { return nil, err } @@ -48,6 +48,10 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha log.Printf("fail to receive from netchan %s: %v", chanName, subErr) return } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } if resp.Data.IsClose { t.stream.Send(&messaging_pb.SubscriberMessage{ IsClose: true, diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index efbfa0337..2e66923e2 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "google.golang.org/grpc" ) @@ -14,7 +15,6 @@ type Subscriber struct { subscriberId string } -/* func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ @@ -23,7 +23,16 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + client, err := setupSubscriberClient(grpcClientConn, tp, subscriberId, startTime) if err != nil { return nil, err } @@ -36,22 +45,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, }, nil } -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - - stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime) - if err != nil { - return stream, err - } - if newBroker != nil { - - } - - return stream, nil - -} -*/ - -func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { +func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) if err != nil { return @@ -60,9 +54,9 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, // send init message err = stream.Send(&messaging_pb.SubscriberMessage{ Init: &messaging_pb.SubscriberMessage_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, @@ -90,6 +84,10 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M println(listenErr.Error()) return listenErr } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } processFn(resp.Data) } }