mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer backup add option for exclude file names that match the regexp to sync on filer
This commit is contained in:
parent
5e41ab1370
commit
3c5295a1a6
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,6 +17,7 @@ type FilerBackupOptions struct {
|
||||||
filer *string
|
filer *string
|
||||||
path *string
|
path *string
|
||||||
excludePaths *string
|
excludePaths *string
|
||||||
|
excludeFileName *string
|
||||||
debug *bool
|
debug *bool
|
||||||
proxyByFiler *bool
|
proxyByFiler *bool
|
||||||
timeAgo *time.Duration
|
timeAgo *time.Duration
|
||||||
|
@ -31,6 +33,7 @@ func init() {
|
||||||
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
|
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
|
||||||
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
|
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
|
||||||
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
|
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
|
||||||
|
filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
|
||||||
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
|
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
|
||||||
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
||||||
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||||
|
@ -90,6 +93,10 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||||
sourceFiler := pb.ServerAddress(*backupOption.filer)
|
sourceFiler := pb.ServerAddress(*backupOption.filer)
|
||||||
sourcePath := *backupOption.path
|
sourcePath := *backupOption.path
|
||||||
excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
|
excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
|
||||||
|
reExcludeFileName, err := regexp.Compile(*backupOption.excludeFileName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error compile regexp %v for exclude file name: %+v", *backupOption.excludeFileName, err)
|
||||||
|
}
|
||||||
timeAgo := *backupOption.timeAgo
|
timeAgo := *backupOption.timeAgo
|
||||||
targetPath := dataSink.GetSinkToDirectory()
|
targetPath := dataSink.GetSinkToDirectory()
|
||||||
debug := *backupOption.debug
|
debug := *backupOption.debug
|
||||||
|
@ -119,7 +126,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||||
*backupOption.proxyByFiler)
|
*backupOption.proxyByFiler)
|
||||||
dataSink.SetSourceFiler(filerSource)
|
dataSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug)
|
processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, debug)
|
||||||
|
|
||||||
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
||||||
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
|
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"os"
|
"os"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -250,7 +251,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
|
||||||
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
||||||
filerSink.SetSourceFiler(filerSource)
|
filerSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug)
|
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, debug)
|
||||||
|
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
@ -368,7 +369,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
|
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
// process function
|
// process function
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
@ -393,6 +394,9 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
// handle deletions
|
// handle deletions
|
||||||
if filer_pb.IsDelete(resp) {
|
if filer_pb.IsDelete(resp) {
|
||||||
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
||||||
|
|
Loading…
Reference in a new issue