Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-01-19 02:48:24 +00:00
[sync] override amz storage class, None to delete (#3639)

* override amz storage class, None to delete (https://github.com/seaweedfs/seaweedfs/issues/3636)
* use empty string to delete
* without nil check
commit b64674018a
parent 3cb914f7e1

@@ -14,6 +14,7 @@ import (
 type RemoteSyncOptions struct {
 	filerAddress       *string
+	storageClass       *string
 	grpcDialOption     grpc.DialOption
 	readChunkFromFiler *bool
 	timeAgo            *time.Duration

@@ -45,6 +46,7 @@ func init() {
 	cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
 	remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
 	remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
+	remoteSyncOptions.storageClass = cmdFilerRemoteSynchronize.Flag.String("storageClass", "None", "override amz storage class, empty to delete")
 	remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
 	remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
 	remoteSyncOptions.clientId = util.RandomInt32()
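
The default of "None" makes the override opt-in: existing sync setups keep their current behavior unless the flag is passed explicitly. Assuming the command is still invoked as weed filer.remote.sync (its usual SeaweedFS name; the directory below is illustrative, only -storageClass is new), an override run would look like

weed filer.remote.sync -filer=localhost:8888 -dir=/buckets/example -storageClass=STANDARD_IA

while passing -storageClass="" strips the s3_constants.AmzStorageClass attribute from every synced entry, as the next hunk shows.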
@ -37,6 +37,12 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
||||||
|
|
||||||
var lastLogTsNs = time.Now().UnixNano()
|
var lastLogTsNs = time.Now().UnixNano()
|
||||||
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
storageClass := *option.storageClass
|
||||||
|
if storageClass == "" {
|
||||||
|
delete(resp.EventNotification.NewEntry.Extended, s3_constants.AmzStorageClass)
|
||||||
|
} else if storageClass != "None" {
|
||||||
|
resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass] = []byte(storageClass)
|
||||||
|
}
|
||||||
processor.AddSyncJob(resp)
|
processor.AddSyncJob(resp)
|
||||||
return nil
|
return nil
|
||||||
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
||||||
|
|
|
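
The new block is a three-way decision: an empty -storageClass removes any existing storage-class attribute before the entry reaches the sync processor, the default "None" leaves the entry untouched, and any other value overwrites the attribute. Below is a minimal, self-contained sketch of that decision, with the entry's Extended attributes modeled as a plain map and a stand-in constant for s3_constants.AmzStorageClass (its real value is not shown in this diff):

package main

import "fmt"

// amzStorageClass is a stand-in for s3_constants.AmzStorageClass.
const amzStorageClass = "x-amz-storage-class"

// applyStorageClassOverride mirrors the three-way decision added above:
//   ""     -> drop the attribute so the remote target falls back to its default
//   "None" -> keep whatever the entry already carries
//   other  -> overwrite the attribute with the configured class
func applyStorageClassOverride(extended map[string][]byte, storageClass string) {
	if storageClass == "" {
		delete(extended, amzStorageClass)
	} else if storageClass != "None" {
		extended[amzStorageClass] = []byte(storageClass)
	}
}

func main() {
	for _, class := range []string{"", "None", "STANDARD_IA"} {
		extended := map[string][]byte{amzStorageClass: []byte("GLACIER")}
		applyStorageClassOverride(extended, class)
		fmt.Printf("flag=%q -> attribute=%q\n", class, extended[amzStorageClass])
	}
}

One caveat: delete on a nil map is a no-op in Go, but writing to a nil map panics, so overriding with a non-default class relies on NewEntry.Extended being populated; the "without nil check" bullet in the commit message suggests that assumption is intentional.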

@@ -54,7 +54,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
 	// if is the oldest job, write down the watermark
 	isOldest := true
-	for t, _ := range t.activeJobs {
+	for t := range t.activeJobs {
 		if resp.TsNs > t {
 			isOldest = false
 			break
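
The last hunk is only a cleanup: the unused second value in for t, _ := range is dropped in favor of the key-only form that gofmt -s suggests. The surrounding check decides whether the incoming event is older than every job still in flight, and only then records the watermark. A standalone sketch of that check, with activeJobs modeled as a set of timestamps (the diff only shows that its keys are compared against resp.TsNs; the value type here is a placeholder):

package main

import "fmt"

// isOldestActiveJob reports whether an event at tsNs is not newer than any job
// still in flight; only then is it safe to advance the offset watermark, as in
// MetadataProcessor.AddSyncJob above.
func isOldestActiveJob(activeJobs map[int64]struct{}, tsNs int64) bool {
	for t := range activeJobs { // key-only range, as in the cleaned-up loop
		if tsNs > t {
			return false
		}
	}
	return true
}

func main() {
	active := map[int64]struct{}{100: {}, 250: {}}
	fmt.Println(isOldestActiveJob(active, 100)) // true: no in-flight job is older
	fmt.Println(isOldestActiveJob(active, 300)) // false: the job at 100 is older
}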