diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 7747c9af6..8c2b1b33a 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -68,7 +68,7 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) { filers = append(filers, self) } f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(time.Now().UnixNano()) + f.MetaAggregator.StartLoopSubscribe(f, self, time.Now().UnixNano()) } diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index f36c74f14..24464b6a5 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -29,6 +29,11 @@ type FilerStore interface { Shutdown() } +type FilerLocalStore interface { + UpdateOffset(filer string, lastTsNs int64) error + ReadOffset(filer string) (lastTsNs int64, err error) +} + type FilerStoreWrapper struct { actualStore FilerStore } diff --git a/weed/filer2/leveldb2/leveldb2_local_store.go b/weed/filer2/leveldb2/leveldb2_local_store.go new file mode 100644 index 000000000..86aa54471 --- /dev/null +++ b/weed/filer2/leveldb2/leveldb2_local_store.go @@ -0,0 +1,47 @@ +package leveldb + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) +} + +var ( + _ = filer2.FilerLocalStore(&LevelDB2Store{}) +) + +func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error { + + value := make([]byte, 8) + util.Uint64toBytes(value, uint64(lastTsNs)) + + err := store.dbs[0].Put([]byte("meta"+filer), value, nil) + + if err != nil { + return fmt.Errorf("UpdateOffset %s : %v", filer, err) + } + + println("UpdateOffset", filer, "lastTsNs", lastTsNs) + + return nil +} + +func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { + + value, err := store.dbs[0].Get([]byte("meta"+filer), nil) + + if err != nil { + return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) + } + + lastTsNs = int64(util.BytesToUint64(value)) + + println("ReadOffset", filer, "lastTsNs", lastTsNs) + + return +} diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go index 2f707b921..478337ae7 100644 --- a/weed/filer2/meta_aggregator.go +++ b/weed/filer2/meta_aggregator.go @@ -37,13 +37,42 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg return t } -func (ma *MetaAggregator) StartLoopSubscribe(lastTsNs int64) { +func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string, lastTsNs int64) { for _, filer := range ma.filers { - go ma.subscribeToOneFiler(filer, lastTsNs) + go ma.subscribeToOneFiler(f, self, filer, lastTsNs) } } -func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) { +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self string, lastTsNs int64) { + + var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) + lastPersistTime := time.Now() + changesSinceLastPersist := 0 + + MaxChangeLimit := 100 + + if localStore, ok := f.store.actualStore.(FilerLocalStore); ok { + if prevTsNs, err := localStore.ReadOffset(filer); err == nil { + lastTsNs = prevTsNs + } + if self != filer { + maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { + if err := Replay(f.store.actualStore, event); err != nil { + glog.Errorf("failed to reply metadata change from %v: %v", filer, err) + return + } + changesSinceLastPersist++ + if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { + if err := localStore.UpdateOffset(filer, event.TsNs); err == nil { + lastPersistTime = time.Now() + changesSinceLastPersist = 0 + } else { + glog.V(0).Infof("failed to update offset for %v: %v", filer, err) + } + } + } + } + } processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error { data, err := proto.Marshal(event) @@ -54,6 +83,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) { dir := event.Directory // println("received meta change", dir, "size", len(data)) ma.MetaLogBuffer.AddToBuffer([]byte(dir), data) + if maybeReplicateMetadataChange != nil { + maybeReplicateMetadataChange(event) + } return nil } diff --git a/weed/filer2/meta_replay.go b/weed/filer2/meta_replay.go new file mode 100644 index 000000000..d9cdaa76a --- /dev/null +++ b/weed/filer2/meta_replay.go @@ -0,0 +1,37 @@ +package filer2 + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + var oldPath util.FullPath + var newEntry *Entry + if message.OldEntry != nil { + oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name) + glog.V(4).Infof("deleting %v", oldPath) + if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil { + return err + } + } + + if message.NewEntry != nil { + dir := resp.Directory + if message.NewParentPath != "" { + dir = message.NewParentPath + } + key := util.NewFullPath(dir, message.NewEntry.Name) + glog.V(4).Infof("creating %v", key) + newEntry = FromPbEntry(dir, message.NewEntry) + if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil { + return err + } + } + + return nil +}