diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go index f24429479..78c5b75ae 100644 --- a/weed/queue/log_buffer.go +++ b/weed/queue/log_buffer.go @@ -11,6 +11,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +const BufferSize = 4 * 1024 * 1024 +const PreviousBufferCount = 3 + type dataToFlush struct { startTime time.Time stopTime time.Time @@ -18,6 +21,7 @@ type dataToFlush struct { } type LogBuffer struct { + prevBuffers *SealedBuffers buf []byte idx []int pos int @@ -34,7 +38,8 @@ type LogBuffer struct { func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { lb := &LogBuffer{ - buf: make([]byte, 4*1024*1024), + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), sizeBuf: make([]byte, 4), flushInterval: flushInterval, flushFn: flushFn, @@ -127,6 +132,7 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { stopTime: m.stopTime, data: copiedBytes(m.buf[:m.pos]), } + m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf) m.pos = 0 m.idx = m.idx[:0] return d diff --git a/weed/queue/sealed_buffer.go b/weed/queue/sealed_buffer.go new file mode 100644 index 000000000..23cec92c9 --- /dev/null +++ b/weed/queue/sealed_buffer.go @@ -0,0 +1,40 @@ +package queue + +import "time" + +type MemBuffer struct { + buf []byte + startTime time.Time + stopTime time.Time +} + +type SealedBuffers struct { + buffers []*MemBuffer +} + +func newSealedBuffers(size int) *SealedBuffers { + sbs := &SealedBuffers{} + + sbs.buffers = make([]*MemBuffer, size) + for i := 0; i < size; i++ { + sbs.buffers[i] = &MemBuffer{ + buf: make([]byte, BufferSize), + } + } + + return sbs +} + +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) { + oldMemBuffer := sbs.buffers[0] + size := len(sbs.buffers) + for i := 0; i < size-1; i++ { + sbs.buffers[i].buf = sbs.buffers[i+1].buf + sbs.buffers[i].startTime = sbs.buffers[i+1].startTime + sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime + } + sbs.buffers[size-1].buf = buf + sbs.buffers[size-1].startTime = startTime + sbs.buffers[size-1].stopTime = stopTime + return oldMemBuffer.buf +}