From 3bd48c4f2925a725b347dff4e4d66928591e1598 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 1 Sep 2021 01:29:22 -0700 Subject: [PATCH] filer.remote.sync: exit when directory is unmounted this will not propagate the deletions back to the cloud --- weed/command/filer_remote_sync.go | 47 +++++++++++++-- weed/remote_storage/track_sync_offset.go | 73 ++++++++++++++++++++++++ weed/shell/command_remote_unmount.go | 23 +++++--- 3 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 weed/remote_storage/track_sync_offset.go diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 43dc86f33..b751d30b6 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -12,7 +12,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" "google.golang.org/grpc" + "os" + "strings" "time" ) @@ -102,8 +105,6 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return fmt.Errorf("read mount info: %v", detectErr) } - dirHash := util.HashStringToLong(mountedDir) - // 1. specified by timeAgo // 2. last offset timestamp for this directory // 3. directory creation time @@ -114,7 +115,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return fmt.Errorf("lookup %s: %v", mountedDir, err) } - lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) + lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) if mountedDirEntry != nil { if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { lastOffsetTs = time.Unix(0, lastOffsetTsNs) @@ -134,8 +135,44 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return err } + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } + } + if message.NewEntry.Name == remoteStorage.Name + filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + client, err = remote_storage.GetRemoteStorage(remoteStorage) + return err + } + + return nil + } + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + if message.OldEntry == nil && message.NewEntry == nil { return nil } @@ -207,11 +244,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { lastTime := time.Unix(0, lastTsNs) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) + return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs) }) return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", - mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go new file mode 100644 index 000000000..2dfb6d784 --- /dev/null +++ b/weed/remote_storage/track_sync_offset.go @@ -0,0 +1,73 @@ +package remote_storage + +import ( + "context" + "errors" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +const ( + SyncKeyPrefix = "remote.sync." +) + +func GetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string) (lastOffsetTsNs int64, readErr error) { + + dirHash := uint32(util.HashStringToLong(dir)) + + readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + syncKey := []byte(SyncKeyPrefix + "____") + util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) + + resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) + if err != nil { + return err + } + + if len(resp.Error) != 0 { + return errors.New(resp.Error) + } + if len(resp.Value) < 8 { + return nil + } + + lastOffsetTsNs = int64(util.BytesToUint64(resp.Value)) + + return nil + }) + + return + +} + +func SetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string, offsetTsNs int64) error { + + dirHash := uint32(util.HashStringToLong(dir)) + + return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + syncKey := []byte(SyncKeyPrefix + "____") + util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) + + valueBuf := make([]byte, 8) + util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) + + resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{ + Key: syncKey, + Value: valueBuf, + }) + if err != nil { + return err + } + + if len(resp.Error) != 0 { + return errors.New(resp.Error) + } + + return nil + + }) + +} diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index 9b61f5cfb..ed5887a4a 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "io" + "time" ) func init() { @@ -30,9 +32,7 @@ func (c *commandRemoteUnmount) Help() string { remote.mount -dir=/xxx -remote=s3_1/bucket # unmount the mounted directory and remove its cache - # Make sure you have stopped "weed filer.remote.sync" first! - # Otherwise, the deletion will also be propagated to the remote storage!!! - remote.unmount -dir=/xxx -iHaveStoppedRemoteSync + remote.unmount -dir=/xxx ` } @@ -42,7 +42,6 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dir := remoteMountCommand.String("dir", "", "a directory in filer") - hasStoppedRemoteSync := remoteMountCommand.Bool("iHaveStoppedRemoteSync", false, "confirm to stop weed filer.remote.sync first") if err = remoteMountCommand.Parse(args); err != nil { return nil @@ -61,17 +60,21 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("directory %s is not mounted", *dir) } - if !*hasStoppedRemoteSync { - return fmt.Errorf("make sure \"weed filer.remote.sync\" is stopped to avoid data loss") + // store a mount configuration in filer + fmt.Fprintf(writer, "deleting mount for %s ...\n", *dir) + if err = c.deleteMountMapping(commandEnv, *dir); err != nil { + return fmt.Errorf("delete mount mapping: %v", err) } + // purge mounted data + fmt.Fprintf(writer, "purge %s ...\n", *dir) if err = c.purgeMountedData(commandEnv, *dir); err != nil { return fmt.Errorf("purge mounted data: %v", err) } - // store a mount configuration in filer - if err = c.deleteMountMapping(commandEnv, *dir); err != nil { - return fmt.Errorf("delete mount mapping: %v", err) + // reset remote sync offset in case the folder is mounted again + if err = remote_storage.SetSyncOffset(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, *dir, time.Now().UnixNano()); err != nil { + return fmt.Errorf("reset remote.sync offset for %s: %v", *dir, err) } return nil @@ -100,6 +103,8 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) { entry.Attributes = oldEntry.Attributes entry.Extended = oldEntry.Extended + entry.Attributes.Crtime = time.Now().Unix() + entry.Attributes.Mtime = time.Now().Unix() }) if mkdirErr != nil { return fmt.Errorf("mkdir %s: %v", dir, mkdirErr)