From 39941edc0bae3b9a4a2c3344caf494f7ab80a82a Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 7 Sep 2023 23:55:19 -0700 Subject: [PATCH] add publisher shutdown --- weed/mq/broker/broker_grpc_pub.go | 6 +----- weed/mq/client/cmd/weed_pub/publisher.go | 1 + weed/mq/client/pub_client/lookup.go | 6 ++++++ weed/mq/client/pub_client/publisher.go | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 20a31f09c..acbffefba 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -104,10 +104,6 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS respChan := make(chan *mq_pb.PublishResponse, 128) defer func() { atomic.StoreInt32(&isStopping, 1) - response := &mq_pb.PublishResponse{ - Error: "end of stream", - } - respChan <- response close(respChan) }() go func() { @@ -117,7 +113,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS case resp := <-respChan: if resp != nil { if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending setup response: %v", err) + glog.Errorf("Error sending response %v: %v", resp, err) } } else { return diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index f5a454640..03674db3f 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -51,6 +51,7 @@ func main() { // Wait for all publishers to finish wg.Wait() elapsed := time.Since(startTime) + publisher.Shutdown() log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index a0b4298d1..28cb29015 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func (p *TopicPublisher) doLookup(brokerAddress string) error { @@ -100,6 +102,10 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str for { _, err := publishClient.Recv() if err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { + return + } publishClient.Err = err fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) return diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index f264375fa..7073457f3 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -2,10 +2,12 @@ package pub_client import ( "github.com/rdleal/intervalst/interval" + "github.com/seaweedfs/seaweedfs/weed/mq/broker" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "sync" + "time" ) type PublisherConfiguration struct { @@ -41,3 +43,15 @@ func (p *TopicPublisher) Connect(bootstrapBroker string) error { } return nil } + +func (p *TopicPublisher) Shutdown() error { + + if clients, found := p.partition2Broker.AllIntersections(0, broker.MaxPartitionCount); found { + for _, client := range clients { + client.CloseSend() + } + } + time.Sleep(1100 * time.Millisecond) + + return nil +}