From 275c3bb19cda3492ca987c24cd5a25c267a1eb46 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Sun, 1 May 2022 17:28:58 +0500 Subject: [PATCH] ydb initial https://github.com/chrislusf/seaweedfs/issues/2942 --- docker/compose/local-ydb-compose.yml | 30 ++++ weed/filer/ydb/readme.md | 27 +++ weed/filer/ydb/ydb_store.go | 256 +++++++++++++++++++++++++++ weed/filer/ydb/ydb_types.go | 14 ++ weed/filer/ydb/ydb_types_ydbgen.go | 194 ++++++++++++++++++++ 5 files changed, 521 insertions(+) create mode 100644 docker/compose/local-ydb-compose.yml create mode 100644 weed/filer/ydb/readme.md create mode 100644 weed/filer/ydb/ydb_store.go create mode 100644 weed/filer/ydb/ydb_types.go create mode 100644 weed/filer/ydb/ydb_types_ydbgen.go diff --git a/docker/compose/local-ydb-compose.yml b/docker/compose/local-ydb-compose.yml new file mode 100644 index 000000000..ce3e7e9ed --- /dev/null +++ b/docker/compose/local-ydb-compose.yml @@ -0,0 +1,30 @@ +version: '2' + +services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb + ports: + - 2135:2135 + - 8765:8765 + - 2136:2136 + volumes: + - ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql + environment: + - YDB_DEFAULT_LOG_LEVEL=NOTICE + - GRPC_TLS_PORT=2135 + - GRPC_PORT=2136 + - MON_PORT=8765 + server: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + - 8084:8080 + - 18084:18080 + - 8888:8888 + - 18888:18888 + command: "server -ip=server -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1" + volumes: + - ./master-cloud.toml:/etc/seaweedfs/master.toml + depends_on: + - ydb \ No newline at end of file diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md new file mode 100644 index 000000000..90d7a18e9 --- /dev/null +++ b/weed/filer/ydb/readme.md @@ -0,0 +1,27 @@ +## YDB + +database: https://github.com/ydb-platform/ydb +go driver: https://github.com/ydb-platform/ydb-go-sdk + +options: + +``` +[ydb] +enabled=true +db_name="seaweedfs" +servers=["http://localhost:8529"] +#basic auth +user="root" +pass="test" + +# tls settings +insecure_skip_verify=true +``` + +get ydb types +``` +ydbgen -dir weed/filer/ydb +``` + +i test using this dev database: +`make dev_ydb` diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go new file mode 100644 index 000000000..7278c6301 --- /dev/null +++ b/weed/filer/ydb/ydb_store.go @@ -0,0 +1,256 @@ +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/yandex-cloud/ydb-go-sdk/v2" + "github.com/yandex-cloud/ydb-go-sdk/v2/connect" + "github.com/yandex-cloud/ydb-go-sdk/v2/table" + "strings" + "time" +) + +var ( + roTX = table.TxControl( + table.BeginTx(table.WithOnlineReadOnly()), + table.CommitTx(), + ) + rwTX = table.TxControl( + table.BeginTx(table.WithSerializableReadWrite()), + table.CommitTx(), + ) +) + +const ( + createQuery = ` + PRAGMA TablePathPrefix("%s"); + CREATE TABLE file_meta ( + dir_hash int64, + name Utf8, + directory Utf8, + meta String, + PRIMARY KEY (dir_hash, name) + );` + insertQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + UPSERT INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta);` + updateQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + REPLACE INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta) + COMMIT;` + deleteQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + DELETE FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name; + COMMIT;` + findQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + SELECT meta + FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name;` +) + +type YdbStore struct { + SupportBucketTable bool + DB *connect.Connection + connParams connect.ConnectParams + connCtx context.Context + dirBuckets string + tablePathPrefix string +} + +func init() { + filer.Stores = append(filer.Stores, &YdbStore{}) +} + +func (store *YdbStore) GetName() string { + return "ydb" +} + +func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize(configuration.GetString(prefix + "coonectionUrl")) +} + +func (store *YdbStore) initialize(sqlUrl string) (err error) { + store.SupportBucketTable = false + var cancel context.CancelFunc + store.connCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + store.connParams = connect.MustConnectionString(sqlUrl) + store.DB, err = connect.New(store.connCtx, store.connParams) + if err != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + } + defer store.DB.Close() + + if err = store.DB.EnsurePathExists(store.connCtx, store.connParams.Database()); err != nil { + return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + } + return nil +} + +func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, query string) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), query)) + if err != nil { + return err + } + _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) + return err + }), + ) +} + +func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, insertQuery) +} + +func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, updateQuery) +} + +func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + var res *table.Result + err = table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), findQuery)) + if err != nil { + return err + } + _, res, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return err + }), + ) + if err != nil { + return nil, err + } + for res.NextSet() { + for res.NextRow() { + res.SeekItem("meta") + entry.FullPath = fullpath + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(res.String())); err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil + } + } + + return nil, filer_pb.ErrNotFound +} + +func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteQuery)) + if err != nil { + return err + } + _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return err + }), + ) +} + +func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + return nil +} + +func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) CommitTransaction(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) RollbackTransaction(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) Shutdown() { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) getPrefix(dir string) string { + prefixBuckets := store.dirBuckets + "/" + if strings.HasPrefix(dir, prefixBuckets) { + // detect bucket + bucketAndDir := dir[len(prefixBuckets):] + if t := strings.Index(bucketAndDir, "/"); t > 0 { + return bucketAndDir[:t] + } + } + return "" +} + +func (store *YdbStore) withPragma(prefix, query string) string { + if store.tablePathPrefix != "" && store.tablePathPrefix != "/" { + prefix = store.tablePathPrefix + "/" + prefix + } + return `PRAGMA TablePathPrefix("` + prefix + `");` + query +} diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go new file mode 100644 index 000000000..5bc0d2afd --- /dev/null +++ b/weed/filer/ydb/ydb_types.go @@ -0,0 +1,14 @@ +package ydb + +//go:generate ydbgen + +//ydb:gen +type FileMeta struct { + DirHash int64 `ydb:"type:int64"` + Name string `ydb:"type:utf8"` + Directory string `ydb:"type:utf8"` + Meta []byte `ydb:"-"` +} + +//ydb:gen scan,value +type FileMetas []FileMeta diff --git a/weed/filer/ydb/ydb_types_ydbgen.go b/weed/filer/ydb/ydb_types_ydbgen.go new file mode 100644 index 000000000..8bd1906f9 --- /dev/null +++ b/weed/filer/ydb/ydb_types_ydbgen.go @@ -0,0 +1,194 @@ +// Code generated by ydbgen; DO NOT EDIT. + +package ydb + +import ( + "strconv" + + "github.com/yandex-cloud/ydb-go-sdk/v2" + "github.com/yandex-cloud/ydb-go-sdk/v2/table" +) + +var ( + _ = strconv.Itoa + _ = ydb.StringValue + _ = table.NewQueryParameters +) + +func (f *FileMeta) Scan(res *table.Result) (err error) { + res.SeekItem("dir_hash") + f.DirHash = res.Int64() + + res.SeekItem("name") + f.Name = res.UTF8() + + res.SeekItem("directory") + f.Directory = res.UTF8() + + return res.Err() +} + +func (f *FileMeta) QueryParameters() *table.QueryParameters { + var v0 ydb.Value + { + vp0 := ydb.Int64Value(f.DirHash) + v0 = vp0 + } + var v1 ydb.Value + { + vp0 := ydb.UTF8Value(f.Name) + v1 = vp0 + } + var v2 ydb.Value + { + vp0 := ydb.UTF8Value(f.Directory) + v2 = vp0 + } + return table.NewQueryParameters( + table.ValueParam("$dir_hash", v0), + table.ValueParam("$name", v1), + table.ValueParam("$directory", v2), + ) +} + +func (f *FileMeta) StructValue() ydb.Value { + var v0 ydb.Value + { + var v1 ydb.Value + { + vp0 := ydb.Int64Value(f.DirHash) + v1 = vp0 + } + var v2 ydb.Value + { + vp0 := ydb.UTF8Value(f.Name) + v2 = vp0 + } + var v3 ydb.Value + { + vp0 := ydb.UTF8Value(f.Directory) + v3 = vp0 + } + v0 = ydb.StructValue( + ydb.StructFieldValue("dir_hash", v1), + ydb.StructFieldValue("name", v2), + ydb.StructFieldValue("directory", v3), + ) + } + return v0 +} + +func (f *FileMeta) StructType() ydb.Type { + var t0 ydb.Type + { + fs0 := make([]ydb.StructOption, 3) + var t1 ydb.Type + { + tp0 := ydb.TypeInt64 + t1 = tp0 + } + fs0[0] = ydb.StructField("dir_hash", t1) + var t2 ydb.Type + { + tp0 := ydb.TypeUTF8 + t2 = tp0 + } + fs0[1] = ydb.StructField("name", t2) + var t3 ydb.Type + { + tp0 := ydb.TypeUTF8 + t3 = tp0 + } + fs0[2] = ydb.StructField("directory", t3) + t0 = ydb.Struct(fs0...) + } + return t0 +} + +func (fs *FileMetas) Scan(res *table.Result) (err error) { + for res.NextRow() { + var x0 FileMeta + res.SeekItem("dir_hash") + x0.DirHash = res.Int64() + + res.SeekItem("name") + x0.Name = res.UTF8() + + res.SeekItem("directory") + x0.Directory = res.UTF8() + + if res.Err() == nil { + *fs = append(*fs, x0) + } + } + return res.Err() +} + +func (fs FileMetas) ListValue() ydb.Value { + var list0 ydb.Value + vs0 := make([]ydb.Value, len(fs)) + for i0, item0 := range fs { + var v0 ydb.Value + { + var v1 ydb.Value + { + var v2 ydb.Value + { + vp0 := ydb.Int64Value(item0.DirHash) + v2 = vp0 + } + var v3 ydb.Value + { + vp0 := ydb.UTF8Value(item0.Name) + v3 = vp0 + } + var v4 ydb.Value + { + vp0 := ydb.UTF8Value(item0.Directory) + v4 = vp0 + } + v1 = ydb.StructValue( + ydb.StructFieldValue("dir_hash", v2), + ydb.StructFieldValue("name", v3), + ydb.StructFieldValue("directory", v4), + ) + } + v0 = v1 + } + vs0[i0] = v0 + } + if len(vs0) == 0 { + var t1 ydb.Type + { + var t2 ydb.Type + { + fs0 := make([]ydb.StructOption, 3) + var t3 ydb.Type + { + tp0 := ydb.TypeInt64 + t3 = tp0 + } + fs0[0] = ydb.StructField("dir_hash", t3) + var t4 ydb.Type + { + tp0 := ydb.TypeUTF8 + t4 = tp0 + } + fs0[1] = ydb.StructField("name", t4) + var t5 ydb.Type + { + tp0 := ydb.TypeUTF8 + t5 = tp0 + } + fs0[2] = ydb.StructField("directory", t5) + t2 = ydb.Struct(fs0...) + } + t1 = t2 + } + t0 := ydb.List(t1) + list0 = ydb.ZeroValue(t0) + } else { + list0 = ydb.ListValue(vs0...) + } + return list0 +}