This commit is contained in:
chrislu 2022-07-27 12:12:43 -07:00
commit ca836568ac
7 changed files with 110 additions and 32 deletions

View file

@ -10,5 +10,5 @@ install:
full_install: full_install:
cd weed; go install -tags "elastic gocdk sqlite ydb tikv" cd weed; go install -tags "elastic gocdk sqlite ydb tikv"
test: tests:
cd weed; go test -tags "elastic gocdk sqlite ydb tikv" -v ./... cd weed; go test -tags "elastic gocdk sqlite ydb tikv" -v ./...

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc" "google.golang.org/grpc"
"strings"
"time" "time"
) )
@ -15,6 +16,7 @@ type FilerBackupOptions struct {
isActivePassive *bool isActivePassive *bool
filer *string filer *string
path *string path *string
excludePaths *string
debug *bool debug *bool
proxyByFiler *bool proxyByFiler *bool
timeAgo *time.Duration timeAgo *time.Duration
@ -28,6 +30,7 @@ func init() {
cmdFilerBackup.Run = runFilerBackup // break init cycle cmdFilerBackup.Run = runFilerBackup // break init cycle
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
@ -84,6 +87,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
sourceFiler := pb.ServerAddress(*backupOption.filer) sourceFiler := pb.ServerAddress(*backupOption.filer)
sourcePath := *backupOption.path sourcePath := *backupOption.path
excludePaths := strings.Split(*backupOption.excludePaths, ",")
timeAgo := *backupOption.timeAgo timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory() targetPath := dataSink.GetSinkToDirectory()
debug := *backupOption.debug debug := *backupOption.debug
@ -106,10 +110,14 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
// create filer sink // create filer sink
filerSource := &source.FilerSource{} filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler) filerSource.DoInitialize(
sourceFiler.ToHttpAddress(),
sourceFiler.ToGrpcAddress(),
sourcePath,
*backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource) dataSink.SetSourceFiler(filerSource)
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug)
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))

View file

@ -26,7 +26,9 @@ type SyncOptions struct {
filerA *string filerA *string
filerB *string filerB *string
aPath *string aPath *string
aExcludePaths *string
bPath *string bPath *string
bExcludePaths *string
aReplication *string aReplication *string
bReplication *string bReplication *string
aCollection *string aCollection *string
@ -58,7 +60,9 @@ func init() {
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
@ -133,9 +137,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
} }
for { for {
syncOptions.clientEpoch++ syncOptions.clientEpoch++
err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, err := doSubscribeFilerMetaChanges(
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, syncOptions.clientId,
*syncOptions.bDebug, aFilerSignature, bFilerSignature) syncOptions.clientEpoch,
grpcDialOption,
filerA,
*syncOptions.aPath,
strings.Split(*syncOptions.aExcludePaths, ","),
*syncOptions.aProxyByFiler,
filerB,
*syncOptions.bPath,
*syncOptions.bReplication,
*syncOptions.bCollection,
*syncOptions.bTtlSec,
*syncOptions.bProxyByFiler,
*syncOptions.bDiskType,
*syncOptions.bDebug,
aFilerSignature,
bFilerSignature)
if err != nil { if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond) time.Sleep(1747 * time.Millisecond)
@ -154,9 +173,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() { go func() {
for { for {
syncOptions.clientEpoch++ syncOptions.clientEpoch++
err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, err := doSubscribeFilerMetaChanges(
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, syncOptions.clientId,
*syncOptions.aDebug, bFilerSignature, aFilerSignature) syncOptions.clientEpoch,
grpcDialOption,
filerB,
*syncOptions.bPath,
strings.Split(*syncOptions.bExcludePaths, ","),
*syncOptions.bProxyByFiler,
filerA,
*syncOptions.aPath,
*syncOptions.aReplication,
*syncOptions.aCollection,
*syncOptions.aTtlSec,
*syncOptions.aProxyByFiler,
*syncOptions.aDiskType,
*syncOptions.aDebug,
bFilerSignature,
aFilerSignature)
if err != nil { if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond) time.Sleep(2147 * time.Millisecond)
@ -186,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil return nil
} }
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now // if first time, start from now
@ -205,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource) filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification message := resp.EventNotification
@ -302,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
} }
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function // process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification message := resp.EventNotification
@ -322,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
if !strings.HasPrefix(resp.Directory, sourcePath) { if !strings.HasPrefix(resp.Directory, sourcePath) {
return nil return nil
} }
for _, excludePath := range excludePaths {
if strings.HasPrefix(resp.Directory, excludePath) {
return nil
}
}
// handle deletions // handle deletions
if filer_pb.IsDelete(resp) { if filer_pb.IsDelete(resp) {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) {

View file

@ -13,6 +13,8 @@ grpcAddress = "localhost:18888"
# this is not a directory on your hard drive, but on your filer. # this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue. # i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets" directory = "/buckets"
# files under these directories (multiple directories separated by spaces) are excluded from sending notifications
excludeDirectories = "/buckets/tmp"
[sink.local] [sink.local]
enabled = false enabled = false

View file

@ -16,8 +16,9 @@ import (
) )
type Replicator struct { type Replicator struct {
sink sink.ReplicationSink sink sink.ReplicationSink
source *source.FilerSource source *source.FilerSource
excludeDirs []string
} }
func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator { func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
@ -28,8 +29,9 @@ func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSin
dataSink.SetSourceFiler(source) dataSink.SetSourceFiler(source)
return &Replicator{ return &Replicator{
sink: dataSink, sink: dataSink,
source: source, source: source,
excludeDirs: sourceConfig.GetStringSlice(configPrefix + "excludeDirectories"),
} }
} }
@ -41,6 +43,13 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil return nil
} }
for _, excludeDir := range r.excludeDirs {
if strings.HasPrefix(key, excludeDir) {
glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir)
return nil
}
}
var dateKey string var dateKey string
if r.sink.IsIncremental() { if r.sink.IsIncremental() {
var mTime int64 var mTime int64

View file

@ -17,6 +17,16 @@ import (
"github.com/prometheus/client_golang/prometheus/push" "github.com/prometheus/client_golang/prometheus/push"
) )
// Readonly volume types
//
// String identifiers for the read-only volume categories. Each value is
// passed verbatim as the second label value to
// VolumeServerReadOnlyVolumeGauge.DeleteLabelValues in DeleteCollectionMetrics,
// so the exact spelling matters (note that "IsReadOnly" is capitalized
// while the other values start with a lowercase letter).
const (
IsReadOnly = "IsReadOnly"
NoWriteOrDelete = "noWriteOrDelete"
NoWriteCanDelete = "noWriteCanDelete"
IsDiskSpaceLow = "isDiskSpaceLow"
)
// readOnlyVolumeTypes lists every read-only volume type constant declared
// above so that code can iterate over all of them.
var readOnlyVolumeTypes = [4]string{IsReadOnly, NoWriteOrDelete, NoWriteCanDelete, IsDiskSpaceLow}
var ( var (
Gather = prometheus.NewRegistry() Gather = prometheus.NewRegistry()
@ -249,3 +259,12 @@ func SourceName(port uint32) string {
} }
return net.JoinHostPort(hostname, strconv.Itoa(int(port))) return net.JoinHostPort(hostname, strconv.Itoa(int(port)))
} }
// DeleteCollectionMetrics removes the label-value combinations recorded for
// the given collection from the volume server gauges: the "normal" disk size
// entry, one read-only volume entry per read-only volume type, and the
// "volume" counter entry.
//
// todo - can be changed to DeletePartialMatch when https://github.com/prometheus/client_golang/pull/1013 gets released
func DeleteCollectionMetrics(collection string) {
	VolumeServerDiskSizeGauge.DeleteLabelValues(collection, "normal")

	// Each read-only type is a separate label value, so delete them one by one.
	for i := range readOnlyVolumeTypes {
		VolumeServerReadOnlyVolumeGauge.DeleteLabelValues(collection, readOnlyVolumeTypes[i])
	}

	VolumeServerVolumeCounter.DeleteLabelValues(collection, "volume")
}

