diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
index f83bcd08b..7aefa6b86 100644
--- a/weed/mq/client/publish_stream_processor.go
+++ b/weed/mq/client/publish_stream_processor.go
@@ -80,12 +80,12 @@ func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMe
 	}
 
 	builder := <-p.builders
-	bb := segment.NewMessageBatchBuilder(builder, p.ProducerId, p.ProducerEpoch, 3, 4)
+	bb := segment.NewMessageBatchBuilder(builder)
 	for _, m := range messages {
 		bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
 		p.messagesSequence++
 	}
-	bb.BuildMessageBatch()
+	bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)
 	defer func() {
 		p.builders <- builder
 	}()
diff --git a/weed/mq/messages/message_buffer.go b/weed/mq/messages/message_buffer.go
new file mode 100644
index 000000000..9e61d0dfb
--- /dev/null
+++ b/weed/mq/messages/message_buffer.go
@@ -0,0 +1,64 @@
+package messages
+
+import (
+	flatbuffers "github.com/google/flatbuffers/go"
+	"github.com/seaweedfs/seaweedfs/weed/mq/segment"
+)
+
+// MessageBuffer accumulates messages into one flatbuffers-encoded batch. It is
+// filled with AddMessage, finalized with Seal, and reusable after Reset.
+type MessageBuffer struct {
+	fbsBuffer    *flatbuffers.Builder
+	sequenceBase int64 // sequence number of the first message in the batch
+	counter      int64 // number of messages added since the last Reset
+	bb           *segment.MessageBatchBuilder
+	isSealed     bool
+}
+
+// NewMessageBuffer allocates a buffer whose flatbuffers builder starts at 4MiB.
+func NewMessageBuffer() *MessageBuffer {
+	t := &MessageBuffer{
+		fbsBuffer: flatbuffers.NewBuilder(4 * 1024 * 1024),
+	}
+	t.bb = segment.NewMessageBatchBuilder(t.fbsBuffer)
+	return t
+}
+
+// Reset prepares the buffer for a new batch starting at sequenceBase. The
+// sealed flag is cleared so Bytes cannot hand out the previous batch.
+func (mb *MessageBuffer) Reset(sequenceBase int64) {
+	mb.sequenceBase = sequenceBase
+	mb.counter = 0
+	mb.isSealed = false
+	mb.bb.Reset()
+}
+
+// AddMessage appends one message. Its sequence number is sequenceBase plus the
+// number of messages already added, so sequenceBase keeps pointing at the
+// first message of the batch.
+func (mb *MessageBuffer) AddMessage(message *Message) {
+	mb.bb.AddMessage(mb.sequenceBase+mb.counter, message.Ts.UnixMilli(), message.Properties, message.Key, message.Content)
+	mb.counter++
+}
+
+// Len returns the number of messages added since the last Reset.
+func (mb *MessageBuffer) Len() int {
+	return int(mb.counter)
+}
+
+// Seal finalizes the batch with the given header fields.
+func (mb *MessageBuffer) Seal(producerId int32,
+	producerEpoch int32,
+	segmentId int32,
+	flags int32) {
+	mb.isSealed = true
+	mb.bb.BuildMessageBatch(producerId, producerEpoch, segmentId, flags)
+}
+
+// Bytes returns the encoded batch, or nil if the buffer has not been sealed.
+func (mb *MessageBuffer) Bytes() []byte {
+	if !mb.isSealed {
+		return nil
+	}
+	return mb.bb.GetBytes()
+}
diff --git a/weed/mq/messages/message_buffer_mover.go b/weed/mq/messages/message_buffer_mover.go
new file mode 100644
index 000000000..fad14056f
--- /dev/null
+++ b/weed/mq/messages/message_buffer_mover.go
@@ -0,0 +1,40 @@
+package messages
+
+import "fmt"
+
+// MessageBufferMover persists sealed message buffers and reports where each
+// one went.
+type MessageBufferMover interface {
+	Setup()
+	TearDown()
+	// MoveBuffer should be thread-safe. The pipeline recycles the buffer once
+	// MoveBuffer has succeeded, so implementations must not retain it.
+	MoveBuffer(buffer *MessageBuffer) (MessageBufferReference, error)
+}
+
+// MessageBufferReference identifies a buffer that has been moved.
+type MessageBufferReference struct {
+	sequence int64
+	fileId   string
+}
+
+var _ MessageBufferMover = (*EmptyMover)(nil)
+
+// EmptyMover is a placeholder mover: it prints the buffer's base sequence and
+// fabricates a reference without storing anything.
+type EmptyMover struct {
+}
+
+func (e EmptyMover) Setup() {
+}
+
+func (e EmptyMover) TearDown() {
+}
+
+func (e EmptyMover) MoveBuffer(buffer *MessageBuffer) (MessageBufferReference, error) {
+	println("moving", buffer.sequenceBase)
+	return MessageBufferReference{
+		sequence: buffer.sequenceBase,
+		fileId:   fmt.Sprintf("buffer %d", buffer.sequenceBase),
+	}, nil
+}
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go
new file mode 100644
index 000000000..d8e3a85b8
--- /dev/null
+++ b/weed/mq/messages/message_pipeline.go
@@ -0,0 +1,244 @@
+package messages
+
+import (
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"google.golang.org/grpc"
+	"log"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// Values stored in MessagePipeline.atomicPipelineStatus.
+const (
+	pipelineRunning       = 0
+	pipelineStopRequested = -1   // ShutdownStart was called
+	pipelineDraining      = -2   // partial batch flushed, waiting to go idle
+	pipelineStopped       = -100 // the upload loop has exited or must exit
+)
+
+type OnMessageFunc func(message *Message)
+
+// MessagePipeline batches incoming messages into MessageBuffers and moves the
+// sealed buffers with a pool of concurrent movers. References to moved buffers
+// are published on OutputChan.
+type MessagePipeline struct {
+	// atomic status, one of the pipeline* constants
+	atomicPipelineStatus int64
+
+	// attributes
+	ProducerId     int32
+	ProducerEpoch  int32
+	grpcDialOption grpc.DialOption
+
+	emptyBuffersChan  chan *MessageBuffer
+	sealedBuffersChan chan *MessageBuffer
+	movedBuffersChan  chan MessageBufferReference
+	onMessageFn       OnMessageFunc
+	mover             MessageBufferMover
+	moverPool         *util.LimitedConcurrentExecutor
+	moverWg           sync.WaitGroup // counts in-flight MoveBuffer jobs
+
+	// control pipeline
+	doneChan  chan struct{}
+	batchSize int
+	timeout   time.Duration
+
+	incomingMessageLock   sync.Mutex
+	incomingMessageBuffer *MessageBuffer
+
+	messageSequence int64
+}
+
+// NewMessagePipeline creates a pipeline and starts its upload loop. A batch is
+// sealed once it holds batchSize messages or when the loop has seen no sealed
+// batch for timeout. workerCount bounds the concurrent movers and sizes the
+// internal channels.
+func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeout time.Duration, mover MessageBufferMover) *MessagePipeline {
+	t := &MessagePipeline{
+		ProducerId:        producerId,
+		emptyBuffersChan:  make(chan *MessageBuffer, workerCount),
+		sealedBuffersChan: make(chan *MessageBuffer, workerCount),
+		movedBuffersChan:  make(chan MessageBufferReference, workerCount),
+		doneChan:          make(chan struct{}),
+		batchSize:         batchSize,
+		timeout:           timeout,
+		moverPool:         util.NewLimitedConcurrentExecutor(workerCount),
+		mover:             mover,
+	}
+	go t.doLoopUpload()
+	return t
+}
+
+// OutputChan returns the channel of moved buffer references. It is closed once
+// the pipeline has shut down and every in-flight move has finished.
+func (mp *MessagePipeline) OutputChan() chan MessageBufferReference {
+	return mp.movedBuffersChan
+}
+
+// AddMessage appends a message to the current batch and queues the batch for
+// moving once it holds batchSize messages.
+func (mp *MessagePipeline) AddMessage(message *Message) {
+	// Send outside the lock: the upload loop takes the same lock for its timed
+	// flush and must never wait behind a producer blocked on a full channel.
+	if sealed := mp.appendMessage(message); sealed != nil {
+		mp.sealedBuffersChan <- sealed
+	}
+}
+
+// appendMessage adds one message under the lock. It returns the incoming
+// buffer if this message filled it, or nil otherwise.
+func (mp *MessagePipeline) appendMessage(message *Message) *MessageBuffer {
+	mp.incomingMessageLock.Lock()
+	defer mp.incomingMessageLock.Unlock()
+
+	// get existing message buffer or create a new one
+	if mp.incomingMessageBuffer == nil {
+		select {
+		case mp.incomingMessageBuffer = <-mp.emptyBuffersChan:
+		default:
+			mp.incomingMessageBuffer = NewMessageBuffer()
+		}
+		mp.incomingMessageBuffer.Reset(mp.messageSequence)
+	}
+
+	// add one message
+	mp.incomingMessageBuffer.AddMessage(message)
+	mp.messageSequence++
+
+	// seal the message buffer if full
+	if mp.incomingMessageBuffer.Len() < mp.batchSize {
+		return nil
+	}
+	return mp.sealIncomingBuffer()
+}
+
+// sealIncomingBuffer seals and detaches the incoming buffer. It returns nil if
+// there is nothing to seal. The caller must hold incomingMessageLock.
+func (mp *MessagePipeline) sealIncomingBuffer() *MessageBuffer {
+	sealed := mp.incomingMessageBuffer
+	if sealed == nil || sealed.Len() == 0 {
+		return nil
+	}
+	sealed.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
+	mp.incomingMessageBuffer = nil
+	return sealed
+}
+
+// doLoopUpload is the background loop started by NewMessagePipeline. It moves
+// sealed buffers, flushes a partial batch on every idle timeout, and exits
+// once a shutdown has been requested and the pipeline has gone idle.
+func (mp *MessagePipeline) doLoopUpload() {
+
+	mp.mover.Setup()
+	defer mp.mover.TearDown()
+
+	// Runs on every exit path and before TearDown: wait for in-flight moves so
+	// nothing sends on movedBuffersChan after it is closed, then publish the
+	// final status for ShutdownWait.
+	defer func() {
+		mp.moverWg.Wait()
+		close(mp.movedBuffersChan)
+		atomic.StoreInt64(&mp.atomicPipelineStatus, pipelineStopped)
+	}()
+
+	ticker := time.NewTicker(mp.timeout)
+	defer ticker.Stop()
+
+	for {
+		switch atomic.LoadInt64(&mp.atomicPipelineStatus) {
+		case pipelineStopped:
+			return
+		case pipelineStopRequested:
+			// entering shutdown mode
+			atomic.StoreInt64(&mp.atomicPipelineStatus, pipelineDraining)
+			mp.doFlushIncomingMessages()
+		}
+
+		select {
+		case messageBuffer := <-mp.sealedBuffersChan:
+			ticker.Reset(mp.timeout)
+			mp.moveBufferAsync(messageBuffer)
+		case <-ticker.C:
+			mp.doFlushIncomingMessages()
+			if atomic.LoadInt64(&mp.atomicPipelineStatus) == pipelineDraining {
+				// select picks randomly among ready cases, so sealed buffers
+				// may still be queued when the ticker wins
+				mp.drainSealedBuffers()
+				return
+			}
+		}
+	}
+}
+
+// doFlushIncomingMessages seals the partially filled incoming buffer, if any,
+// and moves it. It runs on the upload loop, which is the only reader of
+// sealedBuffersChan, so it dispatches the buffer directly: sending it through
+// that channel would block the loop on itself once the channel is full.
+func (mp *MessagePipeline) doFlushIncomingMessages() {
+	mp.incomingMessageLock.Lock()
+	sealed := mp.sealIncomingBuffer()
+	mp.incomingMessageLock.Unlock()
+
+	if sealed != nil {
+		mp.moveBufferAsync(sealed)
+	}
+}
+
+// drainSealedBuffers moves every buffer still queued on sealedBuffersChan
+// without blocking.
+func (mp *MessagePipeline) drainSealedBuffers() {
+	for {
+		select {
+		case messageBuffer := <-mp.sealedBuffersChan:
+			mp.moveBufferAsync(messageBuffer)
+		default:
+			return
+		}
+	}
+}
+
+// moveBufferAsync hands one sealed buffer to the mover pool. The move goes
+// through util.RetryForever; the resulting reference is then published on
+// movedBuffersChan and the buffer is offered back for reuse.
+func (mp *MessagePipeline) moveBufferAsync(messageBuffer *MessageBuffer) {
+	mp.moverWg.Add(1)
+	mp.moverPool.Execute(func() {
+		defer mp.moverWg.Done()
+		util.RetryForever("message mover", func() error {
+			messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer)
+			if flushErr != nil {
+				return flushErr
+			}
+			mp.movedBuffersChan <- messageReference
+			return nil
+		}, func(err error) (shouldContinue bool) {
+			log.Printf("failed: %v", err)
+			return true
+		})
+		// recycle the buffer; drop it when enough spare buffers are queued
+		select {
+		case mp.emptyBuffersChan <- messageBuffer:
+		default:
+		}
+	})
+}
+
+// ShutdownStart requests a graceful shutdown: the partial batch is flushed and
+// the upload loop exits once it has been idle for one timeout period.
+func (mp *MessagePipeline) ShutdownStart() {
+	atomic.CompareAndSwapInt64(&mp.atomicPipelineStatus, pipelineRunning, pipelineStopRequested)
+}
+
+// ShutdownWait blocks until the pipeline status is stopped.
+func (mp *MessagePipeline) ShutdownWait() {
+	for atomic.LoadInt64(&mp.atomicPipelineStatus) != pipelineStopped {
+		time.Sleep(331 * time.Millisecond)
+	}
+}
+
+// ShutdownImmediate marks a running pipeline as stopped without flushing. The
+// upload loop notices the status the next time it wakes up.
+func (mp *MessagePipeline) ShutdownImmediate() {
+	atomic.CompareAndSwapInt64(&mp.atomicPipelineStatus, pipelineRunning, pipelineStopped)
+}
diff --git a/weed/mq/messages/message_pipeline_test.go b/weed/mq/messages/message_pipeline_test.go
new file mode 100644
index 000000000..aaa25e0fd
--- /dev/null
+++ b/weed/mq/messages/message_pipeline_test.go
@@ -0,0 +1,37 @@
+package messages
+
+import (
+	"testing"
+	"time"
+)
+
+func TestAddMessage(t *testing.T) {
+	mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{})
+	movedCount := make(chan int)
+	go func() {
+		count := 0
+		for mr := range mp.OutputChan() {
+			println(mr.sequence, mr.fileId)
+			count++
+		}
+		movedCount <- count
+	}()
+
+	for i := 0; i < 100; i++ {
+		message := &Message{
+			Key:        []byte("key"),
+			Content:    []byte("data"),
+			Properties: nil,
+			Ts:         time.Now(),
+		}
+		mp.AddMessage(message)
+	}
+
+	mp.ShutdownStart()
+	mp.ShutdownWait()
+
+	// 100 messages in batches of 10; the output channel is closed on shutdown
+	if count := <-movedCount; count < 10 {
+		t.Errorf("moved %d buffers, want at least 10", count)
+	}
+}
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go
index ee55b18a7..bb979a2a8 100644
--- a/weed/mq/segment/message_serde.go
+++ b/weed/mq/segment/message_serde.go
@@ -7,10 +7,6 @@ import (
 
 type MessageBatchBuilder struct {
 	b              *flatbuffers.Builder
-	producerId     int32
-	producerEpoch  int32
-	segmentId      int32
-	flags          int32
 	messageOffsets []flatbuffers.UOffsetT
 	segmentSeqBase int64
 	segmentSeqLast int64
@@ -18,23 +14,24 @@ type MessageBatchBuilder struct {
 	tsMsLast       int64
 }
 
-func NewMessageBatchBuilder(b *flatbuffers.Builder,
-	producerId int32,
-	producerEpoch int32,
-	segmentId int32,
-	flags int32) *MessageBatchBuilder {
+func NewMessageBatchBuilder(b *flatbuffers.Builder) *MessageBatchBuilder {
 
 	b.Reset()
 
 	return &MessageBatchBuilder{
-		b:             b,
-		producerId:    producerId,
-		producerEpoch: producerEpoch,
-		segmentId:     segmentId,
-		flags:         flags,
+		b: b,
 	}
 }
 
+// Reset clears the underlying flatbuffers builder together with the recorded
+// message offsets and sequence/timestamp ranges, so the builder can be reused.
+func (builder *MessageBatchBuilder) Reset() {
+	builder.b.Reset()
+	builder.messageOffsets = builder.messageOffsets[:0]
+	builder.segmentSeqBase, builder.segmentSeqLast = 0, 0
+	builder.tsMsBase, builder.tsMsLast = 0, 0
+}
+
 func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) {
 	if builder.segmentSeqBase == 0 {
 		builder.segmentSeqBase = segmentSeq
@@ -80,7 +77,10 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
 
 }
 
-func (builder *MessageBatchBuilder) BuildMessageBatch() {
+func (builder *MessageBatchBuilder) BuildMessageBatch(producerId int32,
+	producerEpoch int32,
+	segmentId int32,
+	flags int32) {
 	message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets))
 	for i := len(builder.messageOffsets) - 1; i >= 0; i-- {
 		builder.b.PrependUOffsetT(builder.messageOffsets[i])
@@ -88,10 +88,10 @@ func (builder *MessageBatchBuilder) BuildMessageBatch() {
 	messagesOffset := builder.b.EndVector(len(builder.messageOffsets))
 
 	message_fbs.MessageBatchStart(builder.b)
-	message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId)
-	message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch)
-	message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId)
-	message_fbs.MessageBatchAddFlags(builder.b, builder.flags)
+	message_fbs.MessageBatchAddProducerId(builder.b, producerId)
+	message_fbs.MessageBatchAddProducerEpoch(builder.b, producerEpoch)
+	message_fbs.MessageBatchAddSegmentId(builder.b, segmentId)
+	message_fbs.MessageBatchAddFlags(builder.b, flags)
 	message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase)
 	message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase))
 	message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index 30c3d784a..8849b393b 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -14,11 +14,11 @@ func TestMessageSerde(t *testing.T) {
 	prop["n1"] = "v1"
 	prop["n2"] = "v2"
 
-	bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
+	bb := NewMessageBatchBuilder(b)
 
 	bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
 
-	bb.BuildMessageBatch()
+	bb.BuildMessageBatch(1, 2, 3, 4)
 
 	buf := bb.GetBytes()
 