seaweedfs/weed/messaging/msgclient/chan_sub.go

86 lines
1.6 KiB
Go
Raw Permalink Normal View History

2020-05-09 07:31:34 +00:00
package msgclient
import (
2020-05-17 18:10:45 +00:00
"context"
2020-05-09 07:43:53 +00:00
"crypto/md5"
"hash"
2020-05-09 07:31:34 +00:00
"io"
"log"
"time"
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
type SubChannel struct {
2020-05-09 07:43:53 +00:00
ch chan []byte
stream messaging_pb.SeaweedMessaging_SubscribeClient
md5hash hash.Hash
2020-05-17 18:10:45 +00:00
cancel context.CancelFunc
2020-05-09 07:31:34 +00:00
}
func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
2020-05-09 07:31:34 +00:00
tp := broker.TopicPartition{
Namespace: "chan",
Topic: chanName,
Partition: 0,
}
grpcConnection, err := mc.findBroker(tp)
if err != nil {
return nil, err
}
2020-05-17 18:10:45 +00:00
ctx, cancel := context.WithCancel(context.Background())
sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
2020-05-09 07:31:34 +00:00
if err != nil {
return nil, err
}
t := &SubChannel{
2020-05-09 07:43:53 +00:00
ch: make(chan []byte),
stream: sc,
md5hash: md5.New(),
2020-05-17 18:10:45 +00:00
cancel: cancel,
2020-05-09 07:31:34 +00:00
}
go func() {
for {
resp, subErr := t.stream.Recv()
if subErr == io.EOF {
return
}
if subErr != nil {
log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
return
}
if resp.Data == nil {
// this could be heartbeat from broker
continue
}
2020-05-09 07:31:34 +00:00
if resp.Data.IsClose {
t.stream.Send(&messaging_pb.SubscriberMessage{
IsClose: true,
})
close(t.ch)
2020-05-17 18:10:45 +00:00
cancel()
2020-05-09 07:31:34 +00:00
return
}
t.ch <- resp.Data.Value
t.md5hash.Write(resp.Data.Value)
2020-05-09 07:31:34 +00:00
}
}()
return t, nil
}
func (sc *SubChannel) Channel() chan []byte {
return sc.ch
}
2020-05-09 07:43:53 +00:00
func (sc *SubChannel) Md5() []byte {
return sc.md5hash.Sum(nil)
}
2020-05-17 18:10:45 +00:00
func (sc *SubChannel) Cancel() {
sc.cancel()
}