mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer.backup: added to replace filer.replicate
This commit is contained in:
parent
014a31d11a
commit
e52c94640e
|
@ -33,3 +33,7 @@ debug_webdav:
|
||||||
debug_s3:
|
debug_s3:
|
||||||
go build -gcflags="all=-N -l"
|
go build -gcflags="all=-N -l"
|
||||||
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
|
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
|
||||||
|
|
||||||
|
debug_filer_copy:
|
||||||
|
go build -gcflags="all=-N -l"
|
||||||
|
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h
|
||||||
|
|
|
@ -15,6 +15,7 @@ var Commands = []*Command{
|
||||||
cmdDownload,
|
cmdDownload,
|
||||||
cmdExport,
|
cmdExport,
|
||||||
cmdFiler,
|
cmdFiler,
|
||||||
|
cmdFilerBackup,
|
||||||
cmdFilerCat,
|
cmdFilerCat,
|
||||||
cmdFilerMetaTail,
|
cmdFilerMetaTail,
|
||||||
cmdFilerReplicate,
|
cmdFilerReplicate,
|
||||||
|
|
158
weed/command/filer_backup.go
Normal file
158
weed/command/filer_backup.go
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
package command
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FilerBackupOptions struct {
|
||||||
|
isActivePassive *bool
|
||||||
|
filer *string
|
||||||
|
path *string
|
||||||
|
debug *bool
|
||||||
|
proxyByFiler *bool
|
||||||
|
timeAgo *time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
filerBackupOptions FilerBackupOptions
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdFilerBackup.Run = runFilerBackup // break init cycle
|
||||||
|
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
|
||||||
|
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
|
||||||
|
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
|
||||||
|
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
||||||
|
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdFilerBackup = &Command{
|
||||||
|
UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
|
||||||
|
Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
|
||||||
|
Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
|
||||||
|
|
||||||
|
filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
|
||||||
|
and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
|
||||||
|
|
||||||
|
If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
|
||||||
|
A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
func runFilerBackup(cmd *Command, args []string) bool {
|
||||||
|
|
||||||
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
|
||||||
|
|
||||||
|
util.LoadConfiguration("security", false)
|
||||||
|
util.LoadConfiguration("replication", true)
|
||||||
|
|
||||||
|
for {
|
||||||
|
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
|
||||||
|
time.Sleep(1747 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
BackupKeyPrefix = "backup."
|
||||||
|
)
|
||||||
|
|
||||||
|
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
|
||||||
|
|
||||||
|
// find data sink
|
||||||
|
config := util.GetViper()
|
||||||
|
dataSink := findSink(config)
|
||||||
|
if dataSink == nil {
|
||||||
|
return fmt.Errorf("no data sink configured in replication.toml")
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceFiler := *backupOption.filer
|
||||||
|
sourcePath := *backupOption.path
|
||||||
|
timeAgo := *backupOption.timeAgo
|
||||||
|
targetPath := dataSink.GetSinkToDirectory()
|
||||||
|
debug := *backupOption.debug
|
||||||
|
|
||||||
|
// get start time for the data sink
|
||||||
|
startFrom := time.Unix(0, 0)
|
||||||
|
sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
|
||||||
|
if timeAgo.Milliseconds() == 0 {
|
||||||
|
lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
|
||||||
|
if err != nil {
|
||||||
|
glog.V(0).Infof("starting from %v", startFrom)
|
||||||
|
} else {
|
||||||
|
startFrom = time.Unix(0, lastOffsetTsNs)
|
||||||
|
glog.V(0).Infof("resuming from %v", startFrom)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
startFrom = time.Now().Add(-timeAgo)
|
||||||
|
glog.V(0).Infof("start time is set to %v", startFrom)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create filer sink
|
||||||
|
filerSource := &source.FilerSource{}
|
||||||
|
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
|
||||||
|
dataSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
|
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
|
||||||
|
|
||||||
|
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
|
||||||
|
ClientName: "backup_" + dataSink.GetName(),
|
||||||
|
PathPrefix: sourcePath,
|
||||||
|
SinceNs: startFrom.UnixNano(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listen: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var counter int64
|
||||||
|
var lastWriteTime time.Time
|
||||||
|
for {
|
||||||
|
resp, listenErr := stream.Recv()
|
||||||
|
|
||||||
|
if listenErr == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if listenErr != nil {
|
||||||
|
return listenErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := processEventFn(resp); err != nil {
|
||||||
|
return fmt.Errorf("processEventFn: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
counter++
|
||||||
|
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
|
||||||
|
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
|
||||||
|
counter = 0
|
||||||
|
lastWriteTime = time.Now()
|
||||||
|
if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
|
||||||
|
return fmt.Errorf("setOffset: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var dataSink sink.ReplicationSink
|
dataSink := findSink(config)
|
||||||
for _, sk := range sink.Sinks {
|
|
||||||
if config.GetBool("sink." + sk.GetName() + ".enabled") {
|
|
||||||
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
|
|
||||||
glog.Fatalf("Failed to initialize sink for %s: %+v",
|
|
||||||
sk.GetName(), err)
|
|
||||||
}
|
|
||||||
glog.V(0).Infof("Configure sink to %s", sk.GetName())
|
|
||||||
dataSink = sk
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if dataSink == nil {
|
if dataSink == nil {
|
||||||
println("no data sink configured in replication.toml:")
|
println("no data sink configured in replication.toml:")
|
||||||
|
@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func findSink(config *util.ViperProxy) sink.ReplicationSink {
|
||||||
|
var dataSink sink.ReplicationSink
|
||||||
|
for _, sk := range sink.Sinks {
|
||||||
|
if config.GetBool("sink." + sk.GetName() + ".enabled") {
|
||||||
|
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
|
||||||
|
glog.Fatalf("Failed to initialize sink for %s: %+v",
|
||||||
|
sk.GetName(), err)
|
||||||
|
}
|
||||||
|
glog.V(0).Infof("Configure sink to %s", sk.GetName())
|
||||||
|
dataSink = sk
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dataSink
|
||||||
|
}
|
||||||
|
|
||||||
func validateOneEnabledInput(config *util.ViperProxy) {
|
func validateOneEnabledInput(config *util.ViperProxy) {
|
||||||
enabledInput := ""
|
enabledInput := ""
|
||||||
for _, input := range sub.NotificationInputs {
|
for _, input := range sub.NotificationInputs {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/replication"
|
"github.com/chrislusf/seaweedfs/weed/replication"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/replication/sink"
|
||||||
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
|
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
|
||||||
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
|
@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
|
||||||
|
|
||||||
// if first time, start from now
|
// if first time, start from now
|
||||||
// if has previously synced, resume from that point of time
|
// if has previously synced, resume from that point of time
|
||||||
sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
|
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
|
||||||
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
||||||
filerSink.SetSourceFiler(filerSource)
|
filerSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
|
persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
|
||||||
|
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
|
||||||
var sourceOldKey, sourceNewKey util.FullPath
|
|
||||||
if message.OldEntry != nil {
|
|
||||||
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
|
|
||||||
}
|
|
||||||
if message.NewEntry != nil {
|
|
||||||
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, sig := range message.Signatures {
|
for _, sig := range message.Signatures {
|
||||||
if sig == targetFilerSignature && targetFilerSignature != 0 {
|
if sig == targetFilerSignature && targetFilerSignature != 0 {
|
||||||
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
|
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if debug {
|
return persistEventFn(resp)
|
||||||
fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !strings.HasPrefix(resp.Directory, sourcePath) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle deletions
|
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
|
||||||
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
|
||||||
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle new entries
|
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
|
||||||
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
|
|
||||||
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
|
|
||||||
}
|
|
||||||
|
|
||||||
// this is something special?
|
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle updates
|
|
||||||
if strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
|
||||||
// old key is in the watched directory
|
|
||||||
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
|
||||||
// new key is also in the watched directory
|
|
||||||
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
|
||||||
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
|
|
||||||
foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
|
|
||||||
if foundExisting {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// not able to find old entry
|
|
||||||
if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
|
|
||||||
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the new entry
|
|
||||||
newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
|
|
||||||
return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// new key is outside of the watched directory
|
|
||||||
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
|
||||||
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// old key is outside of the watched directory
|
|
||||||
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
|
||||||
// new key is in the watched directory
|
|
||||||
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
|
|
||||||
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
|
|
||||||
} else {
|
|
||||||
// new key is also outside of the watched directory
|
|
||||||
// skip
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
|
||||||
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
|
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
|
||||||
counter = 0
|
counter = 0
|
||||||
lastWriteTime = time.Now()
|
lastWriteTime = time.Now()
|
||||||
if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
|
if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -290,11 +215,11 @@ const (
|
||||||
SyncKeyPrefix = "sync."
|
SyncKeyPrefix = "sync."
|
||||||
)
|
)
|
||||||
|
|
||||||
func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
|
func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
|
||||||
|
|
||||||
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
syncKey := []byte(SyncKeyPrefix + "____")
|
syncKey := []byte(signaturePrefix + "____")
|
||||||
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
|
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
|
||||||
|
|
||||||
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
|
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
|
func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
|
||||||
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
syncKey := []byte(SyncKeyPrefix + "____")
|
syncKey := []byte(signaturePrefix + "____")
|
||||||
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
|
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
|
||||||
|
|
||||||
valueBuf := make([]byte, 8)
|
valueBuf := make([]byte, 8)
|
||||||
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
|
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
|
||||||
|
@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
// process function
|
||||||
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
message := resp.EventNotification
|
||||||
|
|
||||||
|
var sourceOldKey, sourceNewKey util.FullPath
|
||||||
|
if message.OldEntry != nil {
|
||||||
|
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
|
||||||
|
}
|
||||||
|
if message.NewEntry != nil {
|
||||||
|
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if debug {
|
||||||
|
glog.V(0).Infof("received %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(resp.Directory, sourcePath) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle deletions
|
||||||
|
if message.OldEntry != nil && message.NewEntry == nil {
|
||||||
|
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
||||||
|
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle new entries
|
||||||
|
if message.OldEntry == nil && message.NewEntry != nil {
|
||||||
|
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||||
|
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is something special?
|
||||||
|
if message.OldEntry == nil && message.NewEntry == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle updates
|
||||||
|
if strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
||||||
|
// old key is in the watched directory
|
||||||
|
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
||||||
|
// new key is also in the watched directory
|
||||||
|
if !dataSink.IsIncremental() {
|
||||||
|
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
||||||
|
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
|
||||||
|
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
|
||||||
|
if foundExisting {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// not able to find old entry
|
||||||
|
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
|
||||||
|
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// create the new entry
|
||||||
|
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||||
|
return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// new key is outside of the watched directory
|
||||||
|
if !dataSink.IsIncremental() {
|
||||||
|
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
||||||
|
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// old key is outside of the watched directory
|
||||||
|
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
||||||
|
// new key is in the watched directory
|
||||||
|
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||||
|
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
|
||||||
|
} else {
|
||||||
|
// new key is also outside of the watched directory
|
||||||
|
// skip
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return processEventFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
|
||||||
|
if !dataSink.IsIncremental() {
|
||||||
|
return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
|
||||||
|
}
|
||||||
|
var mTime int64
|
||||||
|
if message.NewEntry != nil {
|
||||||
|
mTime = message.NewEntry.Attributes.Mtime
|
||||||
|
} else if message.OldEntry != nil {
|
||||||
|
mTime = message.OldEntry.Attributes.Mtime
|
||||||
|
}
|
||||||
|
dateKey := time.Unix(mTime, 0).Format("2006-01-02")
|
||||||
|
return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue