separate filer.remote.gateway command to avoid confusion

This commit is contained in:
Chris Lu 2021-09-15 22:47:17 -07:00
parent 0c099c1bea
commit 63da4bbb54
4 changed files with 127 additions and 52 deletions

View file

@ -0,0 +1,113 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"os"
"time"
)
type RemoteGatewayOptions struct {
filerAddress *string
grpcDialOption grpc.DialOption
readChunkFromFiler *bool
timeAgo *time.Duration
createBucketAt *string
createBucketRandomSuffix *bool
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
}
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
var (
remoteGatewayOptions RemoteGatewayOptions
)
func init() {
cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle
remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerRemoteGateway = &Command{
UsageLine: "filer.remote.gateway",
Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote storage",
Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote storage
filer.remote.gateway listens on filer local buckets update events.
If any bucket is created, deleted, or updated, it will mirror the changes to remote object store.
weed filer.remote.sync -createBucketAt=cloud1
`,
}
func runFilerRemoteGateway(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteGatewayOptions.grpcDialOption = grpcDialOption
filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress)
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
filerAddress.ToHttpAddress(),
filerAddress.ToGrpcAddress(),
"/", // does not matter
*remoteGatewayOptions.readChunkFromFiler,
)
remoteGatewayOptions.bucketsDir = "/buckets"
// check buckets again
remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
remoteGatewayOptions.bucketsDir = resp.DirBuckets
return nil
})
// read filer remote storage mount mappings
if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil {
fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
return true
}
// synchronize /buckets folder
fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
util.RetryForever("filer.remote.sync buckets", func() error {
return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
}, func(err error) bool {
if err != nil {
glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err)
}
return true
})
return true
}

View file

@ -17,7 +17,7 @@ import (
"time" "time"
) )
func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
// read filer remote storage mount mappings // read filer remote storage mount mappings
if detectErr := option.collectRemoteStorageConf(); detectErr != nil { if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
@ -35,13 +35,13 @@ func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSourc
return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
}) })
lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir) lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
} }
func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
handleCreateBucket := func(entry *filer_pb.Entry) error { handleCreateBucket := func(entry *filer_pb.Entry) error {
if !entry.IsDirectory { if !entry.IsDirectory {
@ -307,7 +307,7 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.
return eachEntryFunc, nil return eachEntryFunc, nil
} }
func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
bucket := util.FullPath(option.bucketsDir).Child(bucketName) bucket := util.FullPath(option.bucketsDir).Child(bucketName)
var isMounted bool var isMounted bool
@ -327,7 +327,7 @@ func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (cli
return client, remoteStorageMountLocation, nil return client, remoteStorageMountLocation, nil
} }
func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
bucket, ok = extractBucketPath(option.bucketsDir, actualDir) bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
if !ok { if !ok {
return "", nil, nil, false return "", nil, nil, false
@ -355,7 +355,7 @@ func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
return util.FullPath(bucketsDir).Child(parts[0]), true return util.FullPath(bucketsDir).Child(parts[0]), true
} }
func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil { if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
return err return err

View file

@ -1,17 +1,14 @@
package command package command
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc" "google.golang.org/grpc"
"os"
"time" "time"
) )
@ -19,15 +16,9 @@ type RemoteSyncOptions struct {
filerAddress *string filerAddress *string
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
readChunkFromFiler *bool readChunkFromFiler *bool
debug *bool
timeAgo *time.Duration timeAgo *time.Duration
dir *string dir *string
createBucketAt *string
createBucketRandomSuffix *bool
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
} }
var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
@ -49,10 +40,7 @@ func init() {
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
} }
@ -100,18 +88,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
*remoteSyncOptions.readChunkFromFiler, *remoteSyncOptions.readChunkFromFiler,
) )
remoteSyncOptions.bucketsDir = "/buckets" if dir != "" {
// check buckets again
remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
remoteSyncOptions.bucketsDir = resp.DirBuckets
return nil
})
if dir != "" && dir != remoteSyncOptions.bucketsDir {
fmt.Printf("synchronize %s to remote storage...\n", dir) fmt.Printf("synchronize %s to remote storage...\n", dir)
util.RetryForever("filer.remote.sync "+dir, func() error { util.RetryForever("filer.remote.sync "+dir, func() error {
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
@ -124,22 +101,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
return true return true
} }
// read filer remote storage mount mappings
if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil {
fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
return true
}
// synchronize /buckets folder
fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir)
util.RetryForever("filer.remote.sync buckets", func() error {
return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
}, func(err error) bool {
if err != nil {
glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err)
}
return true
})
return true return true
} }

View file

@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"os" "os"
"strings" "strings"
"time" "time"
@ -36,7 +37,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs) return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs)
}) })
lastOffsetTs := collectLastSyncOffset(option, mountedDir) lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
@ -159,19 +160,19 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
return eachEntryFunc, nil return eachEntryFunc, nil
} }
func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time { func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
// 1. specified by timeAgo // 1. specified by timeAgo
// 2. last offset timestamp for this directory // 2. last offset timestamp for this directory
// 3. directory creation time // 3. directory creation time
var lastOffsetTs time.Time var lastOffsetTs time.Time
if *option.timeAgo == 0 { if timeAgo == 0 {
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
if err != nil { if err != nil {
glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
return time.Now() return time.Now()
} }
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir) lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
if mountedDirEntry != nil { if mountedDirEntry != nil {
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs) lastOffsetTs = time.Unix(0, lastOffsetTsNs)
@ -183,7 +184,7 @@ func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Ti
lastOffsetTs = time.Now() lastOffsetTs = time.Now()
} }
} else { } else {
lastOffsetTs = time.Now().Add(-*option.timeAgo) lastOffsetTs = time.Now().Add(-timeAgo)
} }
return lastOffsetTs return lastOffsetTs
} }