diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 926e193dd..01e63df40 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "io" "time" + "sync" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -97,11 +98,17 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup for i := 0; i < len(s.subscriberClients); i++ { if s.subscriberClients[i] != nil { - go doSubscribe(s.subscriberClients[i], processFn) + wg.Add(1) + go func() { + defer wg.Done() + doSubscribe(s.subscriberClients[i], processFn) + }() } } + wg.Wait() } func (s *Subscriber) Shutdown() {