diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 762af2088..eb08e5353 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -79,7 +79,19 @@ func startGenerateMetadata() { func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) { - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), "tail", 0, 0, *dir, nil, 0, 0, 0, eachEntryFunc, pb.TrivialOnError) + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "tail", + ClientId: 0, + ClientEpoch: 0, + SelfSignature: 0, + PathPrefix: *dir, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: 0, + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), metadataFollowOption, eachEntryFunc) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 6f379a6d6..b51dd65b6 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -138,6 +138,19 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti }() } - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "backup_" + dataSink.GetName(), + ClientId: clientId, + ClientEpoch: clientEpoch, + SelfSignature: 0, + PathPrefix: sourcePath, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: startFrom.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index f2cba9382..ff4a61e41 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -196,8 +196,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { }) metaBackup.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch, - *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "meta_backup", + ClientId: metaBackup.clientId, + ClientEpoch: metaBackup.clientEpoch, + SelfSignature: 0, + PathPrefix: *metaBackup.filerDirectory, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: startTime.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 58a496ff4..32855072b 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -107,16 +107,28 @@ func runFilerMetaTail(cmd *Command, args []string) bool { untilTsNs = time.Now().Add(-*tailStop).UnixNano() } - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil, - time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error { - if !shouldPrint(resp) { - return nil - } - if err := eachEntryFunc(resp); err != nil { - return err - } + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "tail", + ClientId: clientId, + ClientEpoch: 0, + SelfSignature: 0, + PathPrefix: *tailTarget, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: time.Now().Add(-*tailStart).UnixNano(), + StopTsNs: untilTsNs, + EventErrorType: pb.TrivialOnError, + } + + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error { + if !shouldPrint(resp) { return nil - }, pb.TrivialOnError) + } + if err := eachEntryFunc(resp); err != nil { + return err + } + return nil + }) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index c5fce44f8..e884deaad 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -39,8 +39,21 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) option.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, - option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "filer.remote.sync", + ClientId: option.clientId, + ClientEpoch: option.clientEpoch, + SelfSignature: 0, + PathPrefix: option.bucketsDir, + AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, + DirectoriesToWatch: nil, + StartTsNs: lastOffsetTs.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 5bc6ae300..1dc91b5d5 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -63,8 +63,21 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) option.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, - mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "filer.remote.sync", + ClientId: option.clientId, + ClientEpoch: option.clientEpoch, + SelfSignature: 0, + PathPrefix: mountedDir, + AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, + DirectoriesToWatch: nil, + StartTsNs: lastOffsetTs.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index efef6250e..fcf60ae87 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -287,8 +287,20 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, - sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: clientName, + ClientId: clientId, + ClientEpoch: clientEpoch, + SelfSignature: targetFilerSignature, + PathPrefix: sourcePath, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: sourceFilerOffsetTsNs, + StopTsNs: 0, + EventErrorType: pb.RetryForeverOnError, + } + + return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index f6f06d9b2..8ba8f9cfa 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -87,10 +87,23 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId) f.UniqueFilerEpoch++ - err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", f.UniqueFilerId, f.UniqueFilerEpoch, "/", nil, - 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { - return Replay(f.Store, resp) - }, pb.FatalOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "bootstrap", + ClientId: f.UniqueFilerId, + ClientEpoch: f.UniqueFilerEpoch, + SelfSignature: f.Signature, + PathPrefix: "/", + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: snapshotTime.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.FatalOnError, + } + + err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error { + return Replay(f.Store, resp) + }) return } diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index f52660a42..d5d8150bc 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -58,9 +58,21 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } var clientEpoch int32 + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "mount", + ClientId: selfSignature, + ClientEpoch: clientEpoch, + SelfSignature: selfSignature, + PathPrefix: dir, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: lastTsNs, + StopTsNs: 0, + EventErrorType: pb.FatalOnError, + } util.RetryForever("followMetaUpdates", func() error { clientEpoch++ - return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, clientEpoch, dir, nil, &lastTsNs, 0, selfSignature, processEventFn, pb.FatalOnError) + return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) return true diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 32fa4d497..674d55ad8 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -19,22 +19,35 @@ const ( RetryForeverOnError ) +// MetadataFollowOption is used to control the behavior of the metadata following +// process. Part of it is used as a cursor to resume the following process. +type MetadataFollowOption struct { + ClientName string + ClientId int32 + ClientEpoch int32 + SelfSignature int32 + PathPrefix string + AdditionalPathPrefixes []string + DirectoriesToWatch []string + StartTsNs int64 + StopTsNs int64 + EventErrorType EventErrorType +} + type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error -func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, clientEpoch int32, - pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32, - processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { +func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error { - err := WithFilerClient(true, clientId, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) + err := WithFilerClient(true, option.SelfSignature, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(option, processEventFn)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } return err } -func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, clientId int32, clientEpoch int32, pathPrefix string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { +func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error { - err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, nil, directoriesToWatch, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) + err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(option, processEventFn)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } @@ -42,20 +55,20 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName return nil } -func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch int32, pathPrefix string, additionalPathPrefixes []string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error { +func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn ProcessMetadataFunc) func(client filer_pb.SeaweedFilerClient) error { return func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: clientName, - PathPrefix: pathPrefix, - PathPrefixes: additionalPathPrefixes, - Directories: directoriesToWatch, - SinceNs: *lastTsNs, - Signature: selfSignature, - ClientId: clientId, - ClientEpoch: clientEpoch, - UntilNs: untilTsNs, + ClientName: option.ClientName, + PathPrefix: option.PathPrefix, + PathPrefixes: option.AdditionalPathPrefixes, + Directories: option.DirectoriesToWatch, + SinceNs: option.StartTsNs, + Signature: option.SelfSignature, + ClientId: option.ClientId, + ClientEpoch: option.ClientEpoch, + UntilNs: option.StopTsNs, }) if err != nil { return fmt.Errorf("subscribe: %v", err) @@ -71,7 +84,7 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch in } if err := processEventFn(resp); err != nil { - switch eventErrorType { + switch option.EventErrorType { case TrivialOnError: glog.Errorf("process %v: %v", resp, err) case FatalOnError: @@ -87,7 +100,7 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch in glog.Errorf("process %v: %v", resp, err) } } - *lastTsNs = resp.TsNs + option.StartTsNs = resp.TsNs } } } diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 377cf2728..8006ff326 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -34,9 +34,21 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p } var clientEpoch int32 + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: clientName, + ClientId: s3a.randomClientId, + ClientEpoch: clientEpoch, + SelfSignature: 0, + PathPrefix: prefix, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: directoriesToWatch, + StartTsNs: lastTsNs, + StopTsNs: 0, + EventErrorType: pb.FatalOnError, + } util.RetryForever("followIamChanges", func() error { clientEpoch++ - return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, directoriesToWatch, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError) + return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn) }, func(err error) bool { glog.V(0).Infof("iam follow metadata changes: %v", err) return true @@ -65,7 +77,7 @@ func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, conte return nil } -//reload bucket metadata +// reload bucket metadata func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { if dir == s3a.option.BucketsPath { if newEntry != nil {