skip directory loop if dropping the bucket

This commit is contained in:
Chris Lu 2021-01-13 13:49:04 -08:00
parent ca73013453
commit f17aa1d06c

View file

@ -30,7 +30,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// delete the folder children, not including the folder itself // delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk var dirChunks []*filer_pb.FileChunk
var dirHardLinkIds []HardLinkId var dirHardLinkIds []HardLinkId
dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isFromOtherCluster, signatures) dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures)
if err != nil { if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err) glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err)
@ -63,46 +63,49 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil return nil
} }
func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) { func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
lastFileName := "" lastFileName := ""
includeLastFile := false includeLastFile := false
for { if !isDeletingBucket {
entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "") for {
if err != nil { entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
glog.Errorf("list folder %s: %v", entry.FullPath, err) if err != nil {
return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) glog.Errorf("list folder %s: %v", entry.FullPath, err)
} return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
if lastFileName == "" && !isRecursive && len(entries) > 0 { }
// only for first iteration in the loop if lastFileName == "" && !isRecursive && len(entries) > 0 {
glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) // only for first iteration in the loop
return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
} return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
}
for _, sub := range entries { for _, sub := range entries {
lastFileName = sub.Name() lastFileName = sub.Name()
var dirChunks []*filer_pb.FileChunk var dirChunks []*filer_pb.FileChunk
var dirHardLinkIds []HardLinkId var dirHardLinkIds []HardLinkId
if sub.IsDirectory() { if sub.IsDirectory() {
dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) subIsDeletingBucket := f.isBucket(sub)
chunks = append(chunks, dirChunks...) dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil)
hardlinkIds = append(hardlinkIds, dirHardLinkIds...) chunks = append(chunks, dirChunks...)
} else { hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
if len(sub.HardLinkId) != 0 {
// hard link chunk data are deleted separately
hardlinkIds = append(hardlinkIds, sub.HardLinkId)
} else { } else {
chunks = append(chunks, sub.Chunks...) f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
if len(sub.HardLinkId) != 0 {
// hard link chunk data are deleted separately
hardlinkIds = append(hardlinkIds, sub.HardLinkId)
} else {
chunks = append(chunks, sub.Chunks...)
}
}
if err != nil && !ignoreRecursiveError {
return nil, nil, err
} }
} }
if err != nil && !ignoreRecursiveError {
return nil, nil, err
}
}
if len(entries) < PaginationSize { if len(entries) < PaginationSize {
break break
}
} }
} }