From bcf37346ef23d2e31d2baa778ff9f839a3af96c0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 10 Apr 2020 01:35:59 -0700 Subject: [PATCH] add timestamp inside lock --- weed/filer2/filer_notify.go | 6 +++--- weed/queue/log_buffer.go | 42 +++++++++++++++++++------------------ 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 095587038..de07e1cf9 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -45,11 +45,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) notification.Queue.SendMessage(fullpath, eventNotification) } - f.logMetaEvent(time.Now(), fullpath, eventNotification) + f.logMetaEvent(fullpath, eventNotification) } -func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) { +func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) { dir, _ := util.FullPath(fullpath).DirAndName() @@ -63,7 +63,7 @@ func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *f return } - f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data) + f.metaLogBuffer.AddToBuffer([]byte(dir), data) } diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go index 5f8db140d..f24429479 100644 --- a/weed/queue/log_buffer.go +++ b/weed/queue/log_buffer.go @@ -46,8 +46,18 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { +func (m *LogBuffer) AddToBuffer(key, data []byte) { + m.Lock() + defer func() { + m.Unlock() + if m.notifyFn != nil { + m.notifyFn() + } + }() + + // need to put the timestamp inside the lock + ts := time.Now() logEntry := &filer_pb.LogEntry{ TsNs: ts.UnixNano(), PartitionKeyHash: util.HashToInt32(key), @@ -58,14 +68,6 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { size := len(logEntryData) - m.Lock() - defer func() { - m.Unlock() - if m.notifyFn != nil { - m.notifyFn() - } - }() - if m.pos == 0 { m.startTime = ts } @@ -153,18 +155,18 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer l, h := 0, len(m.idx)-1 /* - for i, pos := range m.idx { - logEntry, ts := readTs(m.buf, pos) - event := &filer_pb.FullEventNotification{} - proto.Unmarshal(logEntry.Data, event) - entry := event.EventNotification.OldEntry - if entry == nil { - entry = event.EventNotification.NewEntry + for i, pos := range m.idx { + logEntry, ts := readTs(m.buf, pos) + event := &filer_pb.FullEventNotification{} + proto.Unmarshal(logEntry.Data, event) + entry := event.EventNotification.OldEntry + if entry == nil { + entry = event.EventNotification.NewEntry + } + fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) } - fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) - } - fmt.Printf("l=%d, h=%d\n", l, h) - */ + fmt.Printf("l=%d, h=%d\n", l, h) + */ for l <= h { mid := (l + h) / 2