// Package tikv provides a filer store implementation that keeps entries in
// TiKV through its transactional client.
package tikv

import (
	"bytes"
	"context"
	"crypto/sha1"
	"fmt"
	"io"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv"
)

// init registers the TiKV store in the list of available filer stores.
func init() {
	filer.Stores = append(filer.Stores, &TikvStore{})
}

// TikvStore is a filer store that persists entries through a TiKV
// transactional client.
type TikvStore struct {
	client *tikv.KVStore
}

// Basic APIs

// GetName returns the store name, "tikv".
func (store *TikvStore) GetName() string {
	return "tikv"
}

// Initialize reads the comma-separated "pdaddrs" setting (looked up under
// prefix), trims whitespace around every address, and connects to TiKV.
func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
	rawAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",")
	pdAddrs := make([]string, 0, len(rawAddrs))
	for _, addr := range rawAddrs {
		pdAddrs = append(pdAddrs, strings.TrimSpace(addr))
	}
	return store.initialize(pdAddrs)
}

// initialize creates the transactional client for the given PD addresses and
// stores it on the receiver, returning any error from client creation.
func (store *TikvStore) initialize(pdAddrs []string) error {
	client, err := tikv.NewTxnClient(pdAddrs)
	store.client = client
	return err
}

// Shutdown closes the TiKV client and logs any error from closing it. It is a
// no-op when no client was ever created (for example after a failed
// Initialize).
func (store *TikvStore) Shutdown() {
	if store.client == nil {
		return
	}
	if err := store.client.Close(); err != nil {
		glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
	}
}

// ~ Basic APIs

// Entry APIs

// InsertEntry encodes the entry's attributes and chunks and writes them under
// the key derived from the entry's directory and name.
func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	dir, name := entry.DirAndName()
	key := generateKey(dir, name)

	value, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
	}

	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Set(key, value)
	})
	if err != nil {
		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
	}
	return nil
}

// UpdateEntry overwrites the stored entry; it is identical to InsertEntry.
func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
	return store.InsertEntry(ctx, entry)
}

// FindEntry loads and decodes the entry stored for path.
//
// It returns filer_pb.ErrNotFound when the key does not exist or holds no
// value. Any other lookup failure is returned as a wrapped error rather than
// being reported as "not found".
func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
	dir, name := path.DirAndName()
	key := generateKey(dir, name)

	txn, err := store.getTxn(ctx)
	if err != nil {
		return nil, err
	}

	var value []byte
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		val, err := txn.Get(context.TODO(), key)
		if err == nil {
			value = val
		}
		return err
	})

	// Classify the error before looking at value: value is also nil when the
	// lookup itself failed, and such failures must not be masked as ErrNotFound.
	if isNotExists(err) {
		return nil, filer_pb.ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("get %s : %v", path, err)
	}
	if value == nil {
		return nil, filer_pb.ErrNotFound
	}

	entry := &filer.Entry{
		FullPath: path,
	}
	if err := entry.DecodeAttributesAndChunks(value); err != nil {
		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
	}
	return entry, nil
}

// DeleteEntry removes the key stored for path.
func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
	dir, name := path.DirAndName()
	key := generateKey(dir, name)

	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Delete(key)
	})
	if err != nil {
		return fmt.Errorf("delete %s : %v", path, err)
	}
	return nil
}

// ~ Entry APIs

// Directory APIs

// DeleteFolderChildren deletes every key that starts with the hashed
// directory prefix of path.
func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
	directoryPrefix := genDirectoryKeyPrefix(path, "")

	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		iter, err := txn.Iter(directoryPrefix, nil)
		if err != nil {
			return err
		}
		defer iter.Close()

		for iter.Valid() {
			key := iter.Key()
			// The iterator has no upper bound, so stop at the first key
			// outside this directory.
			if !bytes.HasPrefix(key, directoryPrefix) {
				break
			}
			if err := txn.Delete(key); err != nil {
				return err
			}
			if err := iter.Next(); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("delete %s : %v", path, err)
	}
	return nil
}

// ListDirectoryEntries lists the entries of dirPath without a name prefix
// filter; see ListDirectoryPrefixedEntries.
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}

