2021-08-08 08:21:42 +00:00
package command
import (
2021-08-09 00:55:03 +00:00
"context"
2021-08-08 08:21:42 +00:00
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"time"
)
type RemoteSyncOptions struct {
filerAddress * string
grpcDialOption grpc . DialOption
readChunkFromFiler * bool
debug * bool
timeAgo * time . Duration
dir * string
}
const (
RemoteSyncKeyPrefix = "remote.sync."
)
var _ = filer_pb . FilerClient ( & RemoteSyncOptions { } )
func ( option * RemoteSyncOptions ) WithFilerClient ( fn func ( filer_pb . SeaweedFilerClient ) error ) error {
return pb . WithFilerClient ( * option . filerAddress , option . grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
return fn ( client )
} )
}
func ( option * RemoteSyncOptions ) AdjustedUrl ( location * filer_pb . Location ) string {
return location . Url
}
var (
remoteSyncOptions RemoteSyncOptions
)
func init ( ) {
cmdFilerRemoteSynchronize . Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions . filerAddress = cmdFilerRemoteSynchronize . Flag . String ( "filer" , "localhost:8888" , "filer of the SeaweedFS cluster" )
remoteSyncOptions . dir = cmdFilerRemoteSynchronize . Flag . String ( "dir" , "/" , "a mounted directory on filer" )
remoteSyncOptions . readChunkFromFiler = cmdFilerRemoteSynchronize . Flag . Bool ( "filerProxy" , false , "read file chunks from filer instead of volume servers" )
remoteSyncOptions . debug = cmdFilerRemoteSynchronize . Flag . Bool ( "debug" , false , "debug mode to print out filer updated remote files" )
remoteSyncOptions . timeAgo = cmdFilerRemoteSynchronize . Flag . Duration ( "timeAgo" , 0 , "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"" )
}
var cmdFilerRemoteSynchronize = & Command {
UsageLine : "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud" ,
2021-08-09 05:30:12 +00:00
Short : "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage" ,
Long : ` resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
2021-08-08 08:21:42 +00:00
filer . remote . sync listens on filer update events .
If any mounted remote file is updated , it will fetch the updated content ,
and write to the remote storage .
` ,
}
func runFilerRemoteSynchronize ( cmd * Command , args [ ] string ) bool {
util . LoadConfiguration ( "security" , false )
grpcDialOption := security . LoadClientTLS ( util . GetViper ( ) , "grpc.client" )
remoteSyncOptions . grpcDialOption = grpcDialOption
2021-08-15 19:38:26 +00:00
dir := * remoteSyncOptions . dir
filerAddress := * remoteSyncOptions . filerAddress
2021-08-08 08:21:42 +00:00
// read filer remote storage mount mappings
2021-08-15 19:38:26 +00:00
_ , _ , remoteStorageMountLocation , storageConf , detectErr := filer . DetectMountInfo ( grpcDialOption , filerAddress , dir )
if detectErr != nil {
fmt . Printf ( "read mount info: %v" , detectErr )
2021-08-08 08:21:42 +00:00
return false
}
filerSource := & source . FilerSource { }
filerSource . DoInitialize (
2021-08-15 19:38:26 +00:00
filerAddress ,
pb . ServerToGrpcAddress ( filerAddress ) ,
2021-08-08 08:21:42 +00:00
"/" , // does not matter
* remoteSyncOptions . readChunkFromFiler ,
)
2021-08-15 19:38:26 +00:00
fmt . Printf ( "synchronize %s to remote storage...\n" , dir )
util . RetryForever ( "filer.remote.sync " + dir , func ( ) error {
return followUpdatesAndUploadToRemote ( & remoteSyncOptions , filerSource , dir , storageConf , remoteStorageMountLocation )
} , func ( err error ) bool {
if err != nil {
2021-08-16 02:46:45 +00:00
glog . Errorf ( "synchronize %s: %v" , dir , err )
2021-08-08 08:21:42 +00:00
}
2021-08-15 19:38:26 +00:00
return true
} )
2021-08-08 08:21:42 +00:00
return true
}
func followUpdatesAndUploadToRemote ( option * RemoteSyncOptions , filerSource * source . FilerSource , mountedDir string , remoteStorage * filer_pb . RemoteConf , remoteStorageMountLocation * filer_pb . RemoteStorageLocation ) error {
dirHash := util . HashStringToLong ( mountedDir )
// 1. specified by timeAgo
// 2. last offset timestamp for this directory
// 3. directory creation time
var lastOffsetTs time . Time
if * option . timeAgo == 0 {
mountedDirEntry , err := filer_pb . GetEntry ( option , util . FullPath ( mountedDir ) )
if err != nil {
return fmt . Errorf ( "lookup %s: %v" , mountedDir , err )
}
lastOffsetTsNs , err := getOffset ( option . grpcDialOption , * option . filerAddress , RemoteSyncKeyPrefix , int32 ( dirHash ) )
if err == nil && mountedDirEntry . Attributes . Crtime < lastOffsetTsNs / 1000000 {
lastOffsetTs = time . Unix ( 0 , lastOffsetTsNs )
glog . V ( 0 ) . Infof ( "resume from %v" , lastOffsetTs )
} else {
lastOffsetTs = time . Unix ( mountedDirEntry . Attributes . Crtime , 0 )
}
} else {
lastOffsetTs = time . Now ( ) . Add ( - * option . timeAgo )
}
client , err := remote_storage . GetRemoteStorage ( remoteStorage )
if err != nil {
return err
}
eachEntryFunc := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
message := resp . EventNotification
if message . OldEntry == nil && message . NewEntry == nil {
return nil
}
if message . OldEntry == nil && message . NewEntry != nil {
2021-08-16 02:46:45 +00:00
if ! filer . HasData ( message . NewEntry ) {
2021-08-08 08:21:42 +00:00
return nil
}
2021-08-16 02:46:45 +00:00
glog . V ( 2 ) . Infof ( "create: %+v" , resp )
2021-08-08 22:58:10 +00:00
if ! shouldSendToRemote ( message . NewEntry ) {
2021-08-16 02:46:45 +00:00
glog . V ( 2 ) . Infof ( "skipping creating: %+v" , resp )
2021-08-08 22:58:10 +00:00
return nil
}
2021-08-08 08:21:42 +00:00
dest := toRemoteStorageLocation ( util . FullPath ( mountedDir ) , util . NewFullPath ( message . NewParentPath , message . NewEntry . Name ) , remoteStorageMountLocation )
2021-08-09 21:35:18 +00:00
if message . NewEntry . IsDirectory {
2021-08-16 03:07:13 +00:00
glog . V ( 0 ) . Infof ( "mkdir %s" , remote_storage . FormatLocation ( dest ) )
2021-08-09 21:35:18 +00:00
return client . WriteDirectory ( dest , message . NewEntry )
}
2021-08-16 03:07:13 +00:00
glog . V ( 0 ) . Infof ( "create %s" , remote_storage . FormatLocation ( dest ) )
2021-08-16 02:46:45 +00:00
reader := filer . NewFileReader ( filerSource , message . NewEntry )
2021-08-09 00:55:03 +00:00
remoteEntry , writeErr := client . WriteFile ( dest , message . NewEntry , reader )
if writeErr != nil {
return writeErr
}
return updateLocalEntry ( & remoteSyncOptions , message . NewParentPath , message . NewEntry , remoteEntry )
2021-08-08 08:21:42 +00:00
}
if message . OldEntry != nil && message . NewEntry == nil {
2021-08-16 02:46:45 +00:00
glog . V ( 2 ) . Infof ( "delete: %+v" , resp )
2021-08-08 08:21:42 +00:00
dest := toRemoteStorageLocation ( util . FullPath ( mountedDir ) , util . NewFullPath ( resp . Directory , message . OldEntry . Name ) , remoteStorageMountLocation )
2021-08-16 03:07:13 +00:00
glog . V ( 0 ) . Infof ( "delete %s" , remote_storage . FormatLocation ( dest ) )
2021-08-08 08:21:42 +00:00
return client . DeleteFile ( dest )
}
if message . OldEntry != nil && message . NewEntry != nil {
oldDest := toRemoteStorageLocation ( util . FullPath ( mountedDir ) , util . NewFullPath ( resp . Directory , message . OldEntry . Name ) , remoteStorageMountLocation )
dest := toRemoteStorageLocation ( util . FullPath ( mountedDir ) , util . NewFullPath ( message . NewParentPath , message . NewEntry . Name ) , remoteStorageMountLocation )
2021-08-08 22:58:10 +00:00
if ! shouldSendToRemote ( message . NewEntry ) {
2021-08-16 02:46:45 +00:00
glog . V ( 2 ) . Infof ( "skipping updating: %+v" , resp )
2021-08-08 22:58:10 +00:00
return nil
}
2021-08-09 21:35:18 +00:00
if message . NewEntry . IsDirectory {
return client . WriteDirectory ( dest , message . NewEntry )
}
2021-08-08 08:21:42 +00:00
if resp . Directory == message . NewParentPath && message . OldEntry . Name == message . NewEntry . Name {
2021-08-16 02:46:45 +00:00
if filer . IsSameData ( message . OldEntry , message . NewEntry ) {
glog . V ( 2 ) . Infof ( "update meta: %+v" , resp )
2021-08-16 02:27:30 +00:00
return client . UpdateFileMetadata ( dest , message . OldEntry , message . NewEntry )
2021-08-08 08:21:42 +00:00
}
}
2021-08-16 02:46:45 +00:00
glog . V ( 2 ) . Infof ( "update: %+v" , resp )
2021-08-16 03:07:13 +00:00
glog . V ( 0 ) . Infof ( "delete %s" , remote_storage . FormatLocation ( oldDest ) )
2021-08-08 08:21:42 +00:00
if err := client . DeleteFile ( oldDest ) ; err != nil {
return err
}
2021-08-16 02:46:45 +00:00
reader := filer . NewFileReader ( filerSource , message . NewEntry )
2021-08-16 03:07:13 +00:00
glog . V ( 0 ) . Infof ( "create %s" , remote_storage . FormatLocation ( dest ) )
2021-08-09 00:55:03 +00:00
remoteEntry , writeErr := client . WriteFile ( dest , message . NewEntry , reader )
if writeErr != nil {
return writeErr
}
return updateLocalEntry ( & remoteSyncOptions , message . NewParentPath , message . NewEntry , remoteEntry )
2021-08-08 08:21:42 +00:00
}
return nil
}
processEventFnWithOffset := pb . AddOffsetFunc ( eachEntryFunc , 3 * time . Second , func ( counter int64 , lastTsNs int64 ) error {
lastTime := time . Unix ( 0 , lastTsNs )
glog . V ( 0 ) . Infof ( "remote sync %s progressed to %v %0.2f/sec" , * option . filerAddress , lastTime , float64 ( counter ) / float64 ( 3 ) )
return setOffset ( option . grpcDialOption , * option . filerAddress , RemoteSyncKeyPrefix , int32 ( dirHash ) , lastTsNs )
} )
return pb . FollowMetadata ( * option . filerAddress , option . grpcDialOption ,
"filer.remote.sync" , mountedDir , lastOffsetTs . UnixNano ( ) , 0 , processEventFnWithOffset , false )
}
func toRemoteStorageLocation ( mountDir , sourcePath util . FullPath , remoteMountLocation * filer_pb . RemoteStorageLocation ) * filer_pb . RemoteStorageLocation {
source := string ( sourcePath [ len ( mountDir ) : ] )
2021-08-09 21:35:18 +00:00
dest := util . FullPath ( remoteMountLocation . Path ) . Child ( source )
2021-08-08 08:21:42 +00:00
return & filer_pb . RemoteStorageLocation {
Name : remoteMountLocation . Name ,
Bucket : remoteMountLocation . Bucket ,
2021-08-09 21:35:18 +00:00
Path : string ( dest ) ,
2021-08-08 08:21:42 +00:00
}
}
2021-08-08 22:58:10 +00:00
func shouldSendToRemote ( entry * filer_pb . Entry ) bool {
if entry . RemoteEntry == nil {
return true
}
2021-08-15 04:46:34 +00:00
if entry . RemoteEntry . LastLocalSyncTsNs / 1e9 < entry . Attributes . Mtime {
2021-08-08 22:58:10 +00:00
return true
}
return false
}
2021-08-09 00:55:03 +00:00
func updateLocalEntry ( filerClient filer_pb . FilerClient , dir string , entry * filer_pb . Entry , remoteEntry * filer_pb . RemoteEntry ) error {
entry . RemoteEntry = remoteEntry
return filerClient . WithFilerClient ( func ( client filer_pb . SeaweedFilerClient ) error {
_ , err := client . UpdateEntry ( context . Background ( ) , & filer_pb . UpdateEntryRequest {
2021-08-09 05:30:36 +00:00
Directory : dir ,
Entry : entry ,
2021-08-09 00:55:03 +00:00
} )
return err
} )
2021-08-09 05:30:36 +00:00
}