filer: fix mysql, postgres batch delete error

This commit is contained in:
Chris Lu 2021-07-22 08:23:20 -07:00
parent 84d91f143f
commit 182288f860
17 changed files with 26 additions and 35 deletions

View file

@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"strings" "strings"
"sync" "sync"
"time"
) )
type SqlGenerator interface { type SqlGenerator interface {
@ -262,7 +261,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu
return nil return nil
} }
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
if err != nil { if err != nil {
@ -280,23 +279,16 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
} }
} }
for {
glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath), limit) res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
if err != nil { if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
} }
rowCount, err := res.RowsAffected() _, err = res.RowsAffected()
if err != nil { if err != nil {
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
} }
if rowCount < limit {
break
}
// to give the Galera Cluster a chance to breath
time.Sleep(time.Second)
}
return nil return nil
} }

View file

@ -161,7 +161,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
return nil return nil
} }
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
return nil // filer.ErrUnsupportedSuperLargeDirectoryListing return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
} }

View file

@ -186,7 +186,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
return fmt.Errorf("delete entry %v.", err) return fmt.Errorf("delete entry %v.", err)
} }
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
_, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool { _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
if err := store.DeleteEntry(ctx, entry.FullPath); err != nil { if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
glog.Errorf("elastic delete %s: %v.", entry.FullPath, err) glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)

View file

@ -130,7 +130,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.Full
return nil return nil
} }
func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "") directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {

View file

@ -19,7 +19,6 @@ import (
const ( const (
LogFlushInterval = time.Minute LogFlushInterval = time.Minute
PaginationSize = 1024 PaginationSize = 1024
DeleteMaxRows = 10000
FilerStoreId = "filer.store.id" FilerStoreId = "filer.store.id"
) )

View file

@ -115,7 +115,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath, DeleteMaxRows); storeDeletionErr != nil { if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
} }

View file

@ -25,7 +25,7 @@ type FilerStore interface {
// err == filer_pb.ErrNotFound if not found // err == filer_pb.ErrNotFound if not found
FindEntry(context.Context, util.FullPath) (entry *Entry, err error) FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, util.FullPath) (err error) DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath, int64) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)

View file

@ -100,10 +100,10 @@ func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEn
return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath) return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath)
} }
func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath, limit int64) (err error) { func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
newFullPath := t.translatePath(fp) newFullPath := t.translatePath(fp)
return t.actualStore.DeleteFolderChildren(ctx, newFullPath, limit) return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
} }
func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {

View file

@ -213,7 +213,7 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
return actualStore.DeleteEntry(ctx, existingEntry.FullPath) return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
} }
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath, limit int64) (err error) { func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp + "/") actualStore := fsw.getActualStore(fp + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now() start := time.Now()
@ -222,7 +222,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
}() }()
glog.V(4).Infof("DeleteFolderChildren %s", fp) glog.V(4).Infof("DeleteFolderChildren %s", fp)
return actualStore.DeleteFolderChildren(ctx, fp, limit) return actualStore.DeleteFolderChildren(ctx, fp)
} }
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {

View file

@ -109,7 +109,7 @@ func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (e
return store.doDelete(ctx, store.cfMetaDir, []byte(path)) return store.doDelete(ctx, store.cfMetaDir, []byte(path))
} }
func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath, limit int64) (err error) { func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(path.Child("")) expectedPrefix := []byte(path.Child(""))

View file

@ -136,7 +136,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
return nil return nil
} }
func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
batch := new(leveldb.Batch) batch := new(leveldb.Batch)

View file

@ -144,7 +144,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.
return nil return nil
} }
func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
batch := new(leveldb.Batch) batch := new(leveldb.Batch)

View file

@ -245,7 +245,7 @@ func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.
return nil return nil
} }
func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
db, bucket, shortPath, err := store.findDB(fullpath, true) db, bucket, shortPath, err := store.findDB(fullpath, true)
if err != nil { if err != nil {

View file

@ -167,7 +167,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa
return nil return nil
} }
func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
where := bson.M{"directory": fullpath} where := bson.M{"directory": fullpath}
_, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)

View file

@ -107,7 +107,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
return nil return nil
} }
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) (err error) { func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result() members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil { if err != nil {

View file

@ -127,7 +127,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
return nil return nil
} }
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) (err error) { func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) { if store.isSuperLargeDirectory(string(fullpath)) {
return nil return nil

View file

@ -148,7 +148,7 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
return nil return nil
} }
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "") directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
batch := gorocksdb.NewWriteBatch() batch := gorocksdb.NewWriteBatch()