seaweedfs/weed/command/watch.go

121 lines
3.3 KiB
Go
Raw Normal View History

2020-04-05 07:51:16 +00:00
package command
import (
"context"
"fmt"
"io"
2020-08-29 02:43:04 +00:00
"path/filepath"
"strings"
2020-04-28 06:49:46 +00:00
"time"
2020-04-05 07:51:16 +00:00
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
cmdWatch.Run = runWatch // break init cycle
2020-04-05 07:51:16 +00:00
}
var cmdWatch = &Command{
2020-08-29 02:43:04 +00:00
UsageLine: "watch [-filer=localhost:8888] [-target=/]",
2020-04-05 07:51:16 +00:00
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
`,
}
var (
2020-08-29 02:43:04 +00:00
watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
2020-04-05 07:51:16 +00:00
)
func runWatch(cmd *Command, args []string) bool {
2020-04-05 07:51:16 +00:00
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
2020-08-29 02:43:04 +00:00
var filterFunc func(dir, fname string) bool
if *watchPattern != "" {
if strings.Contains(*watchPattern, "/") {
println("watch path pattern", *watchPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
} else {
println("watch file pattern", *watchPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*watchPattern, fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
}
}
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
if filterFunc == nil {
return true
}
if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
return false
}
if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
return true
}
if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
return true
}
return false
}
2021-01-11 10:08:26 +00:00
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
2021-01-11 08:03:13 +00:00
fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
2021-01-11 10:08:26 +00:00
return nil
2021-01-11 08:03:13 +00:00
}
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
2020-04-05 07:51:16 +00:00
2020-09-09 18:21:23 +00:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "watch",
2020-04-05 19:51:21 +00:00
PathPrefix: *watchTarget,
2020-04-28 06:49:46 +00:00
SinceNs: time.Now().Add(-*watchStart).UnixNano(),
2020-04-05 07:51:16 +00:00
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
2020-08-29 02:43:04 +00:00
if !shouldPrint(resp) {
continue
}
2021-01-11 10:08:26 +00:00
if err = eachEntryFunc(resp); err != nil {
return err
}
2020-04-05 07:51:16 +00:00
}
})
if watchErr != nil {
fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
2020-04-05 07:51:16 +00:00
}
return true
}