seaweedfs/weed/mq/messages/message_pipeline.go
2022-09-25 11:19:05 -07:00

162 lines
4.2 KiB
Go

package messages
import (
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"log"
"sync"
"sync/atomic"
"time"
)
type OnMessageFunc func(message *Message)
type MessagePipeline struct {
// atomic status
atomicPipelineStatus int64 // -1: stop
// attributes
ProducerId int32
ProducerEpoch int32
grpcDialOption grpc.DialOption
emptyBuffersChan chan *MessageBuffer
sealedBuffersChan chan *MessageBuffer
movedBuffersChan chan MessageBufferReference
onMessageFn OnMessageFunc
mover MessageBufferMover
moverPool *util.LimitedConcurrentExecutor
// control pipeline
doneChan chan struct{}
batchSize int
timeout time.Duration
incomingMessageLock sync.Mutex
incomingMessageBuffer *MessageBuffer
messageSequence int64
}
func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeout time.Duration, mover MessageBufferMover) *MessagePipeline {
t := &MessagePipeline{
ProducerId: producerId,
emptyBuffersChan: make(chan *MessageBuffer, workerCount),
sealedBuffersChan: make(chan *MessageBuffer, workerCount),
movedBuffersChan: make(chan MessageBufferReference, workerCount),
doneChan: make(chan struct{}),
batchSize: batchSize,
timeout: timeout,
moverPool: util.NewLimitedConcurrentExecutor(workerCount),
mover: mover,
}
go t.doLoopUpload()
return t
}
func (mp *MessagePipeline) OutputChan() chan MessageBufferReference {
return mp.movedBuffersChan
}
func (mp *MessagePipeline) AddMessage(message *Message) {
mp.incomingMessageLock.Lock()
defer mp.incomingMessageLock.Unlock()
// get existing message buffer or create a new one
if mp.incomingMessageBuffer == nil {
select {
case mp.incomingMessageBuffer = <-mp.emptyBuffersChan:
default:
mp.incomingMessageBuffer = NewMessageBuffer()
}
mp.incomingMessageBuffer.Reset(mp.messageSequence)
}
// add one message
mp.incomingMessageBuffer.AddMessage(message)
mp.messageSequence++
// seal the message buffer if full
if mp.incomingMessageBuffer.Len() >= mp.batchSize {
mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
mp.sealedBuffersChan <- mp.incomingMessageBuffer
mp.incomingMessageBuffer = nil
}
}
func (mp *MessagePipeline) doLoopUpload() {
mp.mover.Setup()
defer mp.mover.TearDown()
ticker := time.NewTicker(mp.timeout)
for {
status := atomic.LoadInt64(&mp.atomicPipelineStatus)
if status == -100 {
return
} else if status == -1 {
// entering shutting down mode
atomic.StoreInt64(&mp.atomicPipelineStatus, -2)
mp.incomingMessageLock.Lock()
mp.doFlushIncomingMessages()
mp.incomingMessageLock.Unlock()
}
select {
case messageBuffer := <-mp.sealedBuffersChan:
ticker.Reset(mp.timeout)
mp.moverPool.Execute(func() {
util.RetryForever("message mover", func() error {
if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
return flushErr
} else {
mp.movedBuffersChan <- messageReference
}
return nil
}, func(err error) (shouldContinue bool) {
log.Printf("failed: %v", err)
return true
})
})
case <-ticker.C:
if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {
atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
return
}
mp.incomingMessageLock.Lock()
mp.doFlushIncomingMessages()
mp.incomingMessageLock.Unlock()
}
}
atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
close(mp.movedBuffersChan)
}
func (mp *MessagePipeline) doFlushIncomingMessages() {
if mp.incomingMessageBuffer == nil || mp.incomingMessageBuffer.Len() == 0 {
return
}
mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
mp.sealedBuffersChan <- mp.incomingMessageBuffer
mp.incomingMessageBuffer = nil
}
func (mp *MessagePipeline) ShutdownStart() {
if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
atomic.StoreInt64(&mp.atomicPipelineStatus, -1)
}
}
func (mp *MessagePipeline) ShutdownWait() {
for atomic.LoadInt64(&mp.atomicPipelineStatus) != -100 {
time.Sleep(331 * time.Millisecond)
}
}
func (mp *MessagePipeline) ShutdownImmediate() {
if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
}
}