From 02ae102731496689885dd133e2b6aa57ba634b4e Mon Sep 17 00:00:00 2001 From: creeew Date: Thu, 2 Jun 2022 01:28:47 +0800 Subject: [PATCH 1/2] fix filer.sync missing source srv uploaded files to target when target down --- .../load_test_meta_tail.go | 2 +- weed/command/filer_backup.go | 2 +- weed/command/filer_meta_backup.go | 2 +- weed/command/filer_meta_tail.go | 2 +- weed/command/filer_remote_gateway_buckets.go | 2 +- weed/command/filer_remote_sync_dir.go | 2 +- weed/command/filer_sync.go | 2 +- weed/filer/filer.go | 2 +- weed/mount/meta_cache/meta_cache_subscribe.go | 2 +- weed/pb/filer_pb_tail.go | 33 +++++++++++++++---- weed/s3api/auth_credentials_subscribe.go | 2 +- 11 files changed, 36 insertions(+), 17 deletions(-) diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index aa9840b7f..d4b9d63b1 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -77,7 +77,7 @@ func startGenerateMetadata() { func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) { - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), "tail", 0, *dir, nil, 0, 0, 0, eachEntryFunc, false) + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), "tail", 0, *dir, nil, 0, 0, 0, eachEntryFunc, pb.TrivialOnError) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index f029e4ef2..d191c693b 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -114,6 +114,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index cced19faa..cf679885d 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -195,7 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { }) return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, - *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, false) + *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index cc3e014c6..66a87c3d9 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -119,7 +119,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { return err } return nil - }, false) + }, pb.TrivialOnError) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 38d936826..c3ff756db 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -39,7 +39,7 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, - option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, false) + option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 02e7218e9..5fc20be9a 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -41,7 +41,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, - mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, false) + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 2dd780a75..deb458525 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -178,7 +178,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, }) return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId, - sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, false) + sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 81d2aa158..42c25838e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -85,7 +85,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil, 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { return Replay(f.Store, resp) - }, true) + }, pb.FatalOnError) return } diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index c30ec3699..c8ccdd375 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -58,7 +58,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } util.RetryForever("followMetaUpdates", func() error { - return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, true) + return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, pb.FatalOnError) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) return true diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 0fba972d1..2d6a7898b 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -5,19 +5,28 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "io" "time" ) +type EventErrorType int + +const ( + TrivialOnError EventErrorType = iota + FatalOnError + RetryForeverOnError +) + type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32, - processEventFn ProcessMetadataFunc, fatalOnError bool) error { + processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, - pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) + pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } @@ -26,10 +35,10 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, clientId int32, pathPrefix string, lastTsNs *int64, untilTsNs int64, selfSignature int32, - processEventFn ProcessMetadataFunc, fatalOnError bool) error { + processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, - pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) + pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } @@ -38,7 +47,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, } func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, - processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { + processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error { return func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -65,9 +74,19 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix str } if err := processEventFn(resp); err != nil { - if fatalOnError { + switch eventErrorType { + case TrivialOnError: + glog.Errorf("process %v: %v", resp, err) + case FatalOnError: glog.Fatalf("process %v: %v", resp, err) - } else { + case RetryForeverOnError: + util.RetryForever("followMetaUpdates", func() error { + return processEventFn(resp) + }, func(err error) bool { + glog.Errorf("process %v: %v", resp, err) + return true + }) + default: glog.Errorf("process %v: %v", resp, err) } } diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 48c4e35c8..91fd5d830 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -33,7 +33,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la } util.RetryForever("followIamChanges", func() error { - return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, true) + return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError) }, func(err error) bool { glog.V(0).Infof("iam follow metadata changes: %v", err) return true From 47d335cf8cbe9bdbb3929281271c34953cba5588 Mon Sep 17 00:00:00 2001 From: Kaiwalya Joshi Date: Wed, 1 Jun 2022 15:47:10 -0700 Subject: [PATCH 2/2] feat: Send commands to weed shell from the docker image. Add the ability to send commands to weed shell from the docker image. Allows an operator to perform maintenance commands like so: ``` docker run \ --rm \ -e SHELL_FILER=localhost:8888 \ -e SHELL_MASTER=localhost:9333 \ chrislusf/seaweedfs:local \ "shell" \ "fs.configure -locationPrefix=/buckets/foo -volumeGrowthCount=3 -replication=002 -apply" ``` --- docker/entrypoint.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 6818d9581..80a7fe586 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -57,6 +57,12 @@ case "$1" in exec /usr/bin/weed -logtostderr=true s3 $ARGS $@ ;; + 'shell') + ARGS="-cluster=$SHELL_CLUSTER -filer=$SHELL_FILER -filerGroup=$SHELL_FILER_GROUP -master=$SHELL_MASTER -options=$SHELL_OPTIONS" + shift + exec echo "$@" | /usr/bin/weed -logtostderr=true shell $ARGS + ;; + *) exec /usr/bin/weed $@ ;;