mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer.remote.sync: add option to add randomized suffix to buckets to avoid conflicts
This commit is contained in:
parent
d57d4c5f8f
commit
bdefdee4e6
|
@ -22,6 +22,7 @@ type RemoteSyncOptions struct {
|
||||||
timeAgo *time.Duration
|
timeAgo *time.Duration
|
||||||
dir *string
|
dir *string
|
||||||
createBucketAt *string
|
createBucketAt *string
|
||||||
|
createBucketRandomSuffix *bool
|
||||||
|
|
||||||
mappings *remote_pb.RemoteStorageMapping
|
mappings *remote_pb.RemoteStorageMapping
|
||||||
remoteConfs map[string]*remote_pb.RemoteConf
|
remoteConfs map[string]*remote_pb.RemoteConf
|
||||||
|
@ -52,6 +53,7 @@ func init() {
|
||||||
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
|
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
|
||||||
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
|
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
|
||||||
remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
|
remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
|
||||||
|
remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts")
|
||||||
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
|
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
|
||||||
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
|
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
|
||||||
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -56,18 +57,30 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("create bucket %s", entry.Name)
|
bucketName := strings.ToLower(entry.Name)
|
||||||
if err := client.CreateBucket(entry.Name); err != nil {
|
if *option.createBucketRandomSuffix {
|
||||||
|
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
|
||||||
|
if len(bucketName)+5 > 63 {
|
||||||
|
bucketName = bucketName[:58]
|
||||||
|
}
|
||||||
|
bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(0).Infof("create bucket %s", bucketName)
|
||||||
|
if err := client.CreateBucket(bucketName); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
|
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
|
||||||
remoteLocation := &remote_pb.RemoteStorageLocation{
|
remoteLocation := &remote_pb.RemoteStorageLocation{
|
||||||
Name: *option.createBucketAt,
|
Name: *option.createBucketAt,
|
||||||
Bucket: entry.Name,
|
Bucket: bucketName,
|
||||||
Path: "/",
|
Path: "/",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// need to add new mapping here before getting upates from metadata tailing
|
||||||
|
option.mappings.Mappings[string(bucketPath)] = remoteLocation
|
||||||
|
|
||||||
return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
|
return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -76,13 +89,13 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := option.findRemoteStorageClient(entry.Name)
|
client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("delete bucket %s", entry.Name)
|
glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
|
||||||
if err := client.DeleteBucket(entry.Name); err != nil {
|
if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,23 +290,24 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
|
||||||
return eachEntryFunc, nil
|
return eachEntryFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (option *RemoteSyncOptions)findRemoteStorageClient(bucketName string) (remote_storage.RemoteStorageClient, error) {
|
func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
|
||||||
bucket := util.FullPath(option.bucketsDir).Child(bucketName)
|
bucket := util.FullPath(option.bucketsDir).Child(bucketName)
|
||||||
|
|
||||||
remoteStorageMountLocation, isMounted := option.mappings.Mappings[string(bucket)]
|
var isMounted bool
|
||||||
|
remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
|
||||||
if !isMounted {
|
if !isMounted {
|
||||||
return nil, fmt.Errorf("%s is not mounted", bucket)
|
return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
|
||||||
}
|
}
|
||||||
remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
|
remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
|
||||||
if !hasClient {
|
if !hasClient {
|
||||||
return nil, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
|
return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := remote_storage.GetRemoteStorage(remoteConf)
|
client, err = remote_storage.GetRemoteStorage(remoteConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, remoteStorageMountLocation, err
|
||||||
}
|
}
|
||||||
return client, nil
|
return client, remoteStorageMountLocation, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
|
func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
|
||||||
|
|
Loading…
Reference in a new issue