From 5b0676049a78a82f94c3d6ff1311e71e7905ff88 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 23:35:20 +0800 Subject: [PATCH] change elastic initialize process similar as others. --- weed/filer/elastic/v7/elastic_store.go | 41 +++++++++++++------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 8f2af25da..ec88e10a5 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -15,10 +15,10 @@ import ( ) var ( - indexType = "_doc" - indexPrefix = ".seaweedfs_" - indexKV = ".seaweedfs_kv_entries" - mappingWithoutQuery = ` { + indexType = "_doc" + indexPrefix = ".seaweedfs_" + indexKV = ".seaweedfs_kv_entries" + kvMappings = ` { "mappings": { "enabled": false, "properties": { @@ -53,21 +53,7 @@ func (store *ElasticStore) GetName() string { } func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - options := store.initialize(configuration, prefix) - store.client, err = elastic.NewClient(options...) - if err != nil { - return fmt.Errorf("init elastic %v.", err) - } - if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok { - _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background()) - if err != nil { - return fmt.Errorf("create index(%s) %v.", indexKV, err) - } - } - return nil -} - -func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) { + options := []elastic.ClientOptionFunc{} servers := configuration.GetStringSlice(prefix + "servers") options = append(options, elastic.SetURL(servers...)) username := configuration.GetString(prefix + "username") @@ -82,7 +68,22 @@ func (store *ElasticStore) initialize(configuration weed_util.Configuration, pre store.maxPageSize = 10000 } glog.Infof("filer store elastic endpoints: %v.", servers) - return options + return store.initialize(options) +} + +func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) { + ctx := context.Background() + store.client, err = elastic.NewClient(options...) + if err != nil { + return fmt.Errorf("init elastic %v.", err) + } + if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok { + _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx) + if err != nil { + return fmt.Errorf("create index(%s) %v.", indexKV, err) + } + } + return nil } func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {