diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 3dcea147f..ed56a710e 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -29,7 +29,6 @@ type LogBuffer struct { pos int startTime time.Time stopTime time.Time - lastFlushTime time.Time sizeBuf []byte flushInterval time.Duration flushFn func(startTime, stopTime time.Time, buf []byte) @@ -133,8 +132,6 @@ func (m *LogBuffer) loopFlush() { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() - // local logbuffer is different from aggregate logbuffer here - m.lastFlushTime = d.stopTime } } } @@ -165,9 +162,6 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { data: copiedBytes(m.buf[:m.pos]), } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) - } else { - // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) - m.lastFlushTime = m.stopTime } m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) m.startTime = time.Unix(0, 0) @@ -188,20 +182,31 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu m.RLock() defer m.RUnlock() - if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { - if time.Now().Sub(m.lastFlushTime) < m.flushInterval*2 { - diff := m.lastFlushTime.Sub(lastReadTime) - glog.V(4).Infof("lastFlush:%v lastRead:%v diff:%v", m.lastFlushTime, lastReadTime, diff) - return nil, ResumeFromDiskError + // Read from disk and memory + // 1. read from disk, last time is = td + // 2. in memory, the earliest time = tm + // if tm <= td, case 2.1 + // read from memory + // if tm is empty, case 2.2 + // read from memory + // if td < tm, case 2.3 + // read from disk again + var tsMemory time.Time + if !m.startTime.IsZero() { + tsMemory = m.startTime + } + for _, prevBuf := range m.prevBuffers.buffers { + if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { + tsMemory = prevBuf.startTime } } + if tsMemory.IsZero() { // case 2.2 + return nil, nil + } else if lastReadTime.Before(tsMemory) { // case 2.3 + return nil, ResumeFromDiskError + } - /* - fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) - for i, prevBuf := range m.prevBuffers.buffers { - fmt.Printf(" prev %d : %s\n", i, prevBuf.String()) - } - */ + // the following is case 2.1 if lastReadTime.Equal(m.stopTime) { return nil, nil