diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d5e6cb214..422575193 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -58,9 +58,13 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { + var toFlush *dataToFlush m.Lock() defer func() { m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } if m.notifyFn != nil { m.notifyFn() } @@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - m.flushChan <- m.copyToFlush() + toFlush = m.copyToFlush() m.startTime = ts if len(m.buf) < size+4 { m.buf = make([]byte, 2*size+4) @@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() { return } toFlush := m.copyToFlush() - m.flushChan <- toFlush m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } } }