2021-09-16 05:47:17 +00:00
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"os"
"time"
)
type RemoteGatewayOptions struct {
filerAddress * string
grpcDialOption grpc . DialOption
readChunkFromFiler * bool
timeAgo * time . Duration
createBucketAt * string
createBucketRandomSuffix * bool
2021-09-16 06:04:16 +00:00
include * string
exclude * string
2021-09-16 05:47:17 +00:00
mappings * remote_pb . RemoteStorageMapping
remoteConfs map [ string ] * remote_pb . RemoteConf
bucketsDir string
}
var _ = filer_pb . FilerClient ( & RemoteGatewayOptions { } )
func ( option * RemoteGatewayOptions ) WithFilerClient ( fn func ( filer_pb . SeaweedFilerClient ) error ) error {
return pb . WithFilerClient ( pb . ServerAddress ( * option . filerAddress ) , option . grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
return fn ( client )
} )
}
func ( option * RemoteGatewayOptions ) AdjustedUrl ( location * filer_pb . Location ) string {
return location . Url
}
var (
remoteGatewayOptions RemoteGatewayOptions
)
func init ( ) {
cmdFilerRemoteGateway . Run = runFilerRemoteGateway // break init cycle
remoteGatewayOptions . filerAddress = cmdFilerRemoteGateway . Flag . String ( "filer" , "localhost:8888" , "filer of the SeaweedFS cluster" )
remoteGatewayOptions . createBucketAt = cmdFilerRemoteGateway . Flag . String ( "createBucketAt" , "" , "one remote storage name to create new buckets in" )
remoteGatewayOptions . createBucketRandomSuffix = cmdFilerRemoteGateway . Flag . Bool ( "createBucketWithRandomSuffix" , true , "add randomized suffix to bucket name to avoid conflicts" )
remoteGatewayOptions . readChunkFromFiler = cmdFilerRemoteGateway . Flag . Bool ( "filerProxy" , false , "read file chunks from filer instead of volume servers" )
remoteGatewayOptions . timeAgo = cmdFilerRemoteGateway . Flag . Duration ( "timeAgo" , 0 , "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"" )
2021-09-16 06:04:16 +00:00
remoteGatewayOptions . include = cmdFilerRemoteGateway . Flag . String ( "include" , "" , "pattens of new bucket names, e.g., s3*" )
remoteGatewayOptions . exclude = cmdFilerRemoteGateway . Flag . String ( "exclude" , "" , "pattens of new bucket names, e.g., local*" )
2021-09-16 05:47:17 +00:00
}
var cmdFilerRemoteGateway = & Command {
UsageLine : "filer.remote.gateway" ,
2021-09-16 05:53:10 +00:00
Short : "resumable continuously write back bucket creation, deletion, and other local updates to remote object store" ,
Long : ` resumable continuously write back bucket creation , deletion , and other local updates to remote object store
2021-09-16 05:47:17 +00:00
filer . remote . gateway listens on filer local buckets update events .
If any bucket is created , deleted , or updated , it will mirror the changes to remote object store .
weed filer . remote . sync - createBucketAt = cloud1
` ,
}
func runFilerRemoteGateway ( cmd * Command , args [ ] string ) bool {
util . LoadConfiguration ( "security" , false )
grpcDialOption := security . LoadClientTLS ( util . GetViper ( ) , "grpc.client" )
remoteGatewayOptions . grpcDialOption = grpcDialOption
filerAddress := pb . ServerAddress ( * remoteGatewayOptions . filerAddress )
filerSource := & source . FilerSource { }
filerSource . DoInitialize (
filerAddress . ToHttpAddress ( ) ,
filerAddress . ToGrpcAddress ( ) ,
"/" , // does not matter
* remoteGatewayOptions . readChunkFromFiler ,
)
remoteGatewayOptions . bucketsDir = "/buckets"
// check buckets again
remoteGatewayOptions . WithFilerClient ( func ( filerClient filer_pb . SeaweedFilerClient ) error {
resp , err := filerClient . GetFilerConfiguration ( context . Background ( ) , & filer_pb . GetFilerConfigurationRequest { } )
if err != nil {
return err
}
remoteGatewayOptions . bucketsDir = resp . DirBuckets
return nil
} )
// read filer remote storage mount mappings
if detectErr := remoteGatewayOptions . collectRemoteStorageConf ( ) ; detectErr != nil {
fmt . Fprintf ( os . Stderr , "read mount info: %v\n" , detectErr )
return true
}
// synchronize /buckets folder
fmt . Printf ( "synchronize buckets in %s ...\n" , remoteGatewayOptions . bucketsDir )
util . RetryForever ( "filer.remote.sync buckets" , func ( ) error {
return remoteGatewayOptions . followBucketUpdatesAndUploadToRemote ( filerSource )
} , func ( err error ) bool {
if err != nil {
glog . Errorf ( "synchronize %s: %v" , remoteGatewayOptions . bucketsDir , err )
}
return true
} )
return true
}