add reading from persisted logs for local filer store

This commit is contained in:
Chris Lu 2020-07-13 22:55:28 -07:00
parent 87b5031714
commit 4f6096c7f0
2 changed files with 17 additions and 2 deletions

View file

@ -59,7 +59,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
lastTsNs = prevTsNs lastTsNs = prevTsNs
} }
glog.V(0).Infof("follow filer: %v, lastTsNs=%d", filer, lastTsNs) glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
if err := Replay(f.Store.ActualStore, event); err != nil { if err := Replay(f.Store.ActualStore, event); err != nil {
glog.Errorf("failed to reply metadata change from %v: %v", filer, err) glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
@ -98,7 +98,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
for { for {
err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
ClientName: "filer", ClientName: "filer:"+self,
PathPrefix: "/", PathPrefix: "/",
SinceNs: lastTsNs, SinceNs: lastTsNs,
}) })

View file

@ -63,6 +63,20 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok {
// println("reading from persisted logs ...")
processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
}
// println("reading from in memory logs ...")
err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock() fs.listenersLock.Lock()
fs.listenersCond.Wait() fs.listenersCond.Wait()
@ -117,6 +131,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file
EventNotification: eventNotification, EventNotification: eventNotification,
TsNs: tsNs, TsNs: tsNs,
} }
// println("sending", dirPath, entryName)
if err := stream.Send(message); err != nil { if err := stream.Send(message); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, err) glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err return err