diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 96632fc86..f40e759d3 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -34,7 +34,7 @@ jobs: cd weed; go get -v -t -d ./... - name: Build - run: cd weed; go build -tags "elastic gocdk sqlite hdfs" -v . + run: cd weed; go build -tags "elastic gocdk sqlite hdfs ydb" -v . - name: Test - run: cd weed; go test -tags "elastic gocdk sqlite hdfs" -v ./... + run: cd weed; go test -tags "elastic gocdk sqlite hdfs ydb" -v ./... diff --git a/Makefile b/Makefile index 36da78434..1287d9f8d 100644 --- a/Makefile +++ b/Makefile @@ -8,4 +8,4 @@ install: cd weed; go install full_install: - cd weed; go install -tags "elastic gocdk sqlite hdfs" + cd weed; go install -tags "elastic gocdk sqlite hdfs ydb" diff --git a/README.md b/README.md index 13c23d577..b04c5188c 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,7 @@ Also, SeaweedFS implements erasure coding with ideas from On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, -e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, Sqlite, MemSql, TiDB, Etcd, CockroachDB, etc. +e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, Sqlite, MemSql, TiDB, Etcd, CockroachDB, YDB, etc. For any distributed key value stores, the large values can be offloaded to SeaweedFS. With the fast access speed and linearly scalable capacity, @@ -412,7 +412,7 @@ The architectures are mostly the same. SeaweedFS aims to store and read files fa * SeaweedFS optimizes for small files, ensuring O(1) disk seek operation, and can also handle large files. * SeaweedFS statically assigns a volume id for a file. Locating file content becomes just a lookup of the volume id, which can be easily cached. -* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, Sqlite, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized. +* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, Sqlite, MemSql, TiDB, CockroachDB, Etcd, YDB etc, and is easy to customized. * SeaweedFS Volume server also communicates directly with clients via HTTP, supporting range queries, direct uploads, etc. | System | File Metadata | File Content Read| POSIX | REST API | Optimized for large number of small files | @@ -454,7 +454,7 @@ Ceph uses CRUSH hashing to automatically manage the data placement, which is eff SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read. -SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Sqlite, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage. +SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Sqlite, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachCB, Etcd, YDB, to manage file directories. These stores are proven, scalable, and easier to manage. | SeaweedFS | comparable to Ceph | advantage | | ------------- | ------------- | ---------------- | diff --git a/docker/Makefile b/docker/Makefile index 1cd3e3178..a1e82a338 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -49,6 +49,9 @@ dev_replicate: build dev_auditlog: build docker-compose -f compose/local-auditlog-compose.yml -p seaweedfs up +dev_ydb: build + docker-compose -f compose/local-ydb-compose.yml -p seaweedfs up + cluster: build docker-compose -f compose/local-cluster-compose.yml -p seaweedfs up diff --git a/docker/compose/local-ydb-compose.yml b/docker/compose/local-ydb-compose.yml new file mode 100644 index 000000000..a17b77b8a --- /dev/null +++ b/docker/compose/local-ydb-compose.yml @@ -0,0 +1,35 @@ +version: '2' + +services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb + ports: + - 2135:2135 + - 8765:8765 + - 2136:2136 + environment: + - YDB_DEFAULT_LOG_LEVEL=DEBUG + - GRPC_TLS_PORT=2135 + - GRPC_PORT=2136 + - MON_PORT=8765 + s3: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + - 8084:8080 + - 18084:18080 + - 8888:8888 + - 8000:8000 + - 18888:18888 + command: "server -ip=s3 -filer -master.volumeSizeLimitMB=16 -volume.max=0 -volume -volume.preStopSeconds=1 -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false" + volumes: + - ./s3.json:/etc/seaweedfs/s3.json + environment: + WEED_LEVELDB2_ENABLED: "false" + WEED_YDB_ENABLED: "true" + WEED_YDB_DSN: "grpc://ydb:2136/?database=local" + WEED_YDB_PREFIX: "seaweedfs" + YDB_ANONYMOUS_CREDENTIALS: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 \ No newline at end of file diff --git a/go.mod b/go.mod index 09c393674..0ee247636 100644 --- a/go.mod +++ b/go.mod @@ -124,15 +124,15 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd golang.org/x/image v0.0.0-20200119044424-58c23975cae1 - golang.org/x/net v0.0.0-20220412020605-290c469a71a5 + golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 + golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023 golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/api v0.77.0 google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4 // indirect + google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 gopkg.in/inf.v0 v0.9.1 // indirect @@ -203,6 +203,12 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.2.0 // indirect github.com/tinylib/msgp v1.1.6 // indirect + github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc // indirect + github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.0 // indirect + github.com/ydb-platform/ydb-go-sdk/v3 v3.24.2 // indirect + github.com/ydb-platform/ydb-go-yc v0.6.1 // indirect + github.com/ydb-platform/ydb-go-yc-metadata v0.0.9 // indirect go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index 2899f0d23..109a3e0e1 100644 --- a/go.sum +++ b/go.sum @@ -858,6 +858,19 @@ github.com/xdg-go/scram v1.1.0 h1:d70R37I0HrDLsafRrMBXyrD4lmQbCHE873t00Vr0gm0= github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8= +github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e/go.mod h1:HEUYX/p8966tMUHHT+TsS0hF/Ca/NYwqprC5WXSDMfE= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc h1:xvTP0fhYNm+Ws+xC34jzF9EdorPUKkucJr0TyybqVSk= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc/go.mod h1:cc138nptTn9eKptCQl/grxP6pBKpo/bnXDiOxuVZtps= +github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.0 h1:74zGbvLn5kwLkkVoicJWzmvWhaIGdIWLUr1tfaMul08= +github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.0/go.mod h1:9E5eeVy08G/cu5azQVYGHduqxa6hz/dyZzJDjDTE010= +github.com/ydb-platform/ydb-go-sdk/v3 v3.9.0/go.mod h1:/KCidORSzHOsn+j46Iqb8Tf6UXNhPWRMcrMieh+i9Xw= +github.com/ydb-platform/ydb-go-sdk/v3 v3.24.2 h1:hFwtj5icsW5s3+sDXkmEa+rZN6ez9f4FeIOv2AYBrMk= +github.com/ydb-platform/ydb-go-sdk/v3 v3.24.2/go.mod h1:/KCidORSzHOsn+j46Iqb8Tf6UXNhPWRMcrMieh+i9Xw= +github.com/ydb-platform/ydb-go-yc v0.6.1 h1:DBw32JwTOsfFGnMlwri2f7C2joYvFnLNYEQAopID/Qg= +github.com/ydb-platform/ydb-go-yc v0.6.1/go.mod h1:iMalotfQEHibqPDNkwn0oT2UC5ieS5j6teIuGgPzaSE= +github.com/ydb-platform/ydb-go-yc-metadata v0.0.9 h1:jymJK3FVUphDa5q6oyjLODXLc34AR+aFTMiybacMLy0= +github.com/ydb-platform/ydb-go-yc-metadata v0.0.9/go.mod h1:L7zqxXrf3DXY2CWd9T9JBiPVpdooGBxJr4CPtWiMLCg= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1037,6 +1050,8 @@ golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220401154927-543a649e0bdd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220412020605-290c469a71a5 h1:bRb386wvrE+oBNdF1d/Xh9mQrfQ4ecYhW5qJ5GvTGT4= golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1167,6 +1182,8 @@ golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba h1:AyHWHCBVlIYI5rgEM3o+1PLd0sLPcIAoaUckGQMaWtw= +golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1406,6 +1423,8 @@ google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX google.golang.org/genproto v0.0.0-20220413183235-5e96e2839df9/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4 h1:myaecH64R0bIEDjNORIel4iXubqzaHU1K2z8ajBwWcM= google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= +google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 h1:hrbNEivu7Zn1pxvHk6MBrq9iE22woVILTHqexqBxe6I= +google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1432,6 +1451,7 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8= diff --git a/weed/command/imports.go b/weed/command/imports.go index 5b3195907..04079b162 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -32,4 +32,5 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" ) diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 0a505bbdc..595fb2e62 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -295,6 +295,19 @@ password="" # skip tls cert validation insecure_skip_verify = true +[ydb] # https://ydb.tech/ +enabled = false +dsn = "grpc://localhost:2136?database=/local" +prefix = "seaweedfs" +useBucketPrefix = true # Fast Bucket Deletion +poolSizeLimit = 50 +dialTimeOut = 10 + +# Authenticate produced with one of next environment variables: +# YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS= — used service account key file by path +# YDB_ANONYMOUS_CREDENTIALS="1" — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation +# YDB_METADATA_CREDENTIALS="1" — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function +# YDB_ACCESS_TOKEN_CREDENTIALS= — used for authenticate to YDB with short-life access token. For example, access token may be IAM token ########################## ########################## diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 4bf9b16fa..13268b944 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 03b016c76..aaf1c196c 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -18,7 +18,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value) if err == nil { @@ -53,7 +53,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b return nil, fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr) err = row.Scan(&value) @@ -76,7 +76,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr) if err != nil { @@ -92,7 +92,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er } -func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) { +func GenDirAndName(key []byte) (dirStr string, dirHash int64, name string) { for len(key) < 8 { key = append(key, 0) } diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index 9fd1fffb3..13d14b2b0 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -157,7 +157,7 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } model := &Model{ @@ -196,7 +196,7 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } model := &Model{ diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fb61b0771..d8c094a45 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -100,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 825cbb365..1912bcd6d 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = weed_util.MaybeGzipData(meta) } diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index a092ee456..260945b33 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -7,6 +7,8 @@ import ( "io" ) +const CountEntryChunksForGzip = 50 + var ( ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index e0d878ca7..c5d6eb48c 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index 73d757e62..6abb37f99 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -86,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 966686ed9..d68493bd7 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -88,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index e448f0093..d21515bd4 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -177,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index c12354ad6..83686bfe7 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 0cdf58d7f..89684647b 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index deccf8922..7a34092a0 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index f04ee493d..10a87e2a4 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -40,7 +40,7 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go index 9674ac03f..0ab0f2f24 100644 --- a/weed/filer/redis_lua/universal_redis_store.go +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -53,7 +53,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/ydb/doc.go b/weed/filer/ydb/doc.go new file mode 100644 index 000000000..6ade3a8d8 --- /dev/null +++ b/weed/filer/ydb/doc.go @@ -0,0 +1,9 @@ +/* + +Package ydb is for YDB filer store. + +The referenced "github.com/ydb-platform/ydb-go-sdk/v3" library is too big when compiled. +So this is only compiled in "make full_install". + +*/ +package ydb diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md new file mode 100644 index 000000000..b617461fd --- /dev/null +++ b/weed/filer/ydb/readme.md @@ -0,0 +1,27 @@ +## YDB + +database: https://github.com/ydb-platform/ydb + +go driver: https://github.com/ydb-platform/ydb-go-sdk + +options: + +``` +[ydb] +enabled=true +dsn=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db +prefix="seaweedfs" +useBucketPrefix=true +poolSizeLimit=50 +dialTimeOut = 10 +``` + +Authenticate produced with one of next environment variables: + * `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=` — used service account key file by path + * `YDB_ANONYMOUS_CREDENTIALS="1"` — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation + * `YDB_METADATA_CREDENTIALS="1"` — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function + * `YDB_ACCESS_TOKEN_CREDENTIALS=` — used for authenticate to YDB with short-life access token. For example, access token may be IAM token + * `YDB_CONNECTION_STRING="grpcs://endpoint/?database=database"` + + * i test using this dev database: +`make dev_ydb` diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go new file mode 100644 index 000000000..c8876e004 --- /dev/null +++ b/weed/filer/ydb/ydb_queries.go @@ -0,0 +1,85 @@ +//go:build ydb +// +build ydb + +package ydb + +import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + +const ( + insertQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $name AS Utf8; + DECLARE $meta AS String; + DECLARE $expire_at AS Optional; + + UPSERT INTO ` + asql.DEFAULT_TABLE + ` + (dir_hash, name, directory, meta, expire_at) + VALUES + ($dir_hash, $name, $directory, $meta, $expire_at);` + + updateQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $name AS Utf8; + DECLARE $meta AS String; + DECLARE $expire_at AS Optional; + + REPLACE INTO ` + asql.DEFAULT_TABLE + ` + (dir_hash, name, directory, meta, expire_at) + VALUES + ($dir_hash, $name, $directory, $meta, $expire_at);` + + deleteQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $name AS Utf8; + + DELETE FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND name = $name;` + + findQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $name AS Utf8; + + SELECT meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND name = $name;` + + deleteFolderChildrenQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + + DELETE FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory;` + + listDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS Uint64; + + SELECT name, meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix + ORDER BY name ASC LIMIT $limit;` + + listInclusiveDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS Uint64; + + SELECT name, meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix + ORDER BY name ASC LIMIT $limit;` +) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go new file mode 100644 index 000000000..5b0e4e764 --- /dev/null +++ b/weed/filer/ydb/ydb_store.go @@ -0,0 +1,403 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/sugar" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "os" + "path" + "strings" + "sync" + "time" +) + +const ( + defaultDialTimeOut = 10 +) + +var ( + roTX = table.TxControl( + table.BeginTx(table.WithOnlineReadOnly()), + table.CommitTx(), + ) + rwTX = table.DefaultTxControl() +) + +type YdbStore struct { + DB ydb.Connection + dirBuckets string + tablePathPrefix string + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex +} + +func init() { + filer.Stores = append(filer.Stores, &YdbStore{}) +} + +func (store *YdbStore) GetName() string { + return "ydb" +} + +func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString("filer.options.buckets_folder"), + configuration.GetString(prefix+"dsn"), + configuration.GetString(prefix+"prefix"), + configuration.GetBool(prefix+"useBucketPrefix"), + configuration.GetInt(prefix+"dialTimeOut"), + configuration.GetInt(prefix+"poolSizeLimit"), + ) +} + +func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) { + store.dirBuckets = dirBuckets + store.SupportBucketTable = useBucketPrefix + if store.SupportBucketTable { + glog.V(0).Infof("enabled BucketPrefix") + } + store.dbs = make(map[string]bool) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if dialTimeOut == 0 { + dialTimeOut = defaultDialTimeOut + } + opts := []ydb.Option{ + ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second), + environ.WithEnvironCredentials(ctx), + } + if poolSizeLimit > 0 { + opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit)) + } + if dsn == "" { + dsn = os.Getenv("YDB_CONNECTION_STRING") + } + store.DB, err = ydb.Open(ctx, dsn, opts...) + if err != nil || store.DB == nil { + if store.DB != nil { + _ = store.DB.Close(ctx) + store.DB = nil + } + return fmt.Errorf("can not connect to %s error: %v", dsn, err) + } + + store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) + if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { + return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) + } + + if err = store.createTable(ctx, store.tablePathPrefix); err != nil { + glog.Errorf("createTable %s: %v", store.tablePathPrefix, err) + } + return err +} + +func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { + var res result.Result + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) + if err != nil { + return fmt.Errorf("execute transaction: %v", err) + } + } else { + err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, res, err = s.Execute(ctx, tc, *query, + params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) + if err != nil { + return fmt.Errorf("execute statement: %v", err) + } + return nil + }) + } + if err != nil { + return err + } + if res != nil { + defer func() { _ = res.Close() }() + if processResultFunc != nil { + if err = processResultFunc(res); err != nil { + return fmt.Errorf("process result: %v", err) + } + } + } + return err +} + +func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, isUpdate bool) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} + var query *string + if isUpdate { + query = withPragma(tablePathPrefix, updateQuery) + } else { + query = withPragma(tablePathPrefix, insertQuery) + } + return store.doTxOrDB(ctx, query, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) +} + +func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, false) +} + +func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, true) +} + +func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + var data []byte + entryFound := false + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, findQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$name", types.UTF8Value(name))) + + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { + for res.NextResultSet(ctx) { + for res.NextRow() { + if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("scanNamed %s : %v", fullpath, err) + } + entryFound = true + return nil + } + } + return res.Err() + }) + if err != nil { + return nil, err + } + if !entryFound { + return nil, filer_pb.ErrNotFound + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + return nil, fmt.Errorf("decode %s : %v", fullpath, err) + } + + return entry, nil +} + +func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$name", types.UTF8Value(name))) + + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) +} + +func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + dir, _ := fullpath.DirAndName() + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteFolderChildrenQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir))) + + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) +} + +func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) +} + +func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + dir := string(dirPath) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + var query *string + if includeStartFile { + query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery) + } else { + query = withPragma(tablePathPrefix, listDirectoryQuery) + } + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir)), + table.ValueParam("$start_name", types.UTF8Value(startFileName)), + table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), + table.ValueParam("$limit", types.Uint64Value(uint64(limit))), + ) + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { + var name string + var data []byte + for res.NextResultSet(ctx) { + for res.NextRow() { + if err := res.ScanNamed( + named.OptionalWithDefault("name", &name), + named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("list scanNamed %s : %v", dir, err) + } + lastFileName = name + entry := &filer.Entry{ + FullPath: util.NewFullPath(dir, name), + } + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + if !eachEntryFunc(entry) { + break + } + } + } + return res.Err() + }) + if err != nil { + return lastFileName, err + } + return lastFileName, nil +} + +func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + session, err := store.DB.Table().CreateSession(ctx) + if err != nil { + return ctx, err + } + tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite())) + if err != nil { + return ctx, err + } + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *YdbStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + _, err := tx.CommitTx(ctx) + return err + } + return nil +} + +func (store *YdbStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + return tx.Rollback(ctx) + } + return nil +} + +func (store *YdbStore) Shutdown() { + _ = store.DB.Close(context.Background()) +} + +func (store *YdbStore) CanDropWholeBucket() bool { + return store.SupportBucketTable +} + +func (store *YdbStore) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.createTable(context.Background(), + path.Join(store.tablePathPrefix, bucket)); err != nil { + glog.Errorf("createTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} + +func (store *YdbStore) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.deleteTable(context.Background(), + path.Join(store.tablePathPrefix, bucket)); err != nil { + glog.Errorf("deleteTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + +func (store *YdbStore) createTable(ctx context.Context, prefix string) error { + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...) + }) +} + +func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { + if !store.SupportBucketTable { + return nil + } + if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) + }); err != nil { + return err + } + glog.V(4).Infof("deleted table %s", prefix) + + return nil +} + +func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) { + tablePathPrefix = &store.tablePathPrefix + shortDir = dir + if !store.SupportBucketTable { + return + } + + prefixBuckets := store.dirBuckets + "/" + if strings.HasPrefix(*dir, prefixBuckets) { + // detect bucket + bucketAndDir := (*dir)[len(prefixBuckets):] + var bucket string + if t := strings.Index(bucketAndDir, "/"); t > 0 { + bucket = bucketAndDir[:t] + } else if t < 0 { + bucket = bucketAndDir + } + if bucket == "" { + return + } + + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket) + if _, found := store.dbs[bucket]; !found { + if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil { + store.dbs[bucket] = true + glog.V(4).Infof("created table %s", tablePathPrefixWithBucket) + } else { + glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err) + } + } + tablePathPrefix = &tablePathPrefixWithBucket + } + return +} diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go new file mode 100644 index 000000000..d64597764 --- /dev/null +++ b/weed/filer/ydb/ydb_store_kv.go @@ -0,0 +1,79 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "context" + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" +) + +func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + fileMeta := FileMeta{dirHash, name, dirStr, value} + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), + fileMeta.queryParameters(0), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) + if err != nil { + return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + return nil + }) +} + +func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + valueFound := false + err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name))), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) + if err != nil { + return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + defer func() { _ = res.Close() }() + for res.NextResultSet(ctx) { + for res.NextRow() { + if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil { + return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) + } + valueFound = true + return nil + } + } + return res.Err() + }) + + if !valueFound { + return nil, filer.ErrKvNotFound + } + + return value, nil +} + +func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name))), + options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) + if err != nil { + return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + return nil + }) + +} diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go new file mode 100644 index 000000000..d256eaf50 --- /dev/null +++ b/weed/filer/ydb/ydb_types.go @@ -0,0 +1,60 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" +) + +//go:generate ydbgen + +//ydb:gen +type FileMeta struct { + DirHash int64 `ydb:"type:int64"` + Name string `ydb:"type:utf8"` + Directory string `ydb:"type:utf8"` + Meta []byte `ydb:"type:string"` +} + +//ydb:gen scan,value +type FileMetas []FileMeta + +func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters { + var expireAtValue types.Value + if ttlSec > 0 { + expireAtValue = types.Uint32Value(uint32(ttlSec)) + } else { + expireAtValue = types.NullValue(types.TypeUint32) + } + return table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), + table.ValueParam("$directory", types.UTF8Value(fm.Directory)), + table.ValueParam("$name", types.UTF8Value(fm.Name)), + table.ValueParam("$meta", types.StringValue(fm.Meta)), + table.ValueParam("$expire_at", expireAtValue)) +} + +func createTableOptions() []options.CreateTableOption { + columnUnit := options.TimeToLiveUnitSeconds + return []options.CreateTableOption{ + options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), + options.WithColumn("directory", types.Optional(types.TypeUTF8)), + options.WithColumn("name", types.Optional(types.TypeUTF8)), + options.WithColumn("meta", types.Optional(types.TypeString)), + options.WithColumn("expire_at", types.Optional(types.TypeUint32)), + options.WithPrimaryKeyColumn("dir_hash", "name"), + options.WithTimeToLiveSettings(options.TimeToLiveSettings{ + ColumnName: "expire_at", + ColumnUnit: &columnUnit, + Mode: options.TimeToLiveModeValueSinceUnixEpoch}, + ), + } +} +func withPragma(prefix *string, query string) *string { + queryWithPragma := fmt.Sprintf(query, *prefix) + return &queryWithPragma +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 29baf46ea..82b15084d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -38,6 +38,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"