mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer.remote.sync: automatically detect the primary remote storage
This commit is contained in:
parent
1702ce5395
commit
c218ef20c7
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"google.golang.org/grpc"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -47,7 +48,7 @@ var (
|
|||
func init() {
|
||||
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
|
||||
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
|
||||
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
|
||||
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
|
||||
remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
|
||||
remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts")
|
||||
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
|
||||
|
@ -66,9 +67,15 @@ var cmdFilerRemoteSynchronize = &Command{
|
|||
|
||||
There are two modes:
|
||||
1)Write back one mounted folder to remote storage
|
||||
|
||||
weed filer.remote.sync -dir=/mount/s3_on_cloud
|
||||
|
||||
2)Watch /buckets folder and write back all changes.
|
||||
Any new buckets will be created in this remote storage.
|
||||
|
||||
# if there is only one remote storage configured
|
||||
weed filer.remote.sync
|
||||
# if there are multiple remote storages configured
|
||||
weed filer.remote.sync -createBucketAt=cloud1
|
||||
|
||||
`,
|
||||
|
@ -91,9 +98,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
|||
*remoteSyncOptions.readChunkFromFiler,
|
||||
)
|
||||
|
||||
storageName := *remoteSyncOptions.createBucketAt
|
||||
if storageName != "" {
|
||||
|
||||
remoteSyncOptions.bucketsDir = "/buckets"
|
||||
// check buckets again
|
||||
remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
|
||||
|
@ -105,18 +109,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
|||
return nil
|
||||
})
|
||||
|
||||
fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName)
|
||||
util.RetryForever("filer.remote.sync buckets "+storageName, func() error {
|
||||
return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
|
||||
}, func(err error) bool {
|
||||
if err != nil {
|
||||
glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
if dir != "" {
|
||||
if dir != "" && dir != remoteSyncOptions.bucketsDir {
|
||||
fmt.Printf("synchronize %s to remote storage...\n", dir)
|
||||
util.RetryForever("filer.remote.sync "+dir, func() error {
|
||||
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
|
||||
|
@ -126,7 +119,25 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
|||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// read filer remote storage mount mappings
|
||||
if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
|
||||
return true
|
||||
}
|
||||
|
||||
// synchronize /buckets folder
|
||||
fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir)
|
||||
util.RetryForever("filer.remote.sync buckets", func() error {
|
||||
return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
|
||||
}, func(err error) bool {
|
||||
if err != nil {
|
||||
glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
return true
|
||||
|
||||
}
|
||||
|
|
|
@ -51,6 +51,19 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
|||
// this directory is imported from "remote.mount.buckets" or "remote.mount"
|
||||
return nil
|
||||
}
|
||||
if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
|
||||
*option.createBucketAt = option.mappings.PrimaryBucketStorageName
|
||||
glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
|
||||
}
|
||||
if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
|
||||
for k := range option.mappings.Mappings {
|
||||
*option.createBucketAt = k
|
||||
glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
|
||||
}
|
||||
}
|
||||
if *option.createBucketAt == "" {
|
||||
return nil
|
||||
}
|
||||
remoteConf, found := option.remoteConfs[*option.createBucketAt]
|
||||
if !found {
|
||||
return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
|
||||
|
@ -72,7 +85,7 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
|||
|
||||
glog.V(0).Infof("create bucket %s", bucketName)
|
||||
if err := client.CreateBucket(bucketName); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
|
||||
}
|
||||
|
||||
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
|
||||
|
@ -95,12 +108,12 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
|||
|
||||
client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
|
||||
}
|
||||
|
||||
glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
|
||||
if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
|
||||
}
|
||||
|
||||
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
|
||||
|
@ -351,6 +364,7 @@ func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
|
|||
}
|
||||
|
||||
option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
|
||||
var lastConfName string
|
||||
err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
|
||||
return nil
|
||||
|
@ -360,8 +374,14 @@ func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
|
|||
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
|
||||
}
|
||||
option.remoteConfs[conf.Name] = conf
|
||||
lastConfName = conf.Name
|
||||
return nil
|
||||
}, "", false, math.MaxUint32)
|
||||
|
||||
if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
|
||||
glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
|
||||
option.mappings.PrimaryBucketStorageName = lastConfName
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ message RemoteConf {
|
|||
|
||||
message RemoteStorageMapping {
|
||||
map<string,RemoteStorageLocation> mappings = 1;
|
||||
string primary_bucket_storage_name = 2;
|
||||
}
|
||||
message RemoteStorageLocation {
|
||||
string name = 1;
|
||||
|
|
|
@ -401,6 +401,7 @@ type RemoteStorageMapping struct {
|
|||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Mappings map[string]*RemoteStorageLocation `protobuf:"bytes,1,rep,name=mappings,proto3" json:"mappings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
|
||||
PrimaryBucketStorageName string `protobuf:"bytes,2,opt,name=primary_bucket_storage_name,json=primaryBucketStorageName,proto3" json:"primary_bucket_storage_name,omitempty"`
|
||||
}
|
||||
|
||||
func (x *RemoteStorageMapping) Reset() {
|
||||
|
@ -442,6 +443,13 @@ func (x *RemoteStorageMapping) GetMappings() map[string]*RemoteStorageLocation {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *RemoteStorageMapping) GetPrimaryBucketStorageName() string {
|
||||
if x != nil {
|
||||
return x.PrimaryBucketStorageName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type RemoteStorageLocation struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
|
@ -619,31 +627,35 @@ var file_remote_proto_rawDesc = []byte{
|
|||
0x79, 0x18, 0x42, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x53, 0x65,
|
||||
0x63, 0x72, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, 0x6f, 0x72, 0x6a,
|
||||
0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x43, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x0d, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0xc0,
|
||||
0x0d, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0xff,
|
||||
0x01, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65,
|
||||
0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x49, 0x0a, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69,
|
||||
0x6e, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x72, 0x65, 0x6d, 0x6f,
|
||||
0x74, 0x65, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72,
|
||||
0x61, 0x67, 0x65, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x61, 0x70, 0x70, 0x69,
|
||||
0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e,
|
||||
0x67, 0x73, 0x1a, 0x5d, 0x0a, 0x0d, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e,
|
||||
0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62,
|
||||
0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4c, 0x6f,
|
||||
0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
|
||||
0x01, 0x22, 0x57, 0x0a, 0x15, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61,
|
||||
0x67, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
|
||||
0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16,
|
||||
0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06,
|
||||
0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03,
|
||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x42, 0x50, 0x0a, 0x10, 0x73, 0x65,
|
||||
0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a,
|
||||
0x46, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68,
|
||||
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66,
|
||||
0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f,
|
||||
0x70, 0x62, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x33,
|
||||
0x67, 0x73, 0x12, 0x3d, 0x0a, 0x1b, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x62, 0x75,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d,
|
||||
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79,
|
||||
0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d,
|
||||
0x65, 0x1a, 0x5d, 0x0a, 0x0d, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74,
|
||||
0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20,
|
||||
0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x2e,
|
||||
0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63,
|
||||
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
|
||||
0x22, 0x57, 0x0a, 0x15, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67,
|
||||
0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d,
|
||||
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a,
|
||||
0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62,
|
||||
0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x42, 0x50, 0x0a, 0x10, 0x73, 0x65, 0x61,
|
||||
0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a, 0x46,
|
||||
0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75,
|
||||
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
|
||||
0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70,
|
||||
0x62, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
|
||||
0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
Loading…
Reference in a new issue