add additional buffers for logs

This commit is contained in:
Chris Lu 2020-04-11 02:19:45 -07:00
parent 7e7ed767d9
commit 417125457e
2 changed files with 47 additions and 1 deletions

View file

@ -11,6 +11,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
const BufferSize = 4 * 1024 * 1024
const PreviousBufferCount = 3
type dataToFlush struct { type dataToFlush struct {
startTime time.Time startTime time.Time
stopTime time.Time stopTime time.Time
@ -18,6 +21,7 @@ type dataToFlush struct {
} }
type LogBuffer struct { type LogBuffer struct {
prevBuffers *SealedBuffers
buf []byte buf []byte
idx []int idx []int
pos int pos int
@ -34,7 +38,8 @@ type LogBuffer struct {
func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
lb := &LogBuffer{ lb := &LogBuffer{
buf: make([]byte, 4*1024*1024), prevBuffers: newSealedBuffers(PreviousBufferCount),
buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4), sizeBuf: make([]byte, 4),
flushInterval: flushInterval, flushInterval: flushInterval,
flushFn: flushFn, flushFn: flushFn,
@ -127,6 +132,7 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
stopTime: m.stopTime, stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]), data: copiedBytes(m.buf[:m.pos]),
} }
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf)
m.pos = 0 m.pos = 0
m.idx = m.idx[:0] m.idx = m.idx[:0]
return d return d

View file

@ -0,0 +1,40 @@
package queue
import "time"
type MemBuffer struct {
buf []byte
startTime time.Time
stopTime time.Time
}
type SealedBuffers struct {
buffers []*MemBuffer
}
func newSealedBuffers(size int) *SealedBuffers {
sbs := &SealedBuffers{}
sbs.buffers = make([]*MemBuffer, size)
for i := 0; i < size; i++ {
sbs.buffers[i] = &MemBuffer{
buf: make([]byte, BufferSize),
}
}
return sbs
}
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0]
size := len(sbs.buffers)
for i := 0; i < size-1; i++ {
sbs.buffers[i].buf = sbs.buffers[i+1].buf
sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
}
sbs.buffers[size-1].buf = buf
sbs.buffers[size-1].startTime = startTime
sbs.buffers[size-1].stopTime = stopTime
return oldMemBuffer.buf
}