From cb3c7a3cdb0c2f16603285bbc0a608635c161dc8 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 3 May 2022 16:03:10 +0500 Subject: [PATCH] enable query cache policy instead of prepare --- weed/filer/ydb/ydb_queries.go | 9 +++------ weed/filer/ydb/ydb_store.go | 10 ++++------ weed/filer/ydb/ydb_store_kv.go | 33 ++++++++++++++------------------- 3 files changed, 21 insertions(+), 31 deletions(-) diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index bc2f37b10..63e98ebb7 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -23,16 +23,14 @@ const ( REPLACE INTO ` + asql.DEFAULT_TABLE + ` (dir_hash, name, directory, meta) VALUES - ($dir_hash, $name, $directory, $meta) - COMMIT;` + ($dir_hash, $name, $directory, $meta);` deleteQuery = ` DECLARE $dir_hash AS int64; DECLARE $name AS Utf8; DELETE FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash = $dir_hash AND name = $name; - COMMIT;` + WHERE dir_hash = $dir_hash AND name = $name;` findQuery = ` DECLARE $dir_hash AS int64; @@ -47,8 +45,7 @@ const ( DECLARE $directory AS Utf8; DELETE FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash = $dir_hash AND directory = $directory; - COMMIT;` + WHERE dir_hash = $dir_hash AND directory = $directory;` listDirectoryQuery = ` DECLARE $dir_hash AS int64; diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 678f58143..6e62d2719 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" @@ -104,17 +105,14 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { var res result.Result if tx, ok := ctx.Value("tx").(table.Transaction); ok { - res, err = tx.Execute(ctx, *query, params) + res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("execute transaction: %v", err) } } else { err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, *query) - if err != nil { - return fmt.Errorf("prepare: %v", err) - } - _, res, err = stmt.Execute(ctx, tc, params) + _, res, err = s.Execute(ctx, tc, *query, + params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("execute statement: %v", err) } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index 6b1e9b99a..af147cf62 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -3,6 +3,7 @@ package ydb import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" @@ -16,11 +17,9 @@ func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) fileMeta := FileMeta{dirHash, name, dirStr, value} return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery)) - if err != nil { - return fmt.Errorf("kv put prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err) - } - _, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters()) + _, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery), + fileMeta.queryParameters(), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) } @@ -32,13 +31,11 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) valueFound := false err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), findQuery)) - if err != nil { - return fmt.Errorf("kv get prepare %s: %v", util.NewFullPath(dirStr, name), err) - } - _, res, err := stmt.Execute(ctx, roTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(dirHash)), - table.ValueParam("$name", types.UTF8Value(name)))) + _, res, err := s.Execute(ctx, roTX, withPragma(store.getPrefix(ctx, dirStr), findQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name))), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) } @@ -65,13 +62,11 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery)) - if err != nil { - return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err) - } - _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(dirHash)), - table.ValueParam("$name", types.UTF8Value(name)))) + _, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name))), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err) }