hbase add ttl

This commit is contained in:
Chris Lu 2020-12-24 12:10:35 -08:00
parent 75c6edba9e
commit 0e016bc7bd
2 changed files with 13 additions and 5 deletions

View file

@ -79,7 +79,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er
value = util.MaybeGzipData(value) value = util.MaybeGzipData(value)
} }
return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value) return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
} }
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {

View file

@ -4,13 +4,14 @@ import (
"context" "context"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/hrpc"
"time"
) )
const( const(
COLUMN_NAME = "a" COLUMN_NAME = "a"
) )
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return store.doPut(ctx, store.cfKv, key, value) return store.doPut(ctx, store.cfKv, key, value, 0)
} }
func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
@ -21,10 +22,17 @@ func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) {
return store.doDelete(ctx, store.cfKv, key) return store.doDelete(ctx, store.cfKv, key)
} }
func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte) (err error) { func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) {
if ttlSecond > 0 {
return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second))
}
return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal))
}
func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) {
values := map[string]map[string][]byte{cf: map[string][]byte{}} values := map[string]map[string][]byte{cf: map[string][]byte{}}
values[cf][COLUMN_NAME] = value values[cf][COLUMN_NAME] = value
putRequest, err := hrpc.NewPut(ctx, store.table, key, values) putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...)
if err != nil { if err != nil {
return err return err
} }
@ -55,7 +63,7 @@ func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (valu
func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) {
values := map[string]map[string][]byte{cf: map[string][]byte{}} values := map[string]map[string][]byte{cf: map[string][]byte{}}
values[cf][COLUMN_NAME] = nil values[cf][COLUMN_NAME] = nil
deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values) deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal))
if err != nil { if err != nil {
return err return err
} }