diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go index ebb6d3f2a..b0fc5afbf 100644 --- a/weed/messaging/msgclient/publisher.go +++ b/weed/messaging/msgclient/publisher.go @@ -17,7 +17,6 @@ type Publisher struct { publisherId string } -/* func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ @@ -25,11 +24,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* } publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := setupPublisherClient(broker.TopicPartition{ + tp := broker.TopicPartition{ Namespace: namespace, Topic: topic, Partition: int32(i), - }) + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + client, err := setupPublisherClient(grpcClientConn, tp) if err != nil { return nil, err } @@ -40,7 +44,6 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* topicConfiguration: topicConfiguration, }, nil } -*/ func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { @@ -113,9 +116,3 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error { Data: m, }) } - -func (p *Publisher) Shutdown() { - for _, client := range p.publishClients { - client.CloseSend() - } -}