iterate through the log buffer

This commit is contained in:
Chris Lu 2020-04-05 16:51:30 -07:00
parent f71b855db5
commit 53626734d4
2 changed files with 47 additions and 7 deletions

View file

@ -81,6 +81,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
var buf []byte var buf []byte
newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime) newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime)
var processedTs int64
for pos := 0; pos+4 < len(buf); { for pos := 0; pos+4 < len(buf); {
@ -103,7 +104,10 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
err = eachEventFn(event.Directory, event.EventNotification) err = eachEventFn(event.Directory, event.EventNotification)
processedTs = logEntry.TsNs
if err != nil { if err != nil {
newLastReadTime = time.Unix(0, processedTs)
return return
} }
@ -111,6 +115,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
} }
newLastReadTime = time.Unix(0, processedTs)
return return
} }

View file

@ -1,6 +1,8 @@
package queue package queue
import ( import (
"fmt"
"runtime/debug"
"sync" "sync"
"time" "time"
@ -11,6 +13,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
type dataToFlush struct {
startTime time.Time
stopTime time.Time
data []byte
}
type LogBuffer struct { type LogBuffer struct {
buf []byte buf []byte
idx []int idx []int
@ -22,6 +30,7 @@ type LogBuffer struct {
flushFn func(startTime, stopTime time.Time, buf []byte) flushFn func(startTime, stopTime time.Time, buf []byte)
notifyFn func() notifyFn func()
isStopping bool isStopping bool
flushChan chan *dataToFlush
sync.RWMutex sync.RWMutex
} }
@ -32,8 +41,10 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
flushInterval: flushInterval, flushInterval: flushInterval,
flushFn: flushFn, flushFn: flushFn,
notifyFn: notifyFn, notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
} }
go lb.loopFlush() go lb.loopFlush()
go lb.loopInterval()
return lb return lb
} }
@ -62,7 +73,7 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
} }
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
m.flush() m.flushChan <- m.copyToFlush()
m.startTime = ts m.startTime = ts
if len(m.buf) < size+4 { if len(m.buf) < size+4 {
m.buf = make([]byte, 2*size+4) m.buf = make([]byte, 2*size+4)
@ -83,27 +94,45 @@ func (m *LogBuffer) Shutdown() {
} }
m.isStopping = true m.isStopping = true
m.Lock() m.Lock()
m.flush() toFlush := m.copyToFlush()
m.Unlock() m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
} }
func (m *LogBuffer) loopFlush() { func (m *LogBuffer) loopFlush() {
for d := range m.flushChan {
if d != nil {
m.flushFn(d.startTime, d.stopTime, d.data)
}
}
}
func (m *LogBuffer) loopInterval() {
for !m.isStopping { for !m.isStopping {
m.Lock() m.Lock()
m.flush() toFlush := m.copyToFlush()
m.Unlock() m.Unlock()
m.flushChan <- toFlush
time.Sleep(m.flushInterval) time.Sleep(m.flushInterval)
} }
} }
func (m *LogBuffer) flush() { func (m *LogBuffer) copyToFlush() *dataToFlush {
if m.flushFn != nil && m.pos > 0 { if m.flushFn != nil && m.pos > 0 {
// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos]) debug.PrintStack()
d := &dataToFlush{
startTime: m.startTime,
stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]),
}
m.pos = 0 m.pos = 0
m.idx = m.idx[:0] m.idx = m.idx[:0]
return d
} }
return nil
} }
func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
@ -125,8 +154,14 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
lastTs := lastReadTime.UnixNano() lastTs := lastReadTime.UnixNano()
l, h := 0, len(m.idx)-1 l, h := 0, len(m.idx)-1
/*
for i, pos := range m.idx {
ts := readTs(m.buf, pos)
fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos)
}
fmt.Printf("l=%d, h=%d\n", l, h)
*/
// fmt.Printf("l=%d, h=%d\n", l, h)
for { for {
mid := (l + h) / 2 mid := (l + h) / 2
pos := m.idx[mid] pos := m.idx[mid]