diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index f84a58c74..e733ddc75 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -92,6 +92,9 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { copy(m.buf[m.pos:m.pos+4], m.sizeBuf) copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) m.pos += size + 4 + + // fmt.Printf("entry size %d total %d count %d\n", size, m.pos, len(m.idx)) + } func (m *LogBuffer) Shutdown() { @@ -117,16 +120,12 @@ func (m *LogBuffer) loopFlush() { func (m *LogBuffer) loopInterval() { for !m.isStopping { + time.Sleep(m.flushInterval) m.Lock() + // println("loop interval") toFlush := m.copyToFlush() m.Unlock() m.flushChan <- toFlush - time.Sleep(m.flushInterval) - if m.notifyFn != nil { - // check whether blocked clients are already disconnected - println("notifying log buffer readers") - m.notifyFn() - } } } @@ -139,7 +138,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { stopTime: m.stopTime, data: copiedBytes(m.buf[:m.pos]), } - m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf) + // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx)) + m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) m.pos = 0 m.idx = m.idx[:0] return d @@ -166,6 +166,20 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } if lastReadTime.Before(m.startTime) { + // println("checking ", lastReadTime.UnixNano()) + for i, buf := range m.prevBuffers.buffers { + if buf.startTime.After(lastReadTime) { + if i == 0 { + println("return the earliest in memory") + return copiedBytes(buf.buf[:buf.size]) + } + return copiedBytes(buf.buf[:buf.size]) + } + if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { + pos := buf.locateByTs(lastReadTime) + return copiedBytes(buf.buf[pos:]) + } + } return copiedBytes(m.buf[:m.pos]) } @@ -227,16 +241,16 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) { return } -func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) { +func readTs(buf []byte, pos int) (size int, ts int64) { - size := util.BytesToUint32(buf[pos : pos+4]) - entryData := buf[pos+4 : pos+4+int(size)] + size = int(util.BytesToUint32(buf[pos : pos+4])) + entryData := buf[pos+4 : pos+4+size] logEntry := &filer_pb.LogEntry{} err := proto.Unmarshal(entryData, logEntry) if err != nil { glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) } - return logEntry, logEntry.TsNs + return size, logEntry.TsNs }