View file

@ -101,6 +101,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
if e != nil { if e != nil {
return return
} }
stats.DeleteCollectionMetrics(collection)
// let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
} }
return return
@ -240,19 +241,19 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if maxFileKey < curMaxFileKey { if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey maxFileKey = curMaxFileKey
} }
deleteVolume := false shouldDeleteVolume := false
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) { if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage) volumeMessages = append(volumeMessages, volumeMessage)
} else { } else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id) deleteVids = append(deleteVids, v.Id)
deleteVolume = true shouldDeleteVolume = true
} else { } else {
glog.V(0).Infof("volume %d is expired", v.Id) glog.V(0).Infof("volume %d is expired", v.Id)
} }
if v.lastIoError != nil { if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id) deleteVids = append(deleteVids, v.Id)
deleteVolume = true shouldDeleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError) glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
} }
} }
@ -260,33 +261,33 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if _, exist := collectionVolumeSize[v.Collection]; !exist { if _, exist := collectionVolumeSize[v.Collection]; !exist {
collectionVolumeSize[v.Collection] = 0 collectionVolumeSize[v.Collection] = 0
} }
if !deleteVolume { if !shouldDeleteVolume {
collectionVolumeSize[v.Collection] += volumeMessage.Size collectionVolumeSize[v.Collection] += volumeMessage.Size
} else { } else {
collectionVolumeSize[v.Collection] -= volumeMessage.Size collectionVolumeSize[v.Collection] -= volumeMessage.Size
if collectionVolumeSize[v.Collection] <= 0 { if collectionVolumeSize[v.Collection] < 0 {
delete(collectionVolumeSize, v.Collection) collectionVolumeSize[v.Collection] = 0
} }
} }
if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
"IsReadOnly": 0, stats.IsReadOnly: 0,
"noWriteOrDelete": 0, stats.NoWriteOrDelete: 0,
"noWriteCanDelete": 0, stats.NoWriteCanDelete: 0,
"isDiskSpaceLow": 0, stats.IsDiskSpaceLow: 0,
} }
} }
if !deleteVolume && v.IsReadOnly() { if !shouldDeleteVolume && v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 collectionVolumeReadOnlyCount[v.Collection][stats.IsReadOnly] += 1
if v.noWriteOrDelete { if v.noWriteOrDelete {
collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 collectionVolumeReadOnlyCount[v.Collection][stats.NoWriteOrDelete] += 1
} }
if v.noWriteCanDelete { if v.noWriteCanDelete {
collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 collectionVolumeReadOnlyCount[v.Collection][stats.NoWriteCanDelete] += 1
} }
if v.location.isDiskSpaceLow { if v.location.isDiskSpaceLow {
collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 collectionVolumeReadOnlyCount[v.Collection][stats.IsDiskSpaceLow] += 1
} }
} }
} }
@ -458,6 +459,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
err := location.UnloadVolume(i) err := location.UnloadVolume(i)
if err == nil { if err == nil {
glog.V(0).Infof("UnmountVolume %d", i) glog.V(0).Infof("UnmountVolume %d", i)
stats.DeleteCollectionMetrics(v.Collection)
s.DeletedVolumesChan <- message s.DeletedVolumesChan <- message
return nil return nil
} else if err == ErrVolumeNotFound { } else if err == ErrVolumeNotFound {