refactoring

This commit is contained in:
Chris Lu 2020-04-28 02:05:44 -07:00
parent 66effaed9e
commit 8ed490164e

View file

@ -70,12 +70,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err return err
} }
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
return true
}, func(logEntry *filer_pb.LogEntry) error {
m := &messaging_pb.Message{} m := &messaging_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil { if err = proto.Unmarshal(logEntry.Data, m); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
@ -87,7 +82,14 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err return err
} }
return nil return nil
}) }
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
return true
}, eachLogEntryFn)
return err return err