seaweedfs/weed/mq/broker/broker_grpc_pub.go
chrislu 89a1fd1751 Squashed commit of the following:
commit 4827425146
Author: chrislu <chris.lu@gmail.com>
Date:   Sat Sep 16 15:05:38 2023 -0700

    balancer works

commit 3b50139f68
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 15 22:22:32 2023 -0700

    comments

commit 7f685ce7ba
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 15 22:20:05 2023 -0700

    adjust APIs

commit 436d99443b
Author: chrislu <chris.lu@gmail.com>
Date:   Thu Sep 14 23:49:05 2023 -0700

    receive broker stats

commit b771fefa37
Merge: 0a851ec00 890881037
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 13 00:03:47 2023 -0700

    Merge branch 'master' into sub

commit 0a851ec00b
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Sep 10 22:01:25 2023 -0700

    Create balancer.go

commit 39941edc0b
Author: chrislu <chris.lu@gmail.com>
Date:   Thu Sep 7 23:55:19 2023 -0700

    add publisher shutdown

commit 875f562779
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 23:16:41 2023 -0700

    server side send response at least once per second

commit 984b6c54cf
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 23:15:29 2023 -0700

    ack interval 128

commit 2492a45499
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 22:39:46 2023 -0700

    ack interval

commit ba67e6ca29
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Sep 4 21:43:50 2023 -0700

    api for sub

commit 9e4f985698
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Sep 4 21:43:30 2023 -0700

    publish, benchmark

commit cb470d44df
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 1 00:36:51 2023 -0700

    can pub and sub

commit 1eb2da46d5
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Aug 28 09:02:12 2023 -0700

    connect and publish

commit 504ae8383a
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Aug 28 09:01:25 2023 -0700

    protoc version

commit dbcba75271
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 18:59:04 2023 -0700

    rename to lookup

commit c9caf33119
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 18:33:46 2023 -0700

    move functions

commit 4d6c18d86f
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 17:50:59 2023 -0700

    pub sub initial tests

commit 4eb8e8624d
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 13:14:39 2023 -0700

    rename

commit 1990456670
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 13:13:14 2023 -0700

    sub

commit 905911853d
Author: chrislu <chris.lu@gmail.com>
Date:   Sat Aug 26 13:39:21 2023 -0700

    adjust proto
2023-09-16 15:06:16 -07:00

189 lines
6.4 KiB
Go

package broker
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync/atomic"
"time"
)
// For a new or re-configured topic, or one of the broker went offline,
// the pub clients ask one broker what are the brokers for all the topic partitions.
// The broker will lock the topic on write.
// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
// 2. if the topic is found, return the brokers for the topic partitions
// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
// The broker will lock the topic on read.
// 1. if the topic is not found, return error
// 2. if the topic is found, return the brokers for the topic partitions
//
// If the topic needs to be re-balanced, the admin client will lock the topic,
// 1. collect throughput information for all the brokers
// 2. adjust the topic partitions to the brokers
// 3. notify the brokers to add/remove partitions to host
// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
// 4. the brokers will stop process incoming messages if not the right partition
// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
// 4.2 the sub clients will need to change the brokers to read from
//
// The following is from each individual component's perspective:
// For a pub client
// For current topic/partition, ask one broker for the brokers for the topic partitions
// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
// For a sub client
// For current topic/partition, ask one broker for the brokers for the topic partitions
// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
// For a broker
// Upon a pub client lookup:
// 1. lock the topic
// 2. if already has topic partition assignment, check all brokers are healthy
// 3. if not, create topic partition assignment
// 2. return the brokers for the topic partitions
// 3. unlock the topic
// Upon a sub client lookup:
// 1. lock the topic
// 2. if already has topic partition assignment, check all brokers are healthy
// 3. if not, return error
// 2. return the brokers for the topic partitions
// 3. unlock the topic
// For an admin tool
// 0. collect stats from all the brokers, and find the topic worth moving
// 1. lock the topic
// 2. collect throughput information for all the brokers
// 3. adjust the topic partitions to the brokers
// 4. notify the brokers to add/remove partitions to host
// 5. the brokers will stop process incoming messages if not the right partition
// 6. unlock the topic
/*
The messages are buffered in memory, and saved to filer under
/topics/<topic>/<date>/<hour>/<segment>/*.msg
/topics/<topic>/<date>/<hour>/segment
/topics/<topic>/info/segment_<id>.meta
*/
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
// 1. write to the volume server
// 2. find the topic metadata owning filer
// 3. write to the filer
var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
}
response := &mq_pb.PublishResponse{}
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
if initMessage != nil {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
} else {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
return stream.Send(response)
}
ackCounter := 0
var ackSequence int64
var isStopping int32
respChan := make(chan *mq_pb.PublishResponse, 128)
defer func() {
atomic.StoreInt32(&isStopping, 1)
close(respChan)
}()
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case resp := <-respChan:
if resp != nil {
if err := stream.Send(resp); err != nil {
glog.Errorf("Error sending response %v: %v", resp, err)
}
} else {
return
}
case <-ticker.C:
if atomic.LoadInt32(&isStopping) == 0 {
response := &mq_pb.PublishResponse{
AckSequence: ackSequence,
}
respChan <- response
} else {
return
}
}
}
}()
// process each published messages
for {
// receive a message
req, err := stream.Recv()
if err != nil {
return err
}
// Process the received message
if dataMessage := req.GetData(); dataMessage != nil {
localTopicPartition.Publish(dataMessage)
}
ackCounter++
ackSequence++
if ackCounter >= ackInterval {
ackCounter = 0
// send back the ack
response := &mq_pb.PublishResponse{
AckSequence: ackSequence,
}
respChan <- response
}
}
glog.Infof("publish stream closed")
return nil
}
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{}
self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
for _, brokerPartition := range request.BrokerPartitionAssignments {
localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
broker.localTopicManager.AddTopicPartition(
topic.FromPbTopic(request.Topic),
localPartiton)
if request.IsLeader {
for _, follower := range localPartiton.FollowerBrokers {
err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.AssignTopicPartitions(context.Background(), request)
return err
})
if err != nil {
return ret, err
}
}
}
}
return ret, nil
}