From b16d47030e06d1216247978c7446bdf4f53cfe75 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 23 Sep 2018 01:34:40 -0700 Subject: [PATCH] save and use last processed kafka offsets --- weed/command/scaffold.go | 2 + weed/replication/notification_kafka.go | 79 +++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 1f76cbfea..0df82853c 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -166,6 +166,8 @@ hosts = [ "localhost:9092" ] topic = "seaweedfs_filer1_to_filer2" +offsetFile = "./last.offset" +offsetSaveIntervalSeconds = 10 [sink.filer] enabled = true diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go index d10175757..87f28f738 100644 --- a/weed/replication/notification_kafka.go +++ b/weed/replication/notification_kafka.go @@ -8,6 +8,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "io/ioutil" + "encoding/json" + "sync" + "time" ) func init() { @@ -30,10 +34,12 @@ func (k *KafkaInput) Initialize(configuration util.Configuration) error { return k.initialize( configuration.GetStringSlice("hosts"), configuration.GetString("topic"), + configuration.GetString("offsetFile"), + configuration.GetInt("offsetSaveIntervalSeconds"), ) } -func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { +func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) { config := sarama.NewConfig() config.Consumer.Return.Errors = true k.consumer, err = sarama.NewConsumer(hosts, config) @@ -51,8 +57,25 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { panic(err) } + progress := loadProgress(offsetFile) + if progress == nil || progress.Topic != topic { + progress = &KafkaProgress{ + Topic: topic, + PartitionOffsets: make(map[int32]int64), + } + } + progress.lastSaveTime = time.Now() + progress.offsetFile = offsetFile + progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds + for _, partition := range partitions { - partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + offset, found := progress.PartitionOffsets[partition] + if !found { + offset = sarama.OffsetOldest + } else { + offset += 1 + } + partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset) if err != nil { panic(err) } @@ -63,6 +86,9 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { fmt.Println(err) case msg := <-partitionConsumer.Messages(): k.messageChan <- msg + if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { + glog.Warningf("set kafka offset: %v", err) + } } } }() @@ -81,3 +107,52 @@ func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotifi return } + +type KafkaProgress struct { + Topic string `json:"topic"` + PartitionOffsets map[int32]int64 `json:"partitionOffsets"` + offsetFile string + lastSaveTime time.Time + offsetSaveIntervalSeconds int + sync.Mutex +} + +func loadProgress(offsetFile string) *KafkaProgress { + progress := &KafkaProgress{} + data, err := ioutil.ReadFile(offsetFile) + if err != nil { + glog.Warningf("failed to read kafka progress file: %s", offsetFile) + return nil + } + err = json.Unmarshal(data, progress) + if err != nil { + glog.Warningf("failed to read kafka progress message: %s", string(data)) + return nil + } + return progress +} + +func (progress *KafkaProgress) saveProgress() error { + data, err := json.Marshal(progress) + if err != nil { + return fmt.Errorf("failed to marshal progress: %v", err) + } + err = ioutil.WriteFile(progress.offsetFile, data, 0640) + if err != nil { + return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err) + } + + progress.lastSaveTime = time.Now() + return nil +} + +func (progress *KafkaProgress) setOffset(parition int32, offset int64) error { + progress.Lock() + defer progress.Unlock() + + progress.PartitionOffsets[parition] = offset + if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds { + return progress.saveProgress() + } + return nil +}