mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
directly delete file chunks
keeping current async deletions for now
This commit is contained in:
parent
781585b195
commit
290b5e2cd0
|
@ -47,7 +47,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
|
|||
}
|
||||
|
||||
if shouldDeleteChunks && !isCollection {
|
||||
go f.DeleteChunks(chunks)
|
||||
f.DirectDeleteChunks(chunks)
|
||||
}
|
||||
// A case not handled:
|
||||
// what if the chunk is in a different collection?
|
||||
|
|
|
@ -68,6 +68,50 @@ func (f *Filer) loopProcessingDeletion() {
|
|||
}
|
||||
}
|
||||
|
||||
func (f *Filer) doDeleteFileIds(fileIds []string) {
|
||||
|
||||
lookupFunc := LookupByMasterClientFn(f.MasterClient)
|
||||
DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
|
||||
|
||||
for len(fileIds) > 0 {
|
||||
var toDeleteFileIds []string
|
||||
if len(fileIds) > DeletionBatchSize {
|
||||
toDeleteFileIds = fileIds[:DeletionBatchSize]
|
||||
fileIds = fileIds[DeletionBatchSize:]
|
||||
} else {
|
||||
toDeleteFileIds = fileIds
|
||||
fileIds = fileIds[:0]
|
||||
}
|
||||
deletionCount := len(toDeleteFileIds)
|
||||
_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "already deleted") {
|
||||
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
|
||||
var fildIdsToDelete []string
|
||||
for _, chunk := range chunks {
|
||||
if !chunk.IsChunkManifest {
|
||||
fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString())
|
||||
continue
|
||||
}
|
||||
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
|
||||
if manifestResolveErr != nil {
|
||||
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
|
||||
}
|
||||
for _, dChunk := range dataChunks {
|
||||
fildIdsToDelete = append(fildIdsToDelete, dChunk.GetFileIdString())
|
||||
}
|
||||
fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString())
|
||||
}
|
||||
|
||||
f.doDeleteFileIds(fildIdsToDelete)
|
||||
}
|
||||
|
||||
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
|
||||
for _, chunk := range chunks {
|
||||
if !chunk.IsChunkManifest {
|
||||
|
|
Loading…
Reference in a new issue