From 709c83716cbc1877510d0836ab1194eb28c99761 Mon Sep 17 00:00:00 2001 From: Evgeny Kuzhelev Date: Wed, 27 Jul 2022 13:55:09 +0500 Subject: [PATCH 1/4] delete disk_size metrics when collection deleted --- weed/storage/store.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/weed/storage/store.go b/weed/storage/store.go index f833f1b15..8ab8b994c 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -101,6 +101,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { if e != nil { return } + stats.VolumeServerDiskSizeGauge.DeleteLabelValues(collection, "normal") // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan } return @@ -240,19 +241,19 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { if maxFileKey < curMaxFileKey { maxFileKey = curMaxFileKey } - deleteVolume := false + shouldDeleteVolume := false if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) { volumeMessages = append(volumeMessages, volumeMessage) } else { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { deleteVids = append(deleteVids, v.Id) - deleteVolume = true + shouldDeleteVolume = true } else { glog.V(0).Infof("volume %d is expired", v.Id) } if v.lastIoError != nil { deleteVids = append(deleteVids, v.Id) - deleteVolume = true + shouldDeleteVolume = true glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError) } } @@ -260,12 +261,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { if _, exist := collectionVolumeSize[v.Collection]; !exist { collectionVolumeSize[v.Collection] = 0 } - if !deleteVolume { + if !shouldDeleteVolume { collectionVolumeSize[v.Collection] += volumeMessage.Size } else { collectionVolumeSize[v.Collection] -= volumeMessage.Size - if collectionVolumeSize[v.Collection] <= 0 { - delete(collectionVolumeSize, v.Collection) + if collectionVolumeSize[v.Collection] < 0 { + collectionVolumeSize[v.Collection] = 0 } } @@ -277,7 +278,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { "isDiskSpaceLow": 0, } } - if !deleteVolume && v.IsReadOnly() { + if !shouldDeleteVolume && v.IsReadOnly() { collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 if v.noWriteOrDelete { collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 From 47c72e6f3562bdf8566effd38c911285ba955eb4 Mon Sep 17 00:00:00 2001 From: Evgeny Kuzhelev Date: Wed, 27 Jul 2022 16:31:49 +0500 Subject: [PATCH 2/4] remove all (currently existing) collection volume metrics --- weed/stats/metrics.go | 18 ++++++++++++++++++ weed/storage/store.go | 19 ++++++++++--------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index f0b810608..0f4fcfb52 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -17,6 +17,16 @@ import ( "github.com/prometheus/client_golang/prometheus/push" ) +// Readonly volume types +const ( + IsReadOnly = "IsReadOnly" + NoWriteOrDelete = "noWriteOrDelete" + NoWriteCanDelete = "noWriteCanDelete" + IsDiskSpaceLow = "isDiskSpaceLow" +) + +var readOnlyVolumeTypes = [4]string{IsReadOnly, NoWriteOrDelete, NoWriteCanDelete, IsDiskSpaceLow} + var ( Gather = prometheus.NewRegistry() @@ -249,3 +259,11 @@ func SourceName(port uint32) string { } return net.JoinHostPort(hostname, strconv.Itoa(int(port))) } + +func DeleteCollectionMetrics(collection string) { + VolumeServerDiskSizeGauge.DeleteLabelValues(collection, "normal") + for _, volume_type := range readOnlyVolumeTypes { + VolumeServerReadOnlyVolumeGauge.DeleteLabelValues(collection, volume_type) + } + VolumeServerVolumeCounter.DeleteLabelValues(collection, "volume") +} diff --git a/weed/storage/store.go b/weed/storage/store.go index 8ab8b994c..06c6362ba 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -101,7 +101,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { if e != nil { return } - stats.VolumeServerDiskSizeGauge.DeleteLabelValues(collection, "normal") + stats.DeleteCollectionMetrics(collection) // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan } return @@ -272,22 +272,22 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ - "IsReadOnly": 0, - "noWriteOrDelete": 0, - "noWriteCanDelete": 0, - "isDiskSpaceLow": 0, + stats.IsReadOnly: 0, + stats.NoWriteOrDelete: 0, + stats.NoWriteCanDelete: 0, + stats.IsDiskSpaceLow: 0, } } if !shouldDeleteVolume && v.IsReadOnly() { - collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 + collectionVolumeReadOnlyCount[v.Collection][stats.IsReadOnly] += 1 if v.noWriteOrDelete { - collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 + collectionVolumeReadOnlyCount[v.Collection][stats.NoWriteOrDelete] += 1 } if v.noWriteCanDelete { - collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 + collectionVolumeReadOnlyCount[v.Collection][stats.NoWriteCanDelete] += 1 } if v.location.isDiskSpaceLow { - collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 + collectionVolumeReadOnlyCount[v.Collection][stats.IsDiskSpaceLow] += 1 } } } @@ -459,6 +459,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { err := location.UnloadVolume(i) if err == nil { glog.V(0).Infof("UnmountVolume %d", i) + stats.DeleteCollectionMetrics(v.Collection) s.DeletedVolumesChan <- message return nil } else if err == ErrVolumeNotFound { From ac5ce312783e545c38ada661b238780af37c86da Mon Sep 17 00:00:00 2001 From: Evgeny Kuzhelev Date: Wed, 27 Jul 2022 16:48:56 +0500 Subject: [PATCH 3/4] leave notion to refactor after prometheus upgrade --- weed/stats/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 0f4fcfb52..a2fd373af 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -260,6 +260,7 @@ func SourceName(port uint32) string { return net.JoinHostPort(hostname, strconv.Itoa(int(port))) } +// todo - can be changed to DeletePartialMatch when https://github.com/prometheus/client_golang/pull/1013 gets released func DeleteCollectionMetrics(collection string) { VolumeServerDiskSizeGauge.DeleteLabelValues(collection, "normal") for _, volume_type := range readOnlyVolumeTypes { From 7e09a548a64a6ffd62f40e8b38a96d6b20f9828b Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 27 Jul 2022 19:22:57 +0500 Subject: [PATCH 4/4] exclude directories to sync on filer --- Makefile | 2 +- weed/command/filer_backup.go | 12 +++++- weed/command/filer_sync.go | 58 +++++++++++++++++++++----- weed/command/scaffold/replication.toml | 2 + weed/replication/replicator.go | 17 ++++++-- 5 files changed, 74 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index aa736edee..315a62428 100644 --- a/Makefile +++ b/Makefile @@ -10,5 +10,5 @@ install: full_install: cd weed; go install -tags "elastic gocdk sqlite ydb tikv" -test: +tests: cd weed; go test -tags "elastic gocdk sqlite ydb tikv" -v ./... diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 90bc8c5c3..62477227b 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" + "strings" "time" ) @@ -15,6 +16,7 @@ type FilerBackupOptions struct { isActivePassive *bool filer *string path *string + excludePaths *string debug *bool proxyByFiler *bool timeAgo *time.Duration @@ -28,6 +30,7 @@ func init() { cmdFilerBackup.Run = runFilerBackup // break init cycle filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") + filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer") filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") @@ -84,6 +87,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti sourceFiler := pb.ServerAddress(*backupOption.filer) sourcePath := *backupOption.path + excludePaths := strings.Split(*backupOption.excludePaths, ",") timeAgo := *backupOption.timeAgo targetPath := dataSink.GetSinkToDirectory() debug := *backupOption.debug @@ -106,10 +110,14 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler) + filerSource.DoInitialize( + sourceFiler.ToHttpAddress(), + sourceFiler.ToGrpcAddress(), + sourcePath, + *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) - processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) + processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug) processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 130138a3d..abf13a81d 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -26,7 +26,9 @@ type SyncOptions struct { filerA *string filerB *string aPath *string + aExcludePaths *string bPath *string + bExcludePaths *string aReplication *string bReplication *string aCollection *string @@ -58,7 +60,9 @@ func init() { syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") + syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A") syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") + syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B") syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") @@ -133,9 +137,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } for { syncOptions.clientEpoch++ - err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, - *syncOptions.bDebug, aFilerSignature, bFilerSignature) + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerA, + *syncOptions.aPath, + strings.Split(*syncOptions.aExcludePaths, ","), + *syncOptions.aProxyByFiler, + filerB, + *syncOptions.bPath, + *syncOptions.bReplication, + *syncOptions.bCollection, + *syncOptions.bTtlSec, + *syncOptions.bProxyByFiler, + *syncOptions.bDiskType, + *syncOptions.bDebug, + aFilerSignature, + bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -154,9 +173,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool { go func() { for { syncOptions.clientEpoch++ - err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, - *syncOptions.aDebug, bFilerSignature, aFilerSignature) + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerB, + *syncOptions.bPath, + strings.Split(*syncOptions.bExcludePaths, ","), + *syncOptions.bProxyByFiler, + filerA, + *syncOptions.aPath, + *syncOptions.aReplication, + *syncOptions.aCollection, + *syncOptions.aTtlSec, + *syncOptions.aProxyByFiler, + *syncOptions.aDiskType, + *syncOptions.aDebug, + bFilerSignature, + aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -186,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd return nil } -func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now @@ -205,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) - persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug) processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -302,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } -func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { +func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { // process function processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -322,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl if !strings.HasPrefix(resp.Directory, sourcePath) { return nil } - + for _, excludePath := range excludePaths { + if strings.HasPrefix(resp.Directory, excludePath) { + return nil + } + } // handle deletions if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml index c463c8077..cffe1b76f 100644 --- a/weed/command/scaffold/replication.toml +++ b/weed/command/scaffold/replication.toml @@ -13,6 +13,8 @@ grpcAddress = "localhost:18888" # this is not a directory on your hard drive, but on your filer. # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" +# files from the directory separated by space are excluded from sending notifications +excludeDirectories = "/buckets/tmp" [sink.local] enabled = false diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index eaab2c13e..8fee23b95 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -16,8 +16,9 @@ import ( ) type Replicator struct { - sink sink.ReplicationSink - source *source.FilerSource + sink sink.ReplicationSink + source *source.FilerSource + excludeDirs []string } func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator { @@ -28,8 +29,9 @@ func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSin dataSink.SetSourceFiler(source) return &Replicator{ - sink: dataSink, - source: source, + sink: dataSink, + source: source, + excludeDirs: sourceConfig.GetStringSlice(configPrefix + "excludeDirectories"), } } @@ -41,6 +43,13 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) return nil } + for _, excludeDir := range r.excludeDirs { + if strings.HasPrefix(key, excludeDir) { + glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir) + return nil + } + } + var dateKey string if r.sink.IsIncremental() { var mTime int64