change elastic initialize process similar as others.

This commit is contained in:
ruitao.liu 2020-09-10 23:35:20 +08:00
parent 660d7c0edd
commit 5b0676049a

View file

@ -15,10 +15,10 @@ import (
) )
var ( var (
indexType = "_doc" indexType = "_doc"
indexPrefix = ".seaweedfs_" indexPrefix = ".seaweedfs_"
indexKV = ".seaweedfs_kv_entries" indexKV = ".seaweedfs_kv_entries"
mappingWithoutQuery = ` { kvMappings = ` {
"mappings": { "mappings": {
"enabled": false, "enabled": false,
"properties": { "properties": {
@ -53,21 +53,7 @@ func (store *ElasticStore) GetName() string {
} }
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
options := store.initialize(configuration, prefix) options := []elastic.ClientOptionFunc{}
store.client, err = elastic.NewClient(options...)
if err != nil {
return fmt.Errorf("init elastic %v.", err)
}
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
if err != nil {
return fmt.Errorf("create index(%s) %v.", indexKV, err)
}
}
return nil
}
func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) {
servers := configuration.GetStringSlice(prefix + "servers") servers := configuration.GetStringSlice(prefix + "servers")
options = append(options, elastic.SetURL(servers...)) options = append(options, elastic.SetURL(servers...))
username := configuration.GetString(prefix + "username") username := configuration.GetString(prefix + "username")
@ -82,7 +68,22 @@ func (store *ElasticStore) initialize(configuration weed_util.Configuration, pre
store.maxPageSize = 10000 store.maxPageSize = 10000
} }
glog.Infof("filer store elastic endpoints: %v.", servers) glog.Infof("filer store elastic endpoints: %v.", servers)
return options return store.initialize(options)
}
func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) {
ctx := context.Background()
store.client, err = elastic.NewClient(options...)
if err != nil {
return fmt.Errorf("init elastic %v.", err)
}
if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx)
if err != nil {
return fmt.Errorf("create index(%s) %v.", indexKV, err)
}
}
return nil
} }
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {