This commit is contained in:
Chris Lu 2020-07-05 15:52:36 -07:00
parent 70d8a3a1d3
commit 49929e0869
3 changed files with 6 additions and 6 deletions

View file

@ -35,7 +35,7 @@ type Filer struct {
FsyncBuckets []string FsyncBuckets []string
buckets *FilerBuckets buckets *FilerBuckets
Cipher bool Cipher bool
MetaLogBuffer *log_buffer.LogBuffer LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string metaLogCollection string
metaLogReplication string metaLogReplication string
} }
@ -47,7 +47,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }
f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection f.metaLogCollection = collection
f.metaLogReplication = replication f.metaLogReplication = replication
@ -318,6 +318,6 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
} }
func (f *Filer) Shutdown() { func (f *Filer) Shutdown() {
f.MetaLogBuffer.Shutdown() f.LocalMetaLogBuffer.Shutdown()
f.store.Shutdown() f.store.Shutdown()
} }

View file

@ -67,7 +67,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
return return
} }
f.MetaLogBuffer.AddToBuffer([]byte(dir), data) f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data)
} }

View file

@ -37,7 +37,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs) lastReadTime = time.Unix(0, processedTsNs)
} }
err = fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock() fs.listenersLock.Lock()
fs.listenersCond.Wait() fs.listenersCond.Wait()
fs.listenersLock.Unlock() fs.listenersLock.Unlock()
@ -63,7 +63,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock() fs.listenersLock.Lock()
fs.listenersCond.Wait() fs.listenersCond.Wait()
fs.listenersLock.Unlock() fs.listenersLock.Unlock()