From 01ceace18edda7e747a903276023a8b1be3af757 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 Sep 2018 00:53:52 -0700 Subject: [PATCH] adjust sink options --- weed/command/scaffold.go | 3 +++ weed/replication/replicator.go | 2 +- weed/replication/sink/filer_sink.go | 15 +++++++++++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index aa4be5ea9..174e21c5e 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -162,6 +162,9 @@ topic = "seaweedfs_filer1_to_filer2" enabled = true grpcAddress = "localhost:18888" directory = "/backup" # all replicated files are under this directory tree +replication = "" +collection = "" +ttlSec = 0 ` ) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 66d194128..3e4bccc10 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -43,7 +43,7 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) if !strings.HasPrefix(key, r.source.Dir) { return nil } - key = r.sink.GetDirectory() + key[len(r.source.Dir):] + key = r.sink.GetSinkToDirectory() + key[len(r.source.Dir):] if message.OldEntry != nil && message.NewEntry == nil { return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks) } diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 422bdeabc..ebaa1a73f 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -15,21 +15,21 @@ type ReplicationSink interface { DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error CreateEntry(key string, entry *filer_pb.Entry) error UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error - GetDirectory() string + GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) } type FilerSink struct { + filerSource *source.FilerSource grpcAddress string dir string - filerSource *source.FilerSource replication string collection string ttlSec int32 dataCenter string } -func (fs *FilerSink) GetDirectory() string { +func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } @@ -37,6 +37,9 @@ func (fs *FilerSink) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), configuration.GetString("directory"), + configuration.GetString("replication"), + configuration.GetString("collection"), + configuration.GetInt("ttlSec"), ) } @@ -44,9 +47,13 @@ func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { fs.filerSource = s } -func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { +func (fs *FilerSink) initialize(grpcAddress string, dir string, + replication string, collection string, ttlSec int) (err error) { fs.grpcAddress = grpcAddress fs.dir = dir + fs.replication = replication + fs.collection = collection + fs.ttlSec = int32(ttlSec) return nil }