package broker

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// For a new or re-configured topic, or when one of the brokers goes offline,
// the pub clients ask one broker for the brokers hosting all the topic partitions.
// The broker will lock the topic on write.
// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
// 2. if the topic is found, return the brokers for the topic partitions
// For a topic to read from, the sub clients ask one broker for the brokers hosting all the topic partitions.
// The broker will lock the topic on read.
// 1. if the topic is not found, return an error
// 2. if the topic is found, return the brokers for the topic partitions
//
// If the topic needs to be re-balanced, the admin client will lock the topic,
// 1. collect throughput information for all the brokers
// 2. adjust the topic partitions to the brokers
// 3. notify the brokers to add/remove partitions to host
// 3.1 when locking the topic, the partitions and brokers should be remembered in the lock
// 4. the brokers will stop processing incoming messages if they do not host the right partition
// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition
// 4.2 the sub clients will need to change the brokers to read from
//
// The following is from each individual component's perspective:
// For a pub client
//	For the current topic/partition, ask one broker for the brokers hosting the topic partitions
//	1. connect to the brokers and keep sending, until a broker returns an error, or the broker leader is moved
// For a sub client
//	For the current topic/partition, ask one broker for the brokers hosting the topic partitions
//	1. connect to the brokers and keep reading, until a broker returns an error, or the broker leader is moved
// For a broker
//	Upon a pub client lookup:
//	1. lock the topic
//	2. if a topic partition assignment already exists, check that all its brokers are healthy
//	3. if not, create a topic partition assignment
//	4. return the brokers for the topic partitions
//	5. unlock the topic
//	Upon a sub client lookup:
//	1. lock the topic
//	2. if a topic partition assignment already exists, check that all its brokers are healthy
//	3. if not, return an error
//	4. return the brokers for the topic partitions
//	5. unlock the topic
// For an admin tool
//	0. collect stats from all the brokers, and find the topics worth moving
//	1. lock the topic
//	2. collect throughput information for all the brokers
//	3. adjust the topic partitions to the brokers
//	4. notify the brokers to add/remove partitions to host
//	5. the brokers will stop processing incoming messages if they do not host the right partition
//	6. unlock the topic

/*
The messages are buffered in memory, and saved to filer under
	/topics/<topic>/<date>/<hour>/<segment>/*.msg
	/topics/<topic>/<date>/<hour>/segment
	/topics/<topic>/info/segment_<id>.meta
*/
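// The Publish handler below acknowledges messages out of band: the receive
// loop only bumps ackCounter/ackSequence and pushes responses onto a buffered
// channel, while a dedicated goroutine drains that channel and also flushes
// the latest ack sequence on a one-second ticker, so a slow trickle of
// messages still gets acknowledged. The sketch below isolates that pattern
// with plain stdlib types; ackSketch and its parameters are illustrative
// names for this example, not part of the broker API. (The handler itself
// signals shutdown by closing its channel and checking for a nil response;
// this sketch uses the two-value channel receive instead.)
func ackSketch(send func(ackSequence int64) error, acked <-chan int64) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	var latest int64
	for {
		select {
		case seq, ok := <-acked:
			if !ok {
				// the producer closed the channel: stop acknowledging
				return
			}
			latest = seq
			if err := send(latest); err != nil {
				glog.Errorf("Error sending ack %d: %v", latest, err)
			}
		case <-ticker.C:
			// periodic flush, independent of message arrival
			if err := send(latest); err != nil {
				glog.Errorf("Error sending ack %d: %v", latest, err)
			}
		}
	}
}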
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
	// 1. write to the volume server
	// 2. find the topic metadata owning filer
	// 3. write to the filer

	var localTopicPartition *topic.LocalPartition
	req, err := stream.Recv()
	if err != nil {
		return err
	}
	response := &mq_pb.PublishResponse{}
	// TODO check whether the current broker should be the leader for the topic partition
	ackInterval := 1
	initMessage := req.GetInit()
	if initMessage != nil {
		t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
		localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
		if localTopicPartition == nil {
			localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
			broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
		}
		ackInterval = int(initMessage.AckInterval)
		if err := stream.Send(response); err != nil {
			return err
		}
	} else {
		// the first request on the stream must be an init message
		response.Error = "missing init message"
		glog.Errorf("missing init message in the first publish request")
		return stream.Send(response)
	}

	ackCounter := 0
	var ackSequence int64
	var isStopping int32
	respChan := make(chan *mq_pb.PublishResponse, 128)
	defer func() {
		atomic.StoreInt32(&isStopping, 1)
		close(respChan)
	}()
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case resp := <-respChan:
				// a nil response means respChan was closed and the stream is stopping
				if resp != nil {
					if err := stream.Send(resp); err != nil {
						glog.Errorf("Error sending response %v: %v", resp, err)
					}
				} else {
					return
				}
			case <-ticker.C:
				if atomic.LoadInt32(&isStopping) == 0 {
					response := &mq_pb.PublishResponse{
						AckSequence: ackSequence,
					}
					respChan <- response
				} else {
					return
				}
			}
		}
	}()

	// process each published message
	for {
		// receive a message
		req, err := stream.Recv()
		if err != nil {
			return err
		}

		// process the received message
		if dataMessage := req.GetData(); dataMessage != nil {
			localTopicPartition.Publish(dataMessage)
		}

		ackCounter++
		ackSequence++
		if ackCounter >= ackInterval {
			ackCounter = 0
			// send back the ack
			response := &mq_pb.PublishResponse{
				AckSequence: ackSequence,
			}
			respChan <- response
		}
	}
}

// AssignTopicPartitions runs on the assigned broker to execute the topic partition assignment
func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
	ret := &mq_pb.AssignTopicPartitionsResponse{}
	self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
	for _, brokerPartition := range request.BrokerPartitionAssignments {
		localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
		broker.localTopicManager.AddTopicPartition(
			topic.FromPbTopic(request.Topic), localPartition)
		// the leader forwards the assignment to each follower broker
		if request.IsLeader {
			for _, follower := range localPartition.FollowerBrokers {
				err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
					_, err := client.AssignTopicPartitions(context.Background(), request)
					return err
				})
				if err != nil {
					return ret, err
				}
			}
		}
	}
	return ret, nil
}
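// notifyFollower is an illustrative sketch of the follower fan-out that
// AssignTopicPartitions performs inline above: the leader replays the same
// AssignTopicPartitionsRequest to a follower broker over gRPC. It is not
// referenced by the broker code; the function name is an assumption for this
// example, but pb.WithBrokerGrpcClient and the RPC call match the usage above.
func (broker *MessageQueueBroker) notifyFollower(follower pb.ServerAddress, request *mq_pb.AssignTopicPartitionsRequest) error {
	return pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption,
		func(client mq_pb.SeaweedMessagingClient) error {
			_, err := client.AssignTopicPartitions(context.Background(), request)
			return err
		})
}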