From 3f7fbfddca8614fc0d5bb31960993599b5c44bca Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 14:22:07 +0800 Subject: [PATCH 1/5] add more basic elastic options. --- weed/command/scaffold.go | 6 +++- weed/filer/elastic/v7/elastic_store.go | 50 ++++++++++++++++++-------- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 68fe8e982..7ced118ca 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -176,7 +176,11 @@ database = "seaweedfs" [elastic7] enabled = false -servers = "http://localhost:9200" +servers = "http://localhost1:9200,http://localhost2:9200,http://localhost3:9200" +username = "" +password = "" +sniff_enabled = false +healthcheck_enabled = false # increase the value is recommend, be sure the value in Elastic is greater or equal here index.max_result_window = 10000 ` diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 29e9689f4..f720fdea0 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -47,23 +47,12 @@ type ElasticStore struct { func (store *ElasticStore) GetName() string { return "elastic7" } + func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - servers := configuration.GetString(prefix + "servers") - if servers == "" { - return fmt.Errorf("error elastic endpoints.") - } - store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") - if store.maxPageSize <= 0 { - store.maxPageSize = 10000 - } - glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize) - store.client, err = elastic.NewClient( - elastic.SetSniff(false), - elastic.SetHealthcheck(false), - elastic.SetURL(servers), - ) + options := store.initialize(configuration, prefix) + store.client, err = elastic.NewClient(options...) if err != nil { - return fmt.Errorf("init elastic %s: %v.", servers, err) + return fmt.Errorf("init elastic %v.", err) } if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok { _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background()) @@ -73,6 +62,30 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre } return nil } + +func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) { + configuration.SetDefault(prefix+"servers", "http://localhost:9200") + servers := configuration.GetString(prefix + "servers") + url := make([]string, 0) + for _, v := range strings.Split(servers, ",") { + url = append(url, v) + } + options = append(options, elastic.SetURL(url...)) + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + if username != "" && password != "" { + options = append(options, elastic.SetBasicAuth(username, password)) + } + options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled"))) + options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled"))) + store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") + if store.maxPageSize <= 0 { + store.maxPageSize = 10000 + } + glog.Infof("filer store elastic endpoints: %s.", servers) + return options +} + func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } @@ -82,6 +95,7 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } + func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { return nil, filer.ErrUnsupportedListDirectoryPrefixed } @@ -111,9 +125,11 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) } return nil } + func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { return store.InsertEntry(ctx, entry) } + func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) @@ -136,6 +152,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful glog.Errorf("find entry(%s),%v.", string(fullpath), err) return nil, filer_pb.ErrNotFound } + func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) @@ -144,6 +161,7 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F } return store.deleteEntry(ctx, index, id) } + func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { deleteResult, err := store.client.DeleteIndex(index).Do(ctx) if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { @@ -152,6 +170,7 @@ func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err e glog.Errorf("delete index(%s) %v.", index, err) return err } + func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { deleteResult, err := store.client.Delete(). Index(index). @@ -166,6 +185,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) return fmt.Errorf("delete entry %v.", err) } + func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { for _, entry := range entries { From 6a5b38c0d43fd290a85884605a45775c9621ad83 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 15:59:16 +0800 Subject: [PATCH 2/5] fix elastic kv ops. --- weed/filer/elastic/v7/elastic_store.go | 15 ++++++++++----- weed/filer/elastic/v7/elastic_store_kv.go | 9 +++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index f720fdea0..f1c35f7c6 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -20,10 +20,15 @@ var ( indexPrefix = ".seaweedfs_" indexKV = ".seaweedfs_kv_entries" mappingWithoutQuery = ` { - "mappings": { - "enabled": false - } -}` + "mappings": { + "enabled": false, + "properties": { + "Value":{ + "type": "binary" + } + } + } + }` ) type ESEntry struct { @@ -32,7 +37,7 @@ type ESEntry struct { } type ESKVEntry struct { - Value string `json:Value` + Value []byte `json:"Value"` } func init() { diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go index 1b26bdf8e..99c03314e 100644 --- a/weed/filer/elastic/v7/elastic_store_kv.go +++ b/weed/filer/elastic/v7/elastic_store_kv.go @@ -3,6 +3,7 @@ package elastic import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -32,20 +33,20 @@ func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, Id(string(key)). Do(ctx) if elastic.IsNotFound(err) { - return nil, filer.ErrKvNotFound + return value, filer.ErrKvNotFound } if searchResult != nil && searchResult.Found { esEntry := &ESKVEntry{} if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { - return []byte(esEntry.Value), nil + return esEntry.Value, nil } } glog.Errorf("find key(%s),%v.", string(key), err) - return nil, filer.ErrKvNotFound + return value, filer.ErrKvNotFound } func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - esEntry := &ESKVEntry{string(value)} + esEntry := &ESKVEntry{value} val, err := jsoniter.Marshal(esEntry) if err != nil { glog.Errorf("insert key(%s) %v.", string(key), err) From 72f9d7f047573ce2bd785b0ba48b8ad13e4b87da Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 16:11:18 +0800 Subject: [PATCH 3/5] use util to generate md5. --- weed/filer/elastic/v7/elastic_store.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index f1c35f7c6..7696d6b1f 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -2,7 +2,6 @@ package elastic import ( "context" - "crypto/md5" "fmt" "math" "strings" @@ -108,9 +107,9 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, ful func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { index := getIndex(entry.FullPath) dir, _ := entry.FullPath.DirAndName() - id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath))) + id := weed_util.Md5String([]byte(entry.FullPath)) esEntry := &ESEntry{ - ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))), + ParentId: weed_util.Md5String([]byte(dir)), Entry: entry, } value, err := jsoniter.Marshal(esEntry) @@ -137,7 +136,7 @@ func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { index := getIndex(fullpath) - id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + id := weed_util.Md5String([]byte(fullpath)) searchResult, err := store.client.Get(). Index(index). Type(indexType). @@ -160,7 +159,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { index := getIndex(fullpath) - id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + id := weed_util.Md5String([]byte(fullpath)) if strings.Count(string(fullpath), "/") == 1 { return store.deleteIndex(ctx, index) } @@ -243,7 +242,7 @@ func (store *ElasticStore) listDirectoryEntries( first := true index := getIndex(fullpath) nextStart := "" - parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + parentId := weed_util.Md5String([]byte(fullpath)) if _, err := store.client.Refresh(index).Do(ctx); err != nil { if elastic.IsNotFound(err) { store.client.CreateIndex(index).Do(ctx) @@ -262,7 +261,7 @@ func (store *ElasticStore) listDirectoryEntries( if !first { fullPath = nextStart } - after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath))) + after := weed_util.Md5String([]byte(fullPath)) if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) return entries, err From 719dc43af14142ca1fb199882366b46aa4189e1f Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 16:24:09 +0800 Subject: [PATCH 4/5] modify elastic urls from string to array. --- weed/command/scaffold.go | 6 +++++- weed/filer/elastic/v7/elastic_store.go | 9 ++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 7ced118ca..800bc6029 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -176,7 +176,11 @@ database = "seaweedfs" [elastic7] enabled = false -servers = "http://localhost1:9200,http://localhost2:9200,http://localhost3:9200" +servers = [ + "http://localhost1:9200", + "http://localhost2:9200", + "http://localhost3:9200", +] username = "" password = "" sniff_enabled = false diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 7696d6b1f..5750b2fb8 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -68,13 +68,8 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre } func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) { - configuration.SetDefault(prefix+"servers", "http://localhost:9200") - servers := configuration.GetString(prefix + "servers") - url := make([]string, 0) - for _, v := range strings.Split(servers, ",") { - url = append(url, v) - } - options = append(options, elastic.SetURL(url...)) + servers := configuration.GetStringSlice(prefix + "servers") + options = append(options, elastic.SetURL(servers...)) username := configuration.GetString(prefix + "username") password := configuration.GetString(prefix + "password") if username != "" && password != "" { From 9be4e97625808b5ab3eb235d529fff1c26f8ae59 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 10 Sep 2020 16:30:15 +0800 Subject: [PATCH 5/5] change logs print format. --- weed/filer/elastic/v7/elastic_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 5750b2fb8..8f2af25da 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -81,7 +81,7 @@ func (store *ElasticStore) initialize(configuration weed_util.Configuration, pre if store.maxPageSize <= 0 { store.maxPageSize = 10000 } - glog.Infof("filer store elastic endpoints: %s.", servers) + glog.Infof("filer store elastic endpoints: %v.", servers) return options }