Merge branch 'master' into mq-subscribe

This commit is contained in:
chrislu 2024-01-13 17:52:18 -08:00
commit 49f7de9daa
5 changed files with 65 additions and 26 deletions

View file

@ -3,19 +3,54 @@ services:
node1: node1:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
command: "server -master -volume -filer" command: "server -master -volume -filer"
ports:
- 8888:8888
- 18888:18888
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
interval: 1s
start_period: 10s
timeout: 30s
mount1: mount1:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
privileged: true privileged: true
command: "mount -filer=node1:8888 -dir=/mnt -dirAutoCreate" command: "mount -filer=node1:8888 -dir=/mnt -dirAutoCreate"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://node1:8888/" ]
interval: 1s
start_period: 10s
timeout: 30s
depends_on:
node1:
condition: service_healthy
node2: node2:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
ports: ports:
- 7888:8888 - 7888:8888
- 17888:18888
command: "server -master -volume -filer" command: "server -master -volume -filer"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
interval: 1s
start_period: 10s
timeout: 30s
mount2: mount2:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
privileged: true privileged: true
command: "mount -filer=node2:8888 -dir=/mnt -dirAutoCreate" command: "mount -filer=node2:8888 -dir=/mnt -dirAutoCreate"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://node2:8888/" ]
interval: 1s
start_period: 10s
timeout: 30s
depends_on:
node2:
condition: service_healthy
sync: sync:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
command: "-v=4 filer.sync -a=node1:8888 -b=node2:8888 -a.debug -b.debug" command: "-v=4 filer.sync -a=node1:8888 -b=node2:8888 -a.debug -b.debug"
depends_on:
mount1:
condition: service_healthy
mount2:
condition: service_healthy

View file

@ -30,24 +30,24 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
return err return err
} }
processor := NewMetadataProcessor(eachEntryFunc, 128) lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
var lastLogTsNs = time.Now().UnixNano() var lastLogTsNs = time.Now().UnixNano()
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
processor.AddSyncJob(resp) processor.AddSyncJob(resp)
return nil return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error { }, 3*time.Second, func(counter int64, lastTsNs int64) error {
if processor.processedTsWatermark == 0 { offsetTsNs := processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return nil return nil
} }
now := time.Now().UnixNano() now := time.Now().UnixNano()
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now lastLogTsNs = now
return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark) return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs)
}) })
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
option.clientEpoch++ option.clientEpoch++
metadataFollowOption := &pb.MetadataFollowOption{ metadataFollowOption := &pb.MetadataFollowOption{

View file

@ -33,7 +33,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return err return err
} }
processor := NewMetadataProcessor(eachEntryFunc, 128) lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
var lastLogTsNs = time.Now().UnixNano() var lastLogTsNs = time.Now().UnixNano()
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
@ -50,18 +51,17 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
processor.AddSyncJob(resp) processor.AddSyncJob(resp)
return nil return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error { }, 3*time.Second, func(counter int64, lastTsNs int64) error {
if processor.processedTsWatermark == 0 { offsetTsNs := processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return nil return nil
} }
// use processor.processedTsWatermark instead of the lastTsNs from the most recent job // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano() now := time.Now().UnixNano()
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now lastLogTsNs = now
return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, processor.processedTsWatermark) return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, offsetTsNs)
}) })
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
option.clientEpoch++ option.clientEpoch++
metadataFollowOption := &pb.MetadataFollowOption{ metadataFollowOption := &pb.MetadataFollowOption{

View file

@ -19,6 +19,7 @@ import (
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
@ -50,7 +51,7 @@ type SyncOptions struct {
aDoDeleteFiles *bool aDoDeleteFiles *bool
bDoDeleteFiles *bool bDoDeleteFiles *bool
clientId int32 clientId int32
clientEpoch int32 clientEpoch atomic.Int32
} }
const ( const (
@ -150,10 +151,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
os.Exit(2) os.Exit(2)
} }
for { for {
syncOptions.clientEpoch++ syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges( err := doSubscribeFilerMetaChanges(
syncOptions.clientId, syncOptions.clientId,
syncOptions.clientEpoch, syncOptions.clientEpoch.Load(),
grpcDialOption, grpcDialOption,
filerA, filerA,
*syncOptions.aPath, *syncOptions.aPath,
@ -188,10 +189,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
} }
go func() { go func() {
for { for {
syncOptions.clientEpoch++ syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges( err := doSubscribeFilerMetaChanges(
syncOptions.clientId, syncOptions.clientId,
syncOptions.clientEpoch, syncOptions.clientEpoch.Load(),
grpcDialOption, grpcDialOption,
filerB, filerB,
*syncOptions.bPath, *syncOptions.bPath,
@ -274,7 +275,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit) glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
concurrency = DefaultConcurrencyLimit concurrency = DefaultConcurrencyLimit
} }
processor := NewMetadataProcessor(processEventFn, concurrency) processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
var lastLogTsNs = time.Now().UnixNano() var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
@ -282,16 +283,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
processor.AddSyncJob(resp) processor.AddSyncJob(resp)
return nil return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error { }, 3*time.Second, func(counter int64, lastTsNs int64) error {
if processor.processedTsWatermark == 0 { offsetTsNs := processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return nil return nil
} }
// use processor.processedTsWatermark instead of the lastTsNs from the most recent job // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano() now := time.Now().UnixNano()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now lastLogTsNs = now
// collect synchronous offset // collect synchronous offset
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark)) statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
}) })
metadataFollowOption := &pb.MetadataFollowOption{ metadataFollowOption := &pb.MetadataFollowOption{
@ -408,7 +410,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
} }
// handle deletions // handle deletions
if filer_pb.IsDelete(resp) { if filer_pb.IsDelete(resp) {
if doDeleteFiles { if !doDeleteFiles {
return nil return nil
} }
if !strings.HasPrefix(string(sourceOldKey), sourcePath) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) {

View file

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"sync" "sync"
"sync/atomic"
) )
type MetadataProcessor struct { type MetadataProcessor struct {
@ -14,15 +15,16 @@ type MetadataProcessor struct {
activeJobsCond *sync.Cond activeJobsCond *sync.Cond
concurrencyLimit int concurrencyLimit int
fn pb.ProcessMetadataFunc fn pb.ProcessMetadataFunc
processedTsWatermark int64 processedTsWatermark atomic.Int64
} }
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
t := &MetadataProcessor{ t := &MetadataProcessor{
fn: fn, fn: fn,
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
concurrencyLimit: concurrency, concurrencyLimit: concurrency,
} }
t.processedTsWatermark.Store(offsetTsNs)
t.activeJobsCond = sync.NewCond(&t.activeJobsLock) t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
return t return t
} }
@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
} }
} }
if isOldest { if isOldest {
t.processedTsWatermark = resp.TsNs t.processedTsWatermark.Store(resp.TsNs)
} }
t.activeJobsCond.Signal() t.activeJobsCond.Signal()
}() }()