From 875f562779f239a140d1008732b5375c0e511e61 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 6 Sep 2023 23:16:41 -0700 Subject: [PATCH] server side send response at least once per second --- weed/mq/broker/broker_grpc_pub.go | 32 ++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 854093f8a..20a31f09c 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync/atomic" "time" ) @@ -99,20 +100,37 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS ackCounter := 0 var ackSequence int64 + var isStopping int32 respChan := make(chan *mq_pb.PublishResponse, 128) - defer close(respChan) + defer func() { + atomic.StoreInt32(&isStopping, 1) + response := &mq_pb.PublishResponse{ + Error: "end of stream", + } + respChan <- response + close(respChan) + }() go func() { + ticker := time.NewTicker(1 * time.Second) for { select { case resp := <-respChan: - if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending setup response: %v", err) + if resp != nil { + if err := stream.Send(resp); err != nil { + glog.Errorf("Error sending setup response: %v", err) + } + } else { + return } - case <-time.After(1 * time.Second): - response := &mq_pb.PublishResponse{ - AckSequence: ackSequence, + case <-ticker.C: + if atomic.LoadInt32(&isStopping) == 0 { + response := &mq_pb.PublishResponse{ + AckSequence: ackSequence, + } + respChan <- response + } else { + return } - respChan <- response } } }()