Merge pull request #1455 from taozix/master

add more basic elastic options.
This commit is contained in:
Chris Lu 2020-09-10 01:31:17 -07:00 committed by GitHub
commit 0d9b858cfb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 32 deletions

View file

@ -176,7 +176,15 @@ database = "seaweedfs"
[elastic7] [elastic7]
enabled = false enabled = false
servers = "http://localhost:9200" servers = [
"http://localhost1:9200",
"http://localhost2:9200",
"http://localhost3:9200",
]
username = ""
password = ""
sniff_enabled = false
healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here # increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000 index.max_result_window = 10000
` `

View file

@ -2,7 +2,6 @@ package elastic
import ( import (
"context" "context"
"crypto/md5"
"fmt" "fmt"
"math" "math"
"strings" "strings"
@ -20,10 +19,15 @@ var (
indexPrefix = ".seaweedfs_" indexPrefix = ".seaweedfs_"
indexKV = ".seaweedfs_kv_entries" indexKV = ".seaweedfs_kv_entries"
mappingWithoutQuery = ` { mappingWithoutQuery = ` {
"mappings": { "mappings": {
"enabled": false "enabled": false,
} "properties": {
}` "Value":{
"type": "binary"
}
}
}
}`
) )
type ESEntry struct { type ESEntry struct {
@ -32,7 +36,7 @@ type ESEntry struct {
} }
type ESKVEntry struct { type ESKVEntry struct {
Value string `json:Value` Value []byte `json:"Value"`
} }
func init() { func init() {
@ -47,23 +51,12 @@ type ElasticStore struct {
func (store *ElasticStore) GetName() string { func (store *ElasticStore) GetName() string {
return "elastic7" return "elastic7"
} }
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
servers := configuration.GetString(prefix + "servers") options := store.initialize(configuration, prefix)
if servers == "" { store.client, err = elastic.NewClient(options...)
return fmt.Errorf("error elastic endpoints.")
}
store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
if store.maxPageSize <= 0 {
store.maxPageSize = 10000
}
glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
store.client, err = elastic.NewClient(
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetURL(servers),
)
if err != nil { if err != nil {
return fmt.Errorf("init elastic %s: %v.", servers, err) return fmt.Errorf("init elastic %v.", err)
} }
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok { if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background()) _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
@ -73,6 +66,25 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre
} }
return nil return nil
} }
func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) {
servers := configuration.GetStringSlice(prefix + "servers")
options = append(options, elastic.SetURL(servers...))
username := configuration.GetString(prefix + "username")
password := configuration.GetString(prefix + "password")
if username != "" && password != "" {
options = append(options, elastic.SetBasicAuth(username, password))
}
options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
if store.maxPageSize <= 0 {
store.maxPageSize = 10000
}
glog.Infof("filer store elastic endpoints: %v.", servers)
return options
}
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil return ctx, nil
} }
@ -82,6 +94,7 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil return nil
} }
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed return nil, filer.ErrUnsupportedListDirectoryPrefixed
} }
@ -89,9 +102,9 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
index := getIndex(entry.FullPath) index := getIndex(entry.FullPath)
dir, _ := entry.FullPath.DirAndName() dir, _ := entry.FullPath.DirAndName()
id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath))) id := weed_util.Md5String([]byte(entry.FullPath))
esEntry := &ESEntry{ esEntry := &ESEntry{
ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))), ParentId: weed_util.Md5String([]byte(dir)),
Entry: entry, Entry: entry,
} }
value, err := jsoniter.Marshal(esEntry) value, err := jsoniter.Marshal(esEntry)
@ -111,12 +124,14 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
} }
return nil return nil
} }
func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry) return store.InsertEntry(ctx, entry)
} }
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
index := getIndex(fullpath) index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) id := weed_util.Md5String([]byte(fullpath))
searchResult, err := store.client.Get(). searchResult, err := store.client.Get().
Index(index). Index(index).
Type(indexType). Type(indexType).
@ -136,14 +151,16 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
glog.Errorf("find entry(%s),%v.", string(fullpath), err) glog.Errorf("find entry(%s),%v.", string(fullpath), err)
return nil, filer_pb.ErrNotFound return nil, filer_pb.ErrNotFound
} }
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
index := getIndex(fullpath) index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) id := weed_util.Md5String([]byte(fullpath))
if strings.Count(string(fullpath), "/") == 1 { if strings.Count(string(fullpath), "/") == 1 {
return store.deleteIndex(ctx, index) return store.deleteIndex(ctx, index)
} }
return store.deleteEntry(ctx, index, id) return store.deleteEntry(ctx, index, id)
} }
func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
deleteResult, err := store.client.DeleteIndex(index).Do(ctx) deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
@ -152,6 +169,7 @@ func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err e
glog.Errorf("delete index(%s) %v.", index, err) glog.Errorf("delete index(%s) %v.", index, err)
return err return err
} }
func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
deleteResult, err := store.client.Delete(). deleteResult, err := store.client.Delete().
Index(index). Index(index).
@ -166,6 +184,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
return fmt.Errorf("delete entry %v.", err) return fmt.Errorf("delete entry %v.", err)
} }
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
for _, entry := range entries { for _, entry := range entries {
@ -218,7 +237,7 @@ func (store *ElasticStore) listDirectoryEntries(
first := true first := true
index := getIndex(fullpath) index := getIndex(fullpath)
nextStart := "" nextStart := ""
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) parentId := weed_util.Md5String([]byte(fullpath))
if _, err := store.client.Refresh(index).Do(ctx); err != nil { if _, err := store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) { if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx) store.client.CreateIndex(index).Do(ctx)
@ -237,7 +256,7 @@ func (store *ElasticStore) listDirectoryEntries(
if !first { if !first {
fullPath = nextStart fullPath = nextStart
} }
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath))) after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err return entries, err

View file

@ -3,6 +3,7 @@ package elastic
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -32,20 +33,20 @@ func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte,
Id(string(key)). Id(string(key)).
Do(ctx) Do(ctx)
if elastic.IsNotFound(err) { if elastic.IsNotFound(err) {
return nil, filer.ErrKvNotFound return value, filer.ErrKvNotFound
} }
if searchResult != nil && searchResult.Found { if searchResult != nil && searchResult.Found {
esEntry := &ESKVEntry{} esEntry := &ESKVEntry{}
if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
return []byte(esEntry.Value), nil return esEntry.Value, nil
} }
} }
glog.Errorf("find key(%s),%v.", string(key), err) glog.Errorf("find key(%s),%v.", string(key), err)
return nil, filer.ErrKvNotFound return value, filer.ErrKvNotFound
} }
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
esEntry := &ESKVEntry{string(value)} esEntry := &ESKVEntry{value}
val, err := jsoniter.Marshal(esEntry) val, err := jsoniter.Marshal(esEntry)
if err != nil { if err != nil {
glog.Errorf("insert key(%s) %v.", string(key), err) glog.Errorf("insert key(%s) %v.", string(key), err)