package command

import (
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// MetadataProcessor runs metadata change events concurrently, up to a fixed
// concurrency limit, while making events that touch overlapping paths wait
// for each other so they are applied in order.
type MetadataProcessor struct {
	activeJobs           map[int64]*filer_pb.SubscribeMetadataResponse
	activeJobsLock       sync.Mutex
	activeJobsCond       *sync.Cond
	concurrencyLimit     int
	fn                   pb.ProcessMetadataFunc
	processedTsWatermark int64
}

// NewMetadataProcessor creates a processor that applies fn to each event with
// at most concurrency jobs in flight.
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor {
	t := &MetadataProcessor{
		fn:               fn,
		activeJobs:       make(map[int64]*filer_pb.SubscribeMetadataResponse),
		concurrencyLimit: concurrency,
	}
	t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
	return t
}

// AddSyncJob schedules one metadata event. It blocks while the concurrency
// limit is reached or while the event conflicts with an in-flight job, then
// processes the event in a new goroutine, retrying the processing function on
// failure and advancing the processed-timestamp watermark when the oldest
// active job completes.
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
	if filer_pb.IsEmpty(resp) {
		return
	}

	t.activeJobsLock.Lock()
	defer t.activeJobsLock.Unlock()

	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
		t.activeJobsCond.Wait()
	}
	t.activeJobs[resp.TsNs] = resp
	go func() {
		if err := util.Retry("metadata processor", func() error {
			return t.fn(resp)
		}); err != nil {
			glog.Errorf("process %v: %v", resp, err)
		}

		t.activeJobsLock.Lock()
		defer t.activeJobsLock.Unlock()

		delete(t.activeJobs, resp.TsNs)

		// if this is the oldest active job, record the watermark
		isOldest := true
		for ts := range t.activeJobs {
			if resp.TsNs > ts {
				isOldest = false
				break
			}
		}
		if isOldest {
			t.processedTsWatermark = resp.TsNs
		}
		t.activeJobsCond.Signal()
	}()
}

// conflictsWith reports whether resp touches a path involved in any in-flight job.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	for _, r := range t.activeJobs {
		if shouldWaitFor(resp, r) {
			return true
		}
	}
	return false
}

// shouldWaitFor reports whether a, a candidate job to schedule, must wait for
// b, an existing active job, by checking every combination of their old and
// new (rename target) paths.
func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
	aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
	bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)

	if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
		return true
	}
	if aNewPath != "" {
		if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
			return true
		}
	}
	if bNewPath != "" {
		if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
			return true
		}
	}
	if aNewPath != "" && bNewPath != "" {
		if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
			return true
		}
	}
	return false
}

// pairShouldWaitFor reports whether a job on aPath conflicts with a job on
// bPath: a directory conflicts with any path nested under it, and two files
// conflict only when they are the same path.
func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
	if bIsDirectory {
		if aIsDirectory {
			return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
		}
		return aPath.IsUnder(bPath)
	}
	if aIsDirectory {
		return bPath.IsUnder(aPath)
	}
	return aPath == bPath
}
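
// Illustrative examples (added for clarity, not from the original source),
// assuming util.FullPath.IsUnder reports whether one path is nested under the
// other:
//
//	pairShouldWaitFor("/a/b/f.txt", "/a/b", false, true)    // true: a file under an active directory job
//	pairShouldWaitFor("/a/b", "/a/b/f.txt", true, false)    // true: a directory job covering an active file job
//	pairShouldWaitFor("/a/x.txt", "/a/y.txt", false, false) // false: different file paths do not conflict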

// extractPathsFromMetadata returns the path affected by the event, the new
// path for a rename, and whether the entry is a directory.
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
	oldEntry := resp.EventNotification.OldEntry
	newEntry := resp.EventNotification.NewEntry
	// create
	if filer_pb.IsCreate(resp) {
		path = util.FullPath(resp.Directory).Child(newEntry.Name)
		isDirectory = newEntry.IsDirectory
		return
	}
	// delete
	if filer_pb.IsDelete(resp) {
		path = util.FullPath(resp.Directory).Child(oldEntry.Name)
		isDirectory = oldEntry.IsDirectory
		return
	}
	// update in place
	if filer_pb.IsUpdate(resp) {
		path = util.FullPath(resp.Directory).Child(newEntry.Name)
		isDirectory = newEntry.IsDirectory
		return
	}
	// rename: report both the old and the new location
	path = util.FullPath(resp.Directory).Child(oldEntry.Name)
	isDirectory = oldEntry.IsDirectory
	newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
	return
}
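
// Usage sketch (illustrative, not part of the original file): drain metadata
// events from a channel into the processor. The channel, its producer, and
// the concurrency value are assumptions for illustration; in SeaweedFS the
// events would come from a filer metadata subscription.
func drainMetadataEvents(events <-chan *filer_pb.SubscribeMetadataResponse, fn pb.ProcessMetadataFunc) {
	processor := NewMetadataProcessor(fn, 128)
	for resp := range events {
		// AddSyncJob blocks while the concurrency limit is reached or while
		// resp conflicts with an in-flight job, preserving per-path ordering.
		processor.AddSyncJob(resp)
	}
}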