message broker: read also from sealed memory buffer

This commit is contained in:
Chris Lu 2020-04-20 17:26:38 -07:00
parent bd43c62fbd
commit 4bf959edf0
3 changed files with 22 additions and 2 deletions

View file

@ -122,6 +122,11 @@ func (m *LogBuffer) loopInterval() {
m.Unlock() m.Unlock()
m.flushChan <- toFlush m.flushChan <- toFlush
time.Sleep(m.flushInterval) time.Sleep(m.flushInterval)
if m.notifyFn != nil {
// check whether blocked clients are already disconnected
println("notifying log buffer readers")
m.notifyFn()
}
} }
} }

View file

@ -19,7 +19,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
startTime := time.Now() startTime := time.Now()
messageSize := 1024 messageSize := 1024
messageCount := 100 messageCount := 5000
var buf = make([]byte, messageSize) var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ { for i := 0; i < messageCount; i++ {
rand.Read(buf) rand.Read(buf)

View file

@ -4,6 +4,7 @@ import "time"
type MemBuffer struct { type MemBuffer struct {
buf []byte buf []byte
size int
startTime time.Time startTime time.Time
stopTime time.Time stopTime time.Time
} }
@ -25,16 +26,30 @@ func newSealedBuffers(size int) *SealedBuffers {
return sbs return sbs
} }
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) { func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0] oldMemBuffer := sbs.buffers[0]
size := len(sbs.buffers) size := len(sbs.buffers)
for i := 0; i < size-1; i++ { for i := 0; i < size-1; i++ {
sbs.buffers[i].buf = sbs.buffers[i+1].buf sbs.buffers[i].buf = sbs.buffers[i+1].buf
sbs.buffers[i].size = sbs.buffers[i+1].size
sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
} }
sbs.buffers[size-1].buf = buf sbs.buffers[size-1].buf = buf
sbs.buffers[size-1].size = pos
sbs.buffers[size-1].startTime = startTime sbs.buffers[size-1].startTime = startTime
sbs.buffers[size-1].stopTime = stopTime sbs.buffers[size-1].stopTime = stopTime
return oldMemBuffer.buf return oldMemBuffer.buf
} }
func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
lastReadTs := lastReadTime.UnixNano()
for pos < len(mb.buf) {
size, t := readTs(mb.buf, pos)
if t > lastReadTs {
return
}
pos += size + 4
}
return len(mb.buf)
}