mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer.meta.tail: optionally submit metadata changes to ElasticSearch
This commit is contained in:
parent
c5df2577f5
commit
3fb2ed9093
|
@ -3,6 +3,8 @@ package command
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
"github.com/olivere/elastic/v7"
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -31,6 +33,8 @@ var (
|
||||||
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
|
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
|
||||||
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||||
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
|
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
|
||||||
|
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
|
||||||
|
esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
|
||||||
)
|
)
|
||||||
|
|
||||||
func runFilerMetaTail(cmd *Command, args []string) bool {
|
func runFilerMetaTail(cmd *Command, args []string) bool {
|
||||||
|
@ -80,6 +84,14 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
|
||||||
fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
|
fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if *esServers != "" {
|
||||||
|
var err error
|
||||||
|
eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
@ -118,3 +130,72 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EsDocument is the JSON document written to ElasticSearch for one filer
// entry. It is filled in by toEsEntry from a metadata event.
//
// Every field carries omitempty, so zero values (empty string, 0, false) are
// left out of the serialized document.
type EsDocument struct {
	// Dir is the parent directory of the entry; Name is the entry name
	// inside that directory.
	Dir  string `json:"dir,omitempty"`
	Name string `json:"name,omitempty"`

	// IsDirectory mirrors the entry's IsDirectory flag.
	IsDirectory bool `json:"isDir,omitempty"`

	// The remaining fields are copied one-to-one from the entry's attributes.
	Size       uint64 `json:"size,omitempty"`
	Uid        uint32 `json:"uid,omitempty"`
	Gid        uint32 `json:"gid,omitempty"`
	UserName   string `json:"userName,omitempty"`
	Collection string `json:"collection,omitempty"`
	Crtime     int64  `json:"crtime,omitempty"`
	Mtime      int64  `json:"mtime,omitempty"`
	Mime       string `json:"mime,omitempty"`
}
|
||||||
|
|
||||||
|
func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
|
||||||
|
entry := event.NewEntry
|
||||||
|
dir, name := event.NewParentPath, entry.Name
|
||||||
|
id := util.Md5String([]byte(util.NewFullPath(dir, name)))
|
||||||
|
esEntry := &EsDocument{
|
||||||
|
Dir: dir,
|
||||||
|
Name: name,
|
||||||
|
IsDirectory: entry.IsDirectory,
|
||||||
|
Size: entry.Attributes.FileSize,
|
||||||
|
Uid: entry.Attributes.Uid,
|
||||||
|
Gid: entry.Attributes.Gid,
|
||||||
|
UserName: entry.Attributes.UserName,
|
||||||
|
Collection: entry.Attributes.Collection,
|
||||||
|
Crtime: entry.Attributes.Crtime,
|
||||||
|
Mtime: entry.Attributes.Mtime,
|
||||||
|
Mime: entry.Attributes.Mime,
|
||||||
|
}
|
||||||
|
return esEntry, id
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
|
||||||
|
options := []elastic.ClientOptionFunc{}
|
||||||
|
options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
|
||||||
|
options = append(options, elastic.SetSniff(false))
|
||||||
|
options = append(options, elastic.SetHealthcheck(false))
|
||||||
|
client, err := elastic.NewClient(options...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
event := resp.EventNotification
|
||||||
|
if event.OldEntry != nil &&
|
||||||
|
(event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
|
||||||
|
// delete or not update the same file
|
||||||
|
dir, name := resp.Directory, event.OldEntry.Name
|
||||||
|
id := util.Md5String([]byte(util.NewFullPath(dir, name)))
|
||||||
|
println("delete", id)
|
||||||
|
_, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if event.NewEntry != nil {
|
||||||
|
// add a new file or update the same file
|
||||||
|
esEntry, id := toEsEntry(event)
|
||||||
|
value, err := jsoniter.Marshal(esEntry)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
println(string(value))
|
||||||
|
_, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue