seaweedfs/weed/mq/broker/broker_stats.go
Chris Lu 580940bf82
Merge accumulated changes related to message queue (#5098)
* balance partitions on brokers

* prepare topic partition first and then publish, move partition

* purge unused APIs

* clean up

* adjust logs

* add BalanceTopics() grpc API

* configure topic

* configure topic command

* refactor

* repair missing partitions

* sequence of operations to ensure ordering

* proto to close publishers and consumers

* rename file

* topic partition versioned by unixTimeNs

* create local topic partition

* close publishers

* randomize the client name

* wait until no publishers

* logs

* close stop publisher channel

* send last ack

* comments

* comment

* comments

* support list of brokers

* add cli options

* Update .gitignore

* logs

* return io.eof directly

* refactor

* optionally create topic

* refactoring

* detect consumer disconnection

* sub client wait for more messages

* subscribe by time stamp

* rename

* rename to sub_balancer

* rename

* adjust comments

* rename

* fix compilation

* rename

* rename

* SubscriberToSubCoordinator

* sticky rebalance

* go fmt

* add tests

* balance partitions on brokers

* prepare topic partition first and then publish, move partition

* purge unused APIs

* clean up

* adjust logs

* add BalanceTopics() grpc API

* configure topic

* configure topic command

* refactor

* repair missing partitions

* sequence of operations to ensure ordering

* proto to close publishers and consumers

* rename file

* topic partition versioned by unixTimeNs

* create local topic partition

* close publishers

* randomize the client name

* wait until no publishers

* logs

* close stop publisher channel

* send last ack

* comments

* comment

* comments

* support list of brokers

* add cli options

* Update .gitignore

* logs

* return io.eof directly

* refactor

* optionally create topic

* refactoring

* detect consumer disconnection

* sub client wait for more messages

* subscribe by time stamp

* rename

* rename to sub_balancer

* rename

* adjust comments

* rename

* fix compilation

* rename

* rename

* SubscriberToSubCoordinator

* sticky rebalance

* go fmt

* add tests

* tracking topic=>broker

* merge

* comment
2023-12-11 12:05:54 -08:00

81 lines
2.1 KiB
Go

package broker
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"math/rand"
"time"
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
// find the lock owner
var brokerBalancer string
err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: pub_balancer.LockBrokerBalancer,
})
if err != nil {
return err
}
brokerBalancer = resp.Owner
return nil
})
if err != nil {
return err
}
b.currentBalancer = pb.ServerAddress(brokerBalancer)
glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
if brokerBalancer == "" {
return fmt.Errorf("no balancer found")
}
// connect to the lock owner
err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.PublisherToPubBalancer(context.Background())
if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
}
defer stream.CloseSend()
err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
Message: &mq_pb.PublisherToPubBalancerRequest_Init{
Init: &mq_pb.PublisherToPubBalancerRequest_InitMessage{
Broker: self,
},
},
})
if err != nil {
return fmt.Errorf("send init message: %v", err)
}
for {
stats := b.localTopicManager.CollectStats(time.Second * 5)
err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
Stats: stats,
},
})
if err != nil {
if err == io.EOF {
return err
}
return fmt.Errorf("send stats message: %v", err)
}
glog.V(3).Infof("sent stats: %+v", stats)
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
return nil
})
return err
}