// ListDirectoryPrefixedEntries iterates over the entries of dirPath whose
// names start with prefix, beginning at startFileName when it is non-empty,
// and calls eachEntryFunc for every decoded entry.
//
// The start file itself is skipped unless includeStartFile is true. A limit
// greater than zero caps the number of keys visited. Iteration stops early
// when eachEntryFunc returns false. The returned string is the name of the
// last entry that was reached.
//
// A value that cannot be decoded aborts the listing and the decode error is
// returned to the caller.
func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	lastFileName := ""
	directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
	lastFileStart := directoryPrefix
	if startFileName != "" {
		lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
	}

	txn, err := store.getTxn(ctx)
	if err != nil {
		return lastFileName, err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		iter, err := txn.Iter(lastFileStart, nil)
		if err != nil {
			return err
		}
		defer iter.Close()

		visited := int64(0)
		first := true
		for iter.Valid() {
			if first {
				first = false
				// Only the very first key can be the start file; skip it
				// when the caller asked to exclude it.
				if !includeStartFile && bytes.Equal(iter.Key(), lastFileStart) {
					if err := iter.Next(); err != nil {
						return err
					}
					continue
				}
			}

			// Check for limitation
			if limit > 0 {
				visited++
				if visited > limit {
					break
				}
			}

			// Validate key prefix
			key := iter.Key()
			if !bytes.HasPrefix(key, directoryPrefix) {
				break
			}
			value := iter.Value()

			fileName := getNameFromKey(key)
			if fileName != "" {
				entry := &filer.Entry{
					FullPath: util.NewFullPath(string(dirPath), fileName),
				}
				lastFileName = fileName

				if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
					// Report the decode failure itself; the iterator error
					// variable is nil here and returning it would hide the
					// problem behind a successful, truncated listing.
					glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
					return decodeErr
				}

				// Stop iterating as soon as the callback asks for it.
				if !eachEntryFunc(entry) {
					break
				}
			}

			if err := iter.Next(); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
	}
	return lastFileName, nil
}

// ~ Directory APIs

// Transaction Related APIs

// BeginTransaction starts a TiKV transaction and returns a context carrying
// it under the "tx" key.
func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	tx, err := store.client.Begin()
	if err != nil {
		return ctx, err
	}
	return context.WithValue(ctx, "tx", tx), nil
}

// CommitTransaction commits the transaction carried by ctx, if there is one.
func (store *TikvStore) CommitTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return tx.Commit(context.Background())
	}
	return nil
}

// RollbackTransaction rolls back the transaction carried by ctx, if there is
// one.
func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return tx.Rollback()
	}
	return nil
}

// ~ Transaction Related APIs

// Transaction Wrapper

// TxnWrapper pairs a transaction with a flag telling whether it was taken
// from the caller's context (inContext) or created for a single operation.
type TxnWrapper struct {
	*txnkv.KVTxn
	inContext bool
}

// RunInTxn runs f against the wrapped transaction.
//
// For a transaction taken from the caller's context, the result of f is
// returned unchanged and the transaction is left open. Otherwise the
// transaction is rolled back when f fails, or committed when f succeeds; a
// commit failure is returned to the caller so a lost write is never reported
// as success.
func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
	err := f(w.KVTxn)
	if w.inContext {
		return err
	}
	if err != nil {
		// The error from f is what the caller needs to see; a rollback
		// failure is only logged.
		if rollbackErr := w.KVTxn.Rollback(); rollbackErr != nil {
			glog.V(0).Infof("rollback TiKV transaction: %v", rollbackErr)
		}
		return err
	}
	return w.KVTxn.Commit(context.Background())
}

// getTxn returns the transaction carried by ctx when present, otherwise it
// begins a new one that RunInTxn will commit or roll back on its own.
func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return &TxnWrapper{KVTxn: tx, inContext: true}, nil
	}
	txn, err := store.client.Begin()
	if err != nil {
		return nil, err
	}
	return &TxnWrapper{KVTxn: txn, inContext: false}, nil
}

// ~ Transaction Wrapper

// Encoding Functions

// hashToBytes returns the SHA-1 digest of dir.
func hashToBytes(dir string) []byte {
	h := sha1.New()
	// hash.Hash documents that Write never returns an error.
	_, _ = io.WriteString(h, dir)
	return h.Sum(nil)
}

// generateKey builds a storage key: the SHA-1 digest of dirPath followed by
// the raw bytes of fileName.
func generateKey(dirPath, fileName string) []byte {
	key := hashToBytes(dirPath)
	key = append(key, fileName...)
	return key
}

// getNameFromKey returns the file name part of a key, i.e. everything after
// the leading SHA-1 digest. It returns "" for a key that is too short to
// contain a name.
func getNameFromKey(key []byte) string {
	if len(key) <= sha1.Size {
		return ""
	}
	return string(key[sha1.Size:])
}

// genDirectoryKeyPrefix returns the SHA-1 digest of fullpath, followed by
// startFileName when it is non-empty.
func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
	keyPrefix = hashToBytes(string(fullpath))
	if len(startFileName) > 0 {
		keyPrefix = append(keyPrefix, startFileName...)
	}
	return keyPrefix
}

// isNotExists reports whether err is non-nil and its message is exactly
// "not exist".
func isNotExists(err error) bool {
	return err != nil && err.Error() == "not exist"
}

// ~ Encoding Functions