diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index 63e98ebb7..923ccc20f 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -8,22 +8,24 @@ const ( DECLARE $directory AS Utf8; DECLARE $name AS Utf8; DECLARE $meta AS String; + DECLARE $expire_at AS Optional; UPSERT INTO ` + asql.DEFAULT_TABLE + ` - (dir_hash, name, directory, meta) + (dir_hash, name, directory, meta, expire_at) VALUES - ($dir_hash, $name, $directory, $meta);` + ($dir_hash, $name, $directory, $meta, $expire_at);` updateQuery = ` DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; DECLARE $name AS Utf8; DECLARE $meta AS String; + DECLARE $expire_at AS Optional; REPLACE INTO ` + asql.DEFAULT_TABLE + ` - (dir_hash, name, directory, meta) + (dir_hash, name, directory, meta, expire_at) VALUES - ($dir_hash, $name, $directory, $meta);` + ($dir_hash, $name, $directory, $meta, $expire_at);` deleteQuery = ` DECLARE $dir_hash AS int64; diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 6e62d2719..e4aeabbb9 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -145,7 +145,7 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent } queryWithPragma := withPragma(store.getPrefix(ctx, dir), query) fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} - return store.doTxOrDB(ctx, &queryWithPragma, fileMeta.queryParameters(), rwTX, nil) + return store.doTxOrDB(ctx, &queryWithPragma, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) } func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index af147cf62..d63b243e9 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -18,7 +18,7 @@ func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err fileMeta := FileMeta{dirHash, name, dirStr, value} return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { _, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery), - fileMeta.queryParameters(), + fileMeta.queryParameters(0), options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) if err != nil { return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index aab3d0f87..6b89b7c57 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -19,21 +19,35 @@ type FileMeta struct { //ydb:gen scan,value type FileMetas []FileMeta -func (fm *FileMeta) queryParameters() *table.QueryParameters { +func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters { + var expireAtValue types.Value + if ttlSec > 0 { + expireAtValue = types.Uint32Value(uint32(ttlSec)) + } else { + expireAtValue = types.NullValue(types.TypeUint32) + } return table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), table.ValueParam("$directory", types.UTF8Value(fm.Directory)), table.ValueParam("$name", types.UTF8Value(fm.Name)), - table.ValueParam("$meta", types.StringValue(fm.Meta))) + table.ValueParam("$meta", types.StringValue(fm.Meta)), + table.ValueParam("$expire_at", expireAtValue)) } func createTableOptions() []options.CreateTableOption { + columnUnit := options.TimeToLiveUnitSeconds return []options.CreateTableOption{ options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), options.WithColumn("directory", types.Optional(types.TypeUTF8)), options.WithColumn("name", types.Optional(types.TypeUTF8)), options.WithColumn("meta", types.Optional(types.TypeString)), + options.WithColumn("expire_at", types.Optional(types.TypeUint32)), options.WithPrimaryKeyColumn("dir_hash", "name"), + options.WithTimeToLiveSettings(options.TimeToLiveSettings{ + ColumnName: "expire_at", + ColumnUnit: &columnUnit, + Mode: options.TimeToLiveModeValueSinceUnixEpoch}, + ), } } func withPragma(prefix string, query string) string {