Merge pull request #1459 from taozix/master

change elastic initialize process similar as other drivers.
This commit is contained in:
Chris Lu 2020-09-10 12:32:20 -07:00 committed by GitHub
commit 07ec6c89a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -15,10 +15,10 @@ import (
) )
var ( var (
indexType = "_doc" indexType = "_doc"
indexPrefix = ".seaweedfs_" indexPrefix = ".seaweedfs_"
indexKV = ".seaweedfs_kv_entries" indexKV = ".seaweedfs_kv_entries"
mappingWithoutQuery = ` { kvMappings = ` {
"mappings": { "mappings": {
"enabled": false, "enabled": false,
"properties": { "properties": {
@ -53,21 +53,7 @@ func (store *ElasticStore) GetName() string {
} }
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
options := store.initialize(configuration, prefix) options := []elastic.ClientOptionFunc{}
store.client, err = elastic.NewClient(options...)
if err != nil {
return fmt.Errorf("init elastic %v.", err)
}
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
if err != nil {
return fmt.Errorf("create index(%s) %v.", indexKV, err)
}
}
return nil
}
func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) {
servers := configuration.GetStringSlice(prefix + "servers") servers := configuration.GetStringSlice(prefix + "servers")
options = append(options, elastic.SetURL(servers...)) options = append(options, elastic.SetURL(servers...))
username := configuration.GetString(prefix + "username") username := configuration.GetString(prefix + "username")
@ -82,7 +68,22 @@ func (store *ElasticStore) initialize(configuration weed_util.Configuration, pre
store.maxPageSize = 10000 store.maxPageSize = 10000
} }
glog.Infof("filer store elastic endpoints: %v.", servers) glog.Infof("filer store elastic endpoints: %v.", servers)
return options return store.initialize(options)
}
func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) {
ctx := context.Background()
store.client, err = elastic.NewClient(options...)
if err != nil {
return fmt.Errorf("init elastic %v.", err)
}
if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx)
if err != nil {
return fmt.Errorf("create index(%s) %v.", indexKV, err)
}
}
return nil
} }
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {