diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go index 9bd0f3014..d05607bf5 100644 --- a/weed/command/filer_export.go +++ b/weed/command/filer_export.go @@ -5,6 +5,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/server" "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func init() { @@ -22,14 +24,18 @@ var cmdFilerExport = &Command{ If target store is empty, only the directory tree will be listed. + If target store is "notification", the list of entries will be sent to notification. + This is usually used to bootstrap filer replication to a remote system. + `, } var ( // filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree") filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml") - filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml") + filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue") dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size") + dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data") ) type statistics struct { @@ -48,7 +54,7 @@ func runFilerExport(cmd *Command, args []string) bool { if store.GetName() == *filerExportSourceStore { viperSub := config.Sub(store.GetName()) if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", + glog.Fatalf("Failed to initialize source store for %s: %+v", store.GetName(), err) } else { sourceStore = store @@ -61,7 +67,7 @@ func runFilerExport(cmd *Command, args []string) bool { if store.GetName() == *filerExportTargetStore { viperSub := config.Sub(store.GetName()) if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", + glog.Fatalf("Failed to initialize target store for %s: %+v", store.GetName(), err) } else { targetStore = store @@ -79,14 +85,44 @@ func runFilerExport(cmd *Command, args []string) bool { return false } + if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" { + glog.Errorf("Failed to find target store %s", *filerExportTargetStore) + println("existing data sources are:") + for _, store := range filer2.Stores { + println(" " + store.GetName()) + } + return false + } + stat := statistics{} var fn func(level int, entry *filer2.Entry) error - if targetStore == nil { + if *filerExportTargetStore == "notification" { + weed_server.LoadConfiguration("notification", false) + v := viper.GetViper() + notification.LoadConfiguration(v.Sub("notification")) + + fn = func(level int, entry *filer2.Entry) error { + printout(level, entry) + if *dryRun { + return nil + } + return notification.Queue.SendMessage( + string(entry.FullPath), + &filer_pb.EventNotification{ + NewEntry: entry.ToProtoEntry(), + }, + ) + } + } else if targetStore == nil { fn = printout } else { fn = func(level int, entry *filer2.Entry) error { + printout(level, entry) + if *dryRun { + return nil + } return targetStore.InsertEntry(entry) } } @@ -126,7 +162,11 @@ func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer func printout(level int, entry *filer2.Entry) error { for i := 0; i < level; i++ { - print(" ") + if i == level-1 { + print("+-") + } else { + print("| ") + } } println(entry.FullPath.Name()) return nil diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index 82bc00b27..11318a3ae 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -43,3 +43,15 @@ func (entry *Entry) Timestamp() time.Time { return entry.Mtime } } + +func (entry *Entry) ToProtoEntry() *filer_pb.Entry { + if entry == nil { + return nil + } + return &filer_pb.Entry{ + Name: string(entry.FullPath), + IsDirectory: entry.IsDirectory(), + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + } +} diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 44928516c..b3c215249 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -23,23 +23,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) notification.Queue.SendMessage( key, &filer_pb.EventNotification{ - OldEntry: toProtoEntry(oldEntry), - NewEntry: toProtoEntry(newEntry), + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: newEntry.ToProtoEntry(), DeleteChunks: deleteChunks, }, ) } } - -func toProtoEntry(entry *Entry) *filer_pb.Entry { - if entry == nil { - return nil - } - return &filer_pb.Entry{ - Name: string(entry.FullPath), - IsDirectory: entry.IsDirectory(), - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - } -} diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go index ab54cd1a2..b74e2ad35 100644 --- a/weed/filer2/filer_notify_test.go +++ b/weed/filer2/filer_notify_test.go @@ -32,8 +32,8 @@ func TestProtoMarshalText(t *testing.T) { } notification := &filer_pb.EventNotification{ - OldEntry: toProtoEntry(oldEntry), - NewEntry: toProtoEntry(nil), + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: nil, DeleteChunks: true, }