diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 79393332f..37f937949 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -87,6 +87,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS localTopicPartition = topic.NewLocalPartition(t, p, true, nil) broker.localTopicManager.AddTopicPartition(t, localTopicPartition) } + stream.Send(response) } else { response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) @@ -106,7 +107,6 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS AckSequence: sequence, } if dataMessage := req.GetData(); dataMessage != nil { - print("+") localTopicPartition.Publish(dataMessage) } if err := stream.Send(response); err != nil { @@ -114,6 +114,8 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS } } + glog.Infof("publish stream closed") + return nil } diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index a540143a4..f5a454640 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -1,12 +1,34 @@ package main import ( + "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "log" + "sync" + "time" ) -func main() { +var ( + messageCount = flag.Int("n", 1000, "message count") + concurrency = flag.Int("c", 4, "concurrency count") +) +func doPublish(publisher *pub_client.TopicPublisher, id int) { + startTime := time.Now() + for i := 0; i < *messageCount / *concurrency; i++ { + // Simulate publishing a message + key := []byte(fmt.Sprintf("key-%d-%d", id, i)) + value := []byte(fmt.Sprintf("value-%d-%d", id, i)) + publisher.Publish(key, value) // Call your publisher function here + // println("Published", string(key), string(value)) + } + elapsed := time.Since(startTime) + log.Printf("Publisher %d finished in %s", id, elapsed) +} + +func main() { + flag.Parse() publisher := pub_client.NewTopicPublisher( "test", "test") if err := publisher.Connect("localhost:17777"); err != nil { @@ -14,16 +36,22 @@ func main() { return } - for i := 0; i < 10; i++ { - if dataErr := publisher.Publish( - []byte(fmt.Sprintf("key-%d", i)), - []byte(fmt.Sprintf("value-%d", i)), - ); dataErr != nil { - fmt.Println(dataErr) - return - } + startTime := time.Now() + + // Start multiple publishers + var wg sync.WaitGroup + for i := 0; i < *concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + doPublish(publisher, id) + }(i) } - fmt.Println("done publishing") + // Wait for all publishers to finish + wg.Wait() + elapsed := time.Since(startTime) + + log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) } diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index 5a9376ab1..8e279fb0b 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -24,43 +24,21 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { return err } for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { - // partition => broker + // partition => publishClient + publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) + if err != nil { + return err + } + for redirectTo != "" { + publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo) + if err != nil { + return err + } + } p.partition2Broker.Insert( brokerPartitionAssignment.Partition.RangeStart, brokerPartitionAssignment.Partition.RangeStop, - brokerPartitionAssignment.LeaderBroker) - - // broker => publish client - // send init message - // save the publishing client - brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) - if err != nil { - return fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - publishClient, err := brokerClient.Publish(context.Background()) - if err != nil { - return fmt.Errorf("create publish client: %v", err) - } - p.broker2PublishClient.Set(brokerAddress, publishClient) - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - Partition: &mq_pb.Partition{ - RingSize: brokerPartitionAssignment.Partition.RingSize, - RangeStart: brokerPartitionAssignment.Partition.RangeStart, - RangeStop: brokerPartitionAssignment.Partition.RangeStop, - }, - }, - }, - }); err != nil { - return fmt.Errorf("send init message: %v", err) - } + publishClient) } return nil }) @@ -70,3 +48,62 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { } return nil } + +// broker => publish client +// send init message +// save the publishing client +func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) { + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) + if err != nil { + return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + stream, err := brokerClient.Publish(context.Background()) + if err != nil { + return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err) + } + publishClient = &PublishClient{ + SeaweedMessaging_PublishClient: stream, + Broker: brokerAddress, + } + if err = publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Init{ + Init: &mq_pb.PublishRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + }, + }, + }); err != nil { + return publishClient, redirectTo, fmt.Errorf("send init message: %v", err) + } + resp, err := stream.Recv() + if err != nil { + return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err) + } + if resp.Error != "" { + return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error) + } + if resp.RedirectToBroker != "" { + redirectTo = resp.RedirectToBroker + return publishClient, redirectTo, nil + } + + go func() { + for { + _, err := publishClient.Recv() + if err != nil { + publishClient.Err = err + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) + return + } + } + }() + return publishClient, redirectTo, nil +} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 0ecb55c9b..de4714831 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -12,14 +12,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error { if hashKey < 0 { hashKey = -hashKey } - brokerAddress, found := p.partition2Broker.Floor(hashKey, hashKey) + publishClient, found := p.partition2Broker.Floor(hashKey, hashKey) if !found { return fmt.Errorf("no broker found for key %d", hashKey) } - publishClient, found := p.broker2PublishClient.Get(brokerAddress) - if !found { - return fmt.Errorf("no publish client found for broker %s", brokerAddress) - } + p.Lock() + defer p.Unlock() + // dead lock here + //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59) + //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047) + //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040) + //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892) + //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752) + //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894) + //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141) + //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19) if err := publishClient.Send(&mq_pb.PublishRequest{ Message: &mq_pb.PublishRequest_Data{ Data: &mq_pb.DataMessage{ diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 5963838ce..f264375fa 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -1,32 +1,37 @@ package pub_client import ( - cmap "github.com/orcaman/concurrent-map/v2" "github.com/rdleal/intervalst/interval" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "sync" ) type PublisherConfiguration struct { } + +type PublishClient struct { + mq_pb.SeaweedMessaging_PublishClient + Broker string + Err error +} type TopicPublisher struct { - namespace string - topic string - partition2Broker *interval.SearchTree[string, int32] - broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] - grpcDialOption grpc.DialOption + namespace string + topic string + partition2Broker *interval.SearchTree[*PublishClient, int32] + grpcDialOption grpc.DialOption + sync.Mutex // protects grpc } func NewTopicPublisher(namespace, topic string) *TopicPublisher { return &TopicPublisher{ namespace: namespace, topic: topic, - partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { + partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int { return int(a - b) }), - broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), - grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), } }