seaweedfs/weed/mq/client/sub_client/subscriber.go

51 lines
1.3 KiB
Go
Raw Normal View History

2023-09-01 07:36:51 +00:00
package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
)
type SubscriberConfiguration struct {
2023-10-02 08:02:27 +00:00
ClientId string
GroupId string
GroupInstanceId string
GroupMinimumPeers int32
GroupMaximumPeers int32
BootstrapServers []string
GrpcDialOption grpc.DialOption
2023-09-01 07:36:51 +00:00
}
2023-09-05 04:43:50 +00:00
// ContentConfiguration groups the Namespace, Topic and Filter strings
// referenced by TopicSubscriber.ContentConfig.
//
// NOTE(review): the filter syntax and how these values are interpreted are
// not visible in this file — confirm against the code that consumes them.
type ContentConfiguration struct {
	Namespace, Topic, Filter string
}
type (
	// OnEachMessageFunc is a callback that receives a key and a value as
	// byte slices and returns a boolean named shouldContinue.
	//
	// NOTE(review): the caller is not in this file; presumably it is invoked
	// once per received message and a false result stops consumption —
	// confirm against the subscribe loop.
	OnEachMessageFunc func(key []byte, value []byte) (shouldContinue bool)

	// OnCompletionFunc is a callback taking no arguments and returning
	// nothing.
	//
	// NOTE(review): presumably invoked when a subscription finishes — confirm.
	OnCompletionFunc func()
)
2023-09-01 07:36:51 +00:00
type TopicSubscriber struct {
2023-09-05 04:43:50 +00:00
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
2023-09-01 07:36:51 +00:00
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
2023-09-05 04:43:50 +00:00
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
2023-10-01 18:59:19 +00:00
bootstrapBroker string
2023-09-01 07:36:51 +00:00
}
2023-10-01 18:59:19 +00:00
func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
2023-09-01 07:36:51 +00:00
return &TopicSubscriber{
2023-09-05 04:43:50 +00:00
SubscriberConfig: subscriber,
ContentConfig: content,
2023-10-01 18:59:19 +00:00
bootstrapBroker: bootstrapBroker,
2023-09-01 07:36:51 +00:00
}
}
2023-09-05 04:43:50 +00:00
// SetEachMessageFunc stores onEachMessageFn in the subscriber's
// OnEachMessageFunc field, replacing whatever callback was there before.
// Passing nil clears the field.
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
	sub.OnEachMessageFunc = onEachMessageFn
}
// SetCompletionFunc stores onCompletionFn in the subscriber's
// OnCompletionFunc field, replacing whatever callback was there before.
// Passing nil clears the field.
func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
	sub.OnCompletionFunc = onCompletionFn
}