From 7b544576af87865c42697492bfdf3af30ee27229 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 3 May 2022 20:16:00 +0500 Subject: [PATCH] refactor --- weed/filer/ydb/ydb_queries.go | 21 +++++++- weed/filer/ydb/ydb_store.go | 97 +++++++++++++++++++++------------- weed/filer/ydb/ydb_store_kv.go | 6 +-- weed/filer/ydb/ydb_types.go | 6 ++- 4 files changed, 87 insertions(+), 43 deletions(-) diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index 923ccc20f..0f925584e 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -4,6 +4,7 @@ import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" const ( insertQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; DECLARE $name AS Utf8; @@ -16,6 +17,7 @@ const ( ($dir_hash, $name, $directory, $meta, $expire_at);` updateQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; DECLARE $name AS Utf8; @@ -28,6 +30,7 @@ const ( ($dir_hash, $name, $directory, $meta, $expire_at);` deleteQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $name AS Utf8; @@ -35,6 +38,7 @@ const ( WHERE dir_hash = $dir_hash AND name = $name;` findQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $name AS Utf8; @@ -43,6 +47,7 @@ const ( WHERE dir_hash = $dir_hash AND name = $name;` deleteFolderChildrenQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; @@ -50,6 +55,7 @@ const ( WHERE dir_hash = $dir_hash AND directory = $directory;` listDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; DECLARE $start_name AS Utf8; @@ -58,6 +64,19 @@ const ( SELECT name, meta FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash = $dir_hash AND directory = $directory and name %s $start_name and name LIKE $prefix + WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix + ORDER BY name ASC LIMIT $limit;` + + listInclusiveDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS Uint64; + + SELECT name, meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix ORDER BY name ASC LIMIT $limit;` ) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index e4aeabbb9..3d40a0400 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -66,6 +66,9 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int, poolSizeLimit int) (err error) { store.dirBuckets = dirBuckets store.SupportBucketTable = useBucketPrefix + if store.SupportBucketTable { + glog.V(0).Infof("enabled BucketPrefix") + } store.dbs = make(map[string]bool) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -133,7 +136,7 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl return err } -func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, query string) (err error) { +func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, isUpdate bool) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { @@ -143,29 +146,36 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } - queryWithPragma := withPragma(store.getPrefix(ctx, dir), query) - fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} - return store.doTxOrDB(ctx, &queryWithPragma, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} + var query *string + if isUpdate { + query = withPragma(tablePathPrefix, updateQuery) + } else { + query = withPragma(tablePathPrefix, insertQuery) + } + return store.doTxOrDB(ctx, query, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) } func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - return store.insertOrUpdateEntry(ctx, entry, insertQuery) + return store.insertOrUpdateEntry(ctx, entry, false) } func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - return store.insertOrUpdateEntry(ctx, entry, updateQuery) + return store.insertOrUpdateEntry(ctx, entry, true) } func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { dir, name := fullpath.DirAndName() var data []byte entryFound := false - queryWithPragma := withPragma(store.getPrefix(ctx, dir), findQuery) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, findQuery) queryParams := table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), table.ValueParam("$name", types.UTF8Value(name))) - err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { for res.NextResultSet(ctx) { for res.NextRow() { if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { @@ -196,22 +206,24 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { dir, name := fullpath.DirAndName() - queryWithPragma := withPragma(store.getPrefix(ctx, dir), deleteQuery) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteQuery) queryParams := table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), table.ValueParam("$name", types.UTF8Value(name))) - return store.doTxOrDB(ctx, &queryWithPragma, queryParams, rwTX, nil) + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) } func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { dir, _ := fullpath.DirAndName() - queryWithPragma := withPragma(store.getPrefix(ctx, dir), deleteFolderChildrenQuery) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteFolderChildrenQuery) queryParams := table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$directory", types.UTF8Value(dir))) + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir))) - return store.doTxOrDB(ctx, &queryWithPragma, queryParams, rwTX, nil) + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) } func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { @@ -220,19 +232,21 @@ func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.Fu func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dir := string(dirPath) - startFileCompOp := ">" + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + var query *string if includeStartFile { - startFileCompOp = ">=" + query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery) + } else { + query = withPragma(tablePathPrefix, listDirectoryQuery) } - queryWithPragma := withPragma(store.getPrefix(ctx, dir), fmt.Sprintf(listDirectoryQuery, startFileCompOp)) queryParams := table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$directory", types.UTF8Value(dir)), + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir)), table.ValueParam("$start_name", types.UTF8Value(startFileName)), table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), table.ValueParam("$limit", types.Uint64Value(uint64(limit))), ) - err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { var name string var data []byte for res.NextResultSet(ctx) { @@ -337,41 +351,50 @@ func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { if !store.SupportBucketTable { return nil } - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) - }) + }); err != nil { + return err + } + glog.V(4).Infof("deleted table %s", prefix) + + return nil } -func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPrefix string) { - tablePathPrefix = store.tablePathPrefix +func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) { + tablePathPrefix = &store.tablePathPrefix + shortDir = dir if !store.SupportBucketTable { return } prefixBuckets := store.dirBuckets + "/" - if strings.HasPrefix(dir, prefixBuckets) { + if strings.HasPrefix(*dir, prefixBuckets) { // detect bucket - bucketAndDir := dir[len(prefixBuckets):] - t := strings.Index(bucketAndDir, "/") - if t < 0 { + bucketAndDir := (*dir)[len(prefixBuckets):] + var bucket string + if t := strings.Index(bucketAndDir, "/"); t > 0 { + bucket = bucketAndDir[:t] + } else if t < 0 { + bucket = bucketAndDir + } + if bucket == "" { return } - bucket := bucketAndDir[:t] - if bucket != "" { - return - } store.dbsLock.Lock() defer store.dbsLock.Unlock() - tablePathPrefix = path.Join(store.tablePathPrefix, bucket) + tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket) if _, found := store.dbs[bucket]; !found { - if err := store.createTable(ctx, tablePathPrefix); err == nil { + if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil { store.dbs[bucket] = true + glog.V(4).Infof("created table %s", tablePathPrefixWithBucket) } else { - glog.Errorf("createTable %s: %v", tablePathPrefix, err) + glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err) } } + tablePathPrefix = &tablePathPrefixWithBucket } return } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index d63b243e9..c90821ddd 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -17,7 +17,7 @@ func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) fileMeta := FileMeta{dirHash, name, dirStr, value} return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - _, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery), + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), fileMeta.queryParameters(0), options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { @@ -31,7 +31,7 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) valueFound := false err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, res, err := s.Execute(ctx, roTX, withPragma(store.getPrefix(ctx, dirStr), findQuery), + _, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery), table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(dirHash)), table.ValueParam("$name", types.UTF8Value(name))), @@ -62,7 +62,7 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - _, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery), + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(dirHash)), table.ValueParam("$name", types.UTF8Value(name))), diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index 6b89b7c57..34044b6a7 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -1,6 +1,7 @@ package ydb import ( + "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" @@ -50,6 +51,7 @@ func createTableOptions() []options.CreateTableOption { ), } } -func withPragma(prefix string, query string) string { - return `PRAGMA TablePathPrefix("` + prefix + `");` + query +func withPragma(prefix *string, query string) *string { + queryWithPragma := fmt.Sprintf(query, *prefix) + return &queryWithPragma }