diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 18f549edc..25bca7c97 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -44,7 +44,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - replicator := replication.NewReplicator(config.Sub("sink.filer")) + replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer")) for { key, m, err := notificationInput.ReceiveMessage() diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 17d8b1884..aa4be5ea9 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -149,25 +149,19 @@ topic = "seaweedfs_filer" [source.filer] enabled = true grpcAddress = "localhost:18888" -# id is to identify the notification source, avoid reprocessing the same events -id = "filer1" -# all files under this directory tree and not from this source.filer.id is replicated -directory = "/" +directory = "/buckets" # all files under this directory tree are replicated [notification.kafka] enabled = true hosts = [ "localhost:9092" ] -topic = "seaweedfs_filer" +topic = "seaweedfs_filer1_to_filer2" [sink.filer] enabled = true grpcAddress = "localhost:18888" -# id is to identify the notification source, avoid reprocessing the same events -id = "filer2" -# all files under this directory tree and not from this source.filer.id is replicated -directory = "/" +directory = "/backup" # all replicated files are under this directory tree ` ) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c223021c2..4f5d5203e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -4,28 +4,48 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "strings" + "github.com/chrislusf/seaweedfs/weed/glog" ) type Replicator struct { - sink *sink.FilerSink + sink sink.ReplicationSink + source *source.FilerSource } -func NewReplicator(config util.Configuration) *Replicator { +func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { sink := &sink.FilerSink{} - sink.Initialize(config) + sink.Initialize(sinkConfig) + + source := &source.FilerSource{} + source.Initialize(sourceConfig) + + if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { + fromDir := sourceConfig.GetString("directory") + toDir := sinkConfig.GetString("directory") + if strings.HasPrefix(toDir, fromDir) { + glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) + } + } return &Replicator{ - sink: sink, + sink: sink, + source: source, } } func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { + if !strings.HasPrefix(key, r.source.Dir) { + return nil + } + key = r.sink.GetDirectory() + key[len(r.source.Dir):] if message.OldEntry != nil && message.NewEntry == nil { - return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { - return r.sink.CreateEntry(message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry) } - return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks) + return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks) } diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 57ebeddff..387bffb58 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -12,36 +12,38 @@ import ( ) type ReplicationSink interface { - DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error - CreateEntry(entry *filer_pb.Entry) error - UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error + CreateEntry(key string, entry *filer_pb.Entry) error + UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + GetDirectory() string } type FilerSink struct { grpcAddress string - id string dir string } +func (fs *FilerSink) GetDirectory() string { + return fs.dir +} + func (fs *FilerSink) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), - configuration.GetString("id"), configuration.GetString("directory"), ) } -func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) { +func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress - fs.id = id fs.dir = dir return nil } -func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error { return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.DeleteEntryRequest{ Directory: dir, @@ -53,26 +55,26 @@ func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool glog.V(1).Infof("delete entry: %v", request) _, err := client.DeleteEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("delete entry %s: %v", entry.Name, err) - return fmt.Errorf("delete entry %s: %v", entry.Name, err) + glog.V(0).Infof("delete entry %s: %v", key, err) + return fmt.Errorf("delete entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { replicatedChunks, err := replicateChunks(entry.Chunks) if err != nil { - glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err) - return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err) + glog.V(0).Infof("replicate entry chunks %s: %v", key, err) + return fmt.Errorf("replicate entry chunks %s: %v", key, err) } return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -86,15 +88,15 @@ func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { glog.V(1).Infof("create: %v", request) if _, err := client.CreateEntry(context.Background(), request); err != nil { - glog.V(0).Infof("create entry %s: %v", entry.Name, err) - return fmt.Errorf("create entry %s: %v", entry.Name, err) + glog.V(0).Infof("create entry %s: %v", key, err) + return fmt.Errorf("create entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { return nil } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 49c623815..c3b575b6d 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -16,22 +16,19 @@ type ReplicationSource interface { type FilerSource struct { grpcAddress string - id string - dir string + Dir string } func (fs *FilerSource) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), - configuration.GetString("id"), configuration.GetString("directory"), ) } -func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) { +func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress - fs.id = id - fs.dir = dir + fs.Dir = dir return nil }