From 3a57aef7a910ba66d6026f73d1b3055c2d8a1e4d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 17 May 2020 17:33:53 -0700 Subject: [PATCH] sync subscribe() --- weed/messaging/msgclient/subscriber.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 926e193dd..01e63df40 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "io" "time" + "sync" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -97,11 +98,17 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup for i := 0; i < len(s.subscriberClients); i++ { if s.subscriberClients[i] != nil { - go doSubscribe(s.subscriberClients[i], processFn) + wg.Add(1) + go func() { + defer wg.Done() + doSubscribe(s.subscriberClients[i], processFn) + }() } } + wg.Wait() } func (s *Subscriber) Shutdown() {