Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-01-19 02:48:24 +00:00

Merge pull request #3372 from kmlebedev/replicateWithExcludeDirectories

Commit 1f592ae933

Makefile
@@ -10,5 +10,5 @@ install:
 full_install:
     cd weed; go install -tags "elastic gocdk sqlite ydb tikv"
 
-test:
+tests:
     cd weed; go test -tags "elastic gocdk sqlite ydb tikv" -v ./...
@@ -8,6 +8,7 @@ import (
     "github.com/chrislusf/seaweedfs/weed/security"
     "github.com/chrislusf/seaweedfs/weed/util"
     "google.golang.org/grpc"
+    "strings"
     "time"
 )
 
@@ -15,6 +16,7 @@ type FilerBackupOptions struct {
     isActivePassive *bool
     filer           *string
     path            *string
+    excludePaths    *string
     debug           *bool
     proxyByFiler    *bool
     timeAgo         *time.Duration
@@ -28,6 +30,7 @@ func init() {
     cmdFilerBackup.Run = runFilerBackup // break init cycle
     filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
     filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
+    filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
     filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
     filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
     filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
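Note: the hunks in this section belong to the filer.backup command (cmdFilerBackup). As a quick reference, here is a minimal, self-contained sketch of how the new comma-separated flag value is consumed, using the same strings.Split call the next hunk adds; the flag name matches the diff, while the example paths are purely illustrative.

package main

import (
	"flag"
	"fmt"
	"strings"
)

func main() {
	// Mirrors the flag added above; the default is empty, as in the diff.
	excludePaths := flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
	flag.Parse()

	// e.g. -filerExcludePaths=/buckets/tmp,/buckets/cache (illustrative paths)
	prefixes := strings.Split(*excludePaths, ",")
	fmt.Printf("excluding %d prefix(es): %q\n", len(prefixes), prefixes)
}

Invoked with -filerExcludePaths=/buckets/tmp,/buckets/cache it reports two prefixes; with the flag left at its empty default it reports one empty prefix, which matters for the matching logic shown later.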
@@ -84,6 +87,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
 
     sourceFiler := pb.ServerAddress(*backupOption.filer)
     sourcePath := *backupOption.path
+    excludePaths := strings.Split(*backupOption.excludePaths, ",")
     timeAgo := *backupOption.timeAgo
     targetPath := dataSink.GetSinkToDirectory()
     debug := *backupOption.debug
@@ -106,10 +110,14 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
 
     // create filer sink
     filerSource := &source.FilerSource{}
-    filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler)
+    filerSource.DoInitialize(
+        sourceFiler.ToHttpAddress(),
+        sourceFiler.ToGrpcAddress(),
+        sourcePath,
+        *backupOption.proxyByFiler)
     dataSink.SetSourceFiler(filerSource)
 
-    processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
+    processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug)
 
     processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
         glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
@@ -26,7 +26,9 @@ type SyncOptions struct {
     filerA        *string
     filerB        *string
     aPath         *string
+    aExcludePaths *string
     bPath         *string
+    bExcludePaths *string
     aReplication  *string
     bReplication  *string
     aCollection   *string
@@ -58,7 +60,9 @@ func init() {
     syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
     syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
     syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
+    syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
     syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
+    syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
     syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
     syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
     syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
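The filer.sync command (cmdFilerSynchronize) gets one exclude list per direction. A small sketch, with illustrative flag values, of how the two new flags are split at the call sites shown in the next two hunks:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative values for -a.excludePaths and -b.excludePaths.
	aExcludePaths := "/buckets/tmp,/buckets/staging"
	bExcludePaths := ""

	// The call sites below use the same strings.Split(..., ",").
	fmt.Printf("%q\n", strings.Split(aExcludePaths, ",")) // ["/buckets/tmp" "/buckets/staging"]
	fmt.Printf("%q\n", strings.Split(bExcludePaths, ",")) // [""] (one empty string, not an empty slice)
}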
@@ -133,9 +137,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
     }
     for {
         syncOptions.clientEpoch++
-        err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
-            *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
-            *syncOptions.bDebug, aFilerSignature, bFilerSignature)
+        err := doSubscribeFilerMetaChanges(
+            syncOptions.clientId,
+            syncOptions.clientEpoch,
+            grpcDialOption,
+            filerA,
+            *syncOptions.aPath,
+            strings.Split(*syncOptions.aExcludePaths, ","),
+            *syncOptions.aProxyByFiler,
+            filerB,
+            *syncOptions.bPath,
+            *syncOptions.bReplication,
+            *syncOptions.bCollection,
+            *syncOptions.bTtlSec,
+            *syncOptions.bProxyByFiler,
+            *syncOptions.bDiskType,
+            *syncOptions.bDebug,
+            aFilerSignature,
+            bFilerSignature)
         if err != nil {
             glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
             time.Sleep(1747 * time.Millisecond)
@@ -154,9 +173,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
     go func() {
         for {
             syncOptions.clientEpoch++
-            err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
-                *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
-                *syncOptions.aDebug, bFilerSignature, aFilerSignature)
+            err := doSubscribeFilerMetaChanges(
+                syncOptions.clientId,
+                syncOptions.clientEpoch,
+                grpcDialOption,
+                filerB,
+                *syncOptions.bPath,
+                strings.Split(*syncOptions.bExcludePaths, ","),
+                *syncOptions.bProxyByFiler,
+                filerA,
+                *syncOptions.aPath,
+                *syncOptions.aReplication,
+                *syncOptions.aCollection,
+                *syncOptions.aTtlSec,
+                *syncOptions.aProxyByFiler,
+                *syncOptions.aDiskType,
+                *syncOptions.aDebug,
+                bFilerSignature,
+                aFilerSignature)
             if err != nil {
                 glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
                 time.Sleep(2147 * time.Millisecond)
@@ -186,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
     return nil
 }
 
-func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
     replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
 
     // if first time, start from now
@@ -205,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
     filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
     filerSink.SetSourceFiler(filerSource)
 
-    persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
+    persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug)
 
     processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
         message := resp.EventNotification
@@ -302,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
 
 }
 
-func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
     // process function
     processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
         message := resp.EventNotification
@@ -322,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
     if !strings.HasPrefix(resp.Directory, sourcePath) {
         return nil
     }
+    for _, excludePath := range excludePaths {
+        if strings.HasPrefix(resp.Directory, excludePath) {
+            return nil
+        }
+    }
     // handle deletions
     if filer_pb.IsDelete(resp) {
         if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
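For clarity, here is a standalone sketch of the prefix check added above. One Go behavior worth keeping in mind: strings.Split("", ",") yields a slice containing a single empty string, and every string has "" as a prefix, so an unset exclude flag needs a guard somewhere; whether this change guards that case elsewhere is not visible in these hunks. The directory names below are illustrative.

package main

import (
	"fmt"
	"strings"
)

// excluded reports whether dir falls under any exclude prefix,
// mirroring the loop added to genProcessFunction above.
func excluded(dir string, excludePaths []string) bool {
	for _, excludePath := range excludePaths {
		if strings.HasPrefix(dir, excludePath) {
			return true
		}
	}
	return false
}

func main() {
	excludes := strings.Split("/buckets/tmp,/buckets/cache", ",")
	fmt.Println(excluded("/buckets/tmp/2022", excludes))  // true
	fmt.Println(excluded("/buckets/data/2022", excludes)) // false

	// Caveat: an empty flag value splits to [""], and "" is a prefix of everything.
	empty := strings.Split("", ",")
	fmt.Println(excluded("/buckets/data/2022", empty)) // true: every event would be skipped
}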
@@ -13,6 +13,8 @@ grpcAddress = "localhost:18888"
 # this is not a directory on your hard drive, but on your filer.
 # i.e., all files with this "prefix" are sent to notification message queue.
 directory = "/buckets"
+# files from the directory separated by space are excluded from sending notifications
+excludeDirectories = "/buckets/tmp"
 
 [sink.local]
 enabled = false
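This hunk appears to come from the replication scaffold configuration (TOML). The new excludeDirectories value is documented as space separated; assuming the config reader splits plain string values on whitespace (viper's GetStringSlice behaves this way for strings, but that is an assumption here, not shown in the diff), a value listing two directories becomes a two-element slice, as this strings.Fields sketch illustrates:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative value; the scaffold example above uses just "/buckets/tmp".
	excludeDirectories := "/buckets/tmp /buckets/cache"

	// strings.Fields splits on runs of whitespace, matching the
	// "separated by space" wording in the scaffold comment.
	fmt.Printf("%q\n", strings.Fields(excludeDirectories)) // ["/buckets/tmp" "/buckets/cache"]
}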
@@ -16,8 +16,9 @@ import (
 )
 
 type Replicator struct {
     sink        sink.ReplicationSink
     source      *source.FilerSource
+    excludeDirs []string
 }
 
 func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
@@ -28,8 +29,9 @@ func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSin
     dataSink.SetSourceFiler(source)
 
     return &Replicator{
         sink:        dataSink,
         source:      source,
+        excludeDirs: sourceConfig.GetStringSlice(configPrefix + "excludeDirectories"),
     }
 }
 
@@ -41,6 +43,13 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
         glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
         return nil
     }
+    for _, excludeDir := range r.excludeDirs {
+        if strings.HasPrefix(key, excludeDir) {
+            glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir)
+            return nil
+        }
+    }
 
     var dateKey string
     if r.sink.IsIncremental() {
         var mTime int64
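Finally, a trimmed-down, runnable version of the skip logic added to (*Replicator).Replicate, with fmt standing in for glog and hypothetical keys and exclude directories:

package main

import (
	"fmt"
	"strings"
)

type replicator struct {
	excludeDirs []string
}

// shouldSkip mirrors the loop added to (*Replicator).Replicate:
// any key under an excluded directory prefix is ignored.
func (r *replicator) shouldSkip(key string) bool {
	for _, excludeDir := range r.excludeDirs {
		if strings.HasPrefix(key, excludeDir) {
			fmt.Printf("skipping %v of exclude dir %v\n", key, excludeDir) // glog.V(4) in the real code
			return true
		}
	}
	return false
}

func main() {
	r := &replicator{excludeDirs: []string{"/buckets/tmp"}} // as loaded from excludeDirectories
	fmt.Println(r.shouldSkip("/buckets/tmp/scratch.dat")) // true
	fmt.Println(r.shouldSkip("/buckets/photos/cat.jpg"))  // false
}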