mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
new filer option to es v7.
This commit is contained in:
parent
1d56ea24ef
commit
a93d27d1e8
2
go.mod
2
go.mod
|
@ -87,6 +87,8 @@ require (
|
|||
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
|
||||
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
|
||||
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
|
||||
github.com/json-iterator/go v1.1.10
|
||||
github.com/olivere/elastic/v7 v7.0.19
|
||||
)
|
||||
|
||||
replace go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547
|
||||
|
|
|
@ -173,6 +173,12 @@ enabled = false
|
|||
uri = "mongodb://localhost:27017"
|
||||
option_pool_size = 0
|
||||
database = "seaweedfs"
|
||||
|
||||
[elastic7]
|
||||
enabled = false
|
||||
servers = "http://localhost:9200"
|
||||
# increase the value is recommend, both filer and elastic cluster
|
||||
index.max_result_window = 10000
|
||||
`
|
||||
|
||||
NOTIFICATION_TOML_EXAMPLE = `
|
||||
|
|
295
weed/filer/elastic/v7/elastic_store.go
Normal file
295
weed/filer/elastic/v7/elastic_store.go
Normal file
|
@ -0,0 +1,295 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
weed_util "github.com/chrislusf/seaweedfs/weed/util"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
elastic "github.com/olivere/elastic/v7"
|
||||
)
|
||||
|
||||
var (
|
||||
indexType = "_doc"
|
||||
indexPrefix = ".seaweedfs_"
|
||||
)
|
||||
|
||||
type ESEntry struct {
|
||||
ParentId string `json:"ParentId"`
|
||||
Entry *filer2.Entry
|
||||
}
|
||||
|
||||
func init() {
|
||||
filer2.Stores = append(filer2.Stores, &ElasticStore{})
|
||||
}
|
||||
|
||||
type ElasticStore struct {
|
||||
client *elastic.Client
|
||||
maxPageSize int
|
||||
}
|
||||
|
||||
func (store *ElasticStore) GetName() string {
|
||||
return "elastic7"
|
||||
}
|
||||
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
|
||||
servers := configuration.GetString(prefix + "servers")
|
||||
if servers == "" {
|
||||
return fmt.Errorf("error elastic endpoints.")
|
||||
}
|
||||
store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
|
||||
if store.maxPageSize <= 0 {
|
||||
return fmt.Errorf("error elastic index.max_result_window.")
|
||||
}
|
||||
glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
|
||||
store.client, err = elastic.NewClient(
|
||||
elastic.SetSniff(false),
|
||||
elastic.SetHealthcheck(false),
|
||||
elastic.SetURL(servers),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init elastic %s: %v.", servers, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
|
||||
index := getIndex(entry.FullPath)
|
||||
dir, _ := entry.FullPath.DirAndName()
|
||||
id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath)))
|
||||
esEntry := &ESEntry{
|
||||
ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))),
|
||||
Entry: entry,
|
||||
}
|
||||
value, err := jsoniter.Marshal(esEntry)
|
||||
if err != nil {
|
||||
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
|
||||
return fmt.Errorf("insert entry %v.", err)
|
||||
}
|
||||
_, err = store.client.Index().
|
||||
Index(index).
|
||||
Type(indexType).
|
||||
Id(id).
|
||||
BodyJson(string(value)).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
|
||||
return fmt.Errorf("insert entry %v.", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
|
||||
index := getIndex(fullpath)
|
||||
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
||||
searchResult, err := store.client.Get().
|
||||
Index(index).
|
||||
Type(indexType).
|
||||
Id(id).
|
||||
Do(context.Background())
|
||||
if elastic.IsNotFound(err) {
|
||||
return nil, filer_pb.ErrNotFound
|
||||
}
|
||||
if searchResult != nil && searchResult.Found {
|
||||
esEntry := &ESEntry{
|
||||
ParentId: "",
|
||||
Entry: &filer2.Entry{},
|
||||
}
|
||||
err := jsoniter.Unmarshal(searchResult.Source, esEntry)
|
||||
return esEntry.Entry, err
|
||||
}
|
||||
glog.Errorf("find entry(%s),%v.", string(fullpath), err)
|
||||
return nil, filer_pb.ErrNotFound
|
||||
}
|
||||
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
index := getIndex(fullpath)
|
||||
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
||||
if strings.Count(string(fullpath), "/") == 1 {
|
||||
return store.deleteIndex(index)
|
||||
}
|
||||
return store.deleteEntry(index, id)
|
||||
}
|
||||
func (store *ElasticStore) deleteIndex(index string) (err error) {
|
||||
deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
|
||||
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
|
||||
return nil
|
||||
}
|
||||
glog.Errorf("delete index(%s) %v.", index, err)
|
||||
return err
|
||||
}
|
||||
func (store *ElasticStore) deleteEntry(index, id string) (err error) {
|
||||
deleteResult, err := store.client.Delete().
|
||||
Index(index).
|
||||
Type(indexType).
|
||||
Id(id).
|
||||
Do(context.Background())
|
||||
if err == nil {
|
||||
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
|
||||
return fmt.Errorf("delete entry %v.", err)
|
||||
}
|
||||
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
|
||||
for _, entry := range entries {
|
||||
store.DeleteEntry(ctx, entry.FullPath)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *ElasticStore) ListDirectoryEntries(
|
||||
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
|
||||
) (entries []*filer2.Entry, err error) {
|
||||
if string(fullpath) == "/" {
|
||||
return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
|
||||
}
|
||||
return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
|
||||
}
|
||||
|
||||
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
|
||||
indexResult, err := store.client.CatIndices().Do(context.Background())
|
||||
if err != nil {
|
||||
glog.Errorf("list indices %v.", err)
|
||||
return entries, err
|
||||
}
|
||||
for _, index := range indexResult {
|
||||
if strings.HasPrefix(index.Index, indexPrefix) {
|
||||
if entry, err := store.FindEntry(ctx,
|
||||
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
|
||||
fileName := getFileName(entry.FullPath)
|
||||
if fileName == startFileName && !inclusive {
|
||||
continue
|
||||
}
|
||||
limit--
|
||||
if limit < 0 {
|
||||
break
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (store *ElasticStore) listDirectoryEntries(
|
||||
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
|
||||
) (entries []*filer2.Entry, err error) {
|
||||
first := true
|
||||
index := getIndex(fullpath)
|
||||
nextStart := ""
|
||||
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
||||
if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
|
||||
if elastic.IsNotFound(err) {
|
||||
store.client.CreateIndex(index).Do(context.Background())
|
||||
return entries, nil
|
||||
}
|
||||
}
|
||||
for {
|
||||
result := &elastic.SearchResult{}
|
||||
if (startFileName == "" && first) || inclusive {
|
||||
if result, err = store.search(index, parentId); err != nil {
|
||||
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
||||
return entries, err
|
||||
}
|
||||
} else {
|
||||
fullPath := string(fullpath) + "/" + startFileName
|
||||
if !first {
|
||||
fullPath = nextStart
|
||||
}
|
||||
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
|
||||
if result, err = store.searchAfter(index, parentId, after); err != nil {
|
||||
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
||||
return entries, err
|
||||
}
|
||||
}
|
||||
first = false
|
||||
for _, hit := range result.Hits.Hits {
|
||||
esEntry := &ESEntry{
|
||||
ParentId: "",
|
||||
Entry: &filer2.Entry{},
|
||||
}
|
||||
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
|
||||
limit--
|
||||
if limit < 0 {
|
||||
return entries, nil
|
||||
}
|
||||
nextStart = string(esEntry.Entry.FullPath)
|
||||
fileName := getFileName(esEntry.Entry.FullPath)
|
||||
if fileName == startFileName && !inclusive {
|
||||
continue
|
||||
}
|
||||
entries = append(entries, esEntry.Entry)
|
||||
}
|
||||
}
|
||||
if len(result.Hits.Hits) < store.maxPageSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
|
||||
if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
|
||||
return &elastic.SearchResult{
|
||||
Hits: &elastic.SearchHits{
|
||||
Hits: make([]*elastic.SearchHit, 0)},
|
||||
}, nil
|
||||
}
|
||||
queryResult, err := store.client.Search().
|
||||
Index(index).
|
||||
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
||||
Size(store.maxPageSize).
|
||||
Sort("_id", false).
|
||||
Do(context.Background())
|
||||
return queryResult, err
|
||||
}
|
||||
|
||||
func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
|
||||
queryResult, err := store.client.Search().
|
||||
Index(index).
|
||||
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
||||
SearchAfter(after).
|
||||
Size(store.maxPageSize).
|
||||
Sort("_id", false).
|
||||
Do(context.Background())
|
||||
return queryResult, err
|
||||
|
||||
}
|
||||
|
||||
func (store *ElasticStore) Shutdown() {
|
||||
store.client.Stop()
|
||||
}
|
||||
|
||||
func getIndex(fullpath weed_util.FullPath) string {
|
||||
path := strings.Split(string(fullpath), "/")
|
||||
if len(path) > 1 {
|
||||
return indexPrefix + path[1]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func getFileName(fullpath weed_util.FullPath) string {
|
||||
path := strings.Split(string(fullpath), "/")
|
||||
if len(path) > 1 {
|
||||
return path[len(path)-1]
|
||||
}
|
||||
return ""
|
||||
}
|
|
@ -28,6 +28,7 @@ import (
|
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/elastic/v7"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/notification"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
|
||||
|
|
Loading…
Reference in a new issue