Change DeleteFolderChildren to DeleteRange api

This commit is contained in:
yulai.li 2021-08-26 17:49:56 +08:00
parent de8ef28460
commit 318757ef8c
2 changed files with 24 additions and 7 deletions

View file

@ -82,7 +82,6 @@ func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
func validateOneEnabledStore(config *util.ViperProxy) { func validateOneEnabledStore(config *util.ViperProxy) {
enabledStore := "" enabledStore := ""
for _, store := range Stores { for _, store := range Stores {
glog.V(0).Infof("Store Engines: %v", store.GetName())
if config.GetBool(store.GetName() + ".enabled") { if config.GetBool(store.GetName() + ".enabled") {
if enabledStore == "" { if enabledStore == "" {
enabledStore = store.GetName() enabledStore = store.GetName()

View file

@ -21,7 +21,8 @@ func init() {
} }
type TikvStore struct { type TikvStore struct {
client *tikv.KVStore client *tikv.KVStore
deleteRangeConcurrency int
} }
// Basic APIs // Basic APIs
@ -35,6 +36,7 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err
for _, item := range strings.Split(pdAddrsStr, ",") { for _, item := range strings.Split(pdAddrsStr, ",") {
pdAddrs = append(pdAddrs, strings.TrimSpace(item)) pdAddrs = append(pdAddrs, strings.TrimSpace(item))
} }
store.deleteRangeConcurrency = 1
return store.initialize(pdAddrs) return store.initialize(pdAddrs)
} }
@ -142,7 +144,10 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full
if err != nil { if err != nil {
return err return err
} }
var (
startKey []byte = nil
endKey []byte = nil
)
err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
iter, err := txn.Iter(directoryPrefix, nil) iter, err := txn.Iter(directoryPrefix, nil)
if err != nil { if err != nil {
@ -151,24 +156,37 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full
defer iter.Close() defer iter.Close()
for iter.Valid() { for iter.Valid() {
key := iter.Key() key := iter.Key()
endKey = key
if !bytes.HasPrefix(key, directoryPrefix) { if !bytes.HasPrefix(key, directoryPrefix) {
break break
} }
err = txn.Delete(key) if startKey == nil {
if err != nil { startKey = key
return err
} }
err = iter.Next() err = iter.Next()
if err != nil { if err != nil {
return err return err
} }
} }
// Only one Key matched just delete it.
if startKey != nil && bytes.Equal(startKey, endKey) {
return txn.Delete(startKey)
}
return nil return nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("delete %s : %v", path, err) return fmt.Errorf("delete %s : %v", path, err)
} }
return nil
if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
// has startKey and endKey and they are not equals, so use delete range
_, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
if err != nil {
return fmt.Errorf("delete %s : %v", path, err)
}
}
return err
} }
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {