From 6b54ff991249d2ce5dfbe28ffb503fc770d5db6c Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Wed, 27 Jan 2021 15:01:33 +0500 Subject: [PATCH 1/3] replication to create time date directory --- docker/replication.toml | 5 ++--- weed/command/filer_replication.go | 1 + weed/command/scaffold.go | 7 ++++++- weed/replication/replicator.go | 13 ++++++++++++- .../sink/backupsink/backup_sink.go | 18 ++++++++++++++++++ weed/replication/sink/localsink/local_sink.go | 19 ++++++------------- 6 files changed, 45 insertions(+), 18 deletions(-) create mode 100644 weed/replication/sink/backupsink/backup_sink.go diff --git a/docker/replication.toml b/docker/replication.toml index 2cee755a5..75e34a899 100644 --- a/docker/replication.toml +++ b/docker/replication.toml @@ -6,7 +6,6 @@ grpcAddress = "filer:18888" # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" -[sink.local] +[sink.backup] enabled = true -directory = "/data" -todays_date_format = "2006-02-01" \ No newline at end of file +directory = "/data" \ No newline at end of file diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index e8c06b208..f2754139b 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/backupsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 1705c6ae4..9cbb1fa1b 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -353,9 +353,14 @@ directory = "/buckets" [sink.local] enabled = false -directory = "/backup" +directory = "/data" todays_date_format = "" # set this to 2006-02-01 for incremental backup +[sink.backup] +enabled = false +# all replicated files are under create time date directory tree +directory = "/backup" + [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c4228434f..81d546c3c 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) return nil } - newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) + var dateKey string + if r.sink.GetName() == "backup" { + var crTime int64 + if message.NewEntry != nil { + crTime = message.NewEntry.Attributes.Crtime + } else if message.OldEntry != nil { + crTime = message.OldEntry.Attributes.Crtime + } + dateKey = time.Unix(crTime, 0).Format("2006-01-02") + } + newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):]) glog.V(3).Infof("replicate %s => %s", key, newKey) key = newKey if message.OldEntry != nil && message.NewEntry == nil { diff --git a/weed/replication/sink/backupsink/backup_sink.go b/weed/replication/sink/backupsink/backup_sink.go new file mode 100644 index 000000000..df0a778d1 --- /dev/null +++ b/weed/replication/sink/backupsink/backup_sink.go @@ -0,0 +1,18 @@ +package backupsink + +import ( + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" +) + +type BackupSink struct { + localsink.LocalSink +} + +func (backupsink *BackupSink) GetName() string { + return "backup" +} + +func init() { + sink.Sinks = append(sink.Sinks, &BackupSink{}) +} diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index 5ca562ec8..21c625c3f 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -12,13 +12,11 @@ import ( "os" "path/filepath" "strings" - "time" ) type LocalSink struct { - dir string - todaysDateFormat string - filerSource *source.FilerSource + Dir string + filerSource *source.FilerSource } func init() { @@ -37,24 +35,19 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool { return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") } -func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error { - localsink.dir = dir - localsink.todaysDateFormat = todaysDateFormat +func (localsink *LocalSink) initialize(dir string) error { + localsink.Dir = dir return nil } func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { dir := configuration.GetString(prefix + "directory") - todaysDateFormat := configuration.GetString(prefix + "todays_date_format") glog.V(4).Infof("sink.local.directory: %v", dir) - return localsink.initialize(dir, todaysDateFormat) + return localsink.initialize(dir) } func (localsink *LocalSink) GetSinkToDirectory() string { - if localsink.todaysDateFormat != "" { - return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat)) - } - return localsink.dir + return localsink.Dir } func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { From 02fdc0a333bedac882c5f9c103d073c3180ec6bb Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Thu, 28 Jan 2021 14:56:13 +0500 Subject: [PATCH 2/3] rename backup to local_incremental and use mtime --- docker/replication.toml | 2 +- weed/command/filer_replication.go | 2 +- weed/replication/replicator.go | 10 +++++----- .../replication/sink/backupsink/backup_sink.go | 18 ------------------ .../local_incremental_sink.go | 18 ++++++++++++++++++ 5 files changed, 25 insertions(+), 25 deletions(-) delete mode 100644 weed/replication/sink/backupsink/backup_sink.go create mode 100644 weed/replication/sink/localincrementalsink/local_incremental_sink.go diff --git a/docker/replication.toml b/docker/replication.toml index 75e34a899..833bb1692 100644 --- a/docker/replication.toml +++ b/docker/replication.toml @@ -6,6 +6,6 @@ grpcAddress = "filer:18888" # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" -[sink.backup] +[sink.local_incremental] enabled = true directory = "/data" \ No newline at end of file diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index f2754139b..47ef5d1b6 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -9,9 +9,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/backupsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localincrementalsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 81d546c3c..7688029e6 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -42,14 +42,14 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } var dateKey string - if r.sink.GetName() == "backup" { - var crTime int64 + if r.sink.GetName() == "local_incremental" { + var mTime int64 if message.NewEntry != nil { - crTime = message.NewEntry.Attributes.Crtime + mTime = message.NewEntry.Attributes.Mtime } else if message.OldEntry != nil { - crTime = message.OldEntry.Attributes.Crtime + mTime = message.OldEntry.Attributes.Mtime } - dateKey = time.Unix(crTime, 0).Format("2006-01-02") + dateKey = time.Unix(mTime, 0).Format("2006-01-02") } newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):]) glog.V(3).Infof("replicate %s => %s", key, newKey) diff --git a/weed/replication/sink/backupsink/backup_sink.go b/weed/replication/sink/backupsink/backup_sink.go deleted file mode 100644 index df0a778d1..000000000 --- a/weed/replication/sink/backupsink/backup_sink.go +++ /dev/null @@ -1,18 +0,0 @@ -package backupsink - -import ( - "github.com/chrislusf/seaweedfs/weed/replication/sink" - "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" -) - -type BackupSink struct { - localsink.LocalSink -} - -func (backupsink *BackupSink) GetName() string { - return "backup" -} - -func init() { - sink.Sinks = append(sink.Sinks, &BackupSink{}) -} diff --git a/weed/replication/sink/localincrementalsink/local_incremental_sink.go b/weed/replication/sink/localincrementalsink/local_incremental_sink.go new file mode 100644 index 000000000..97da3a5f7 --- /dev/null +++ b/weed/replication/sink/localincrementalsink/local_incremental_sink.go @@ -0,0 +1,18 @@ +package localincrementalsink + +import ( + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" +) + +type LocalIncSink struct { + localsink.LocalSink +} + +func (localincsink *LocalIncSink) GetName() string { + return "local_incremental" +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalIncSink{}) +} From be1062b7fcabf793ea3cbfd5a67b81c07610ce83 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Thu, 28 Jan 2021 14:59:20 +0500 Subject: [PATCH 3/3] rename in scaffold --- weed/command/scaffold.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 9cbb1fa1b..a95c41054 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -356,9 +356,9 @@ enabled = false directory = "/data" todays_date_format = "" # set this to 2006-02-01 for incremental backup -[sink.backup] +[sink.local_incremental] enabled = false -# all replicated files are under create time date directory tree +# all replicated files are under modification time date directory tree directory = "/backup" [sink.filer]