[bugfix] filer: In file modification, old chunks will be mis-deleted when they are merged(Manifestized).

This commit is contained in:
banjiaojuhao 2022-03-07 15:14:24 +08:00
parent 3aeee3d748
commit d61bea9038

View file

@ -129,6 +129,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
} }
} }
func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
}
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil { if oldEntry == nil {
@ -141,14 +147,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
var toDelete []*filer_pb.FileChunk var toDelete []*filer_pb.FileChunk
newChunkIds := make(map[string]bool) newChunkIds := make(map[string]bool)
for _, newChunk := range newEntry.Chunks { newDChunks, newMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
newEntry.Chunks, 0, int64(newEntry.Size()))
if err != nil {
glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s",
newEntry.Chunks, oldEntry.Chunks)
return
}
for _, newChunk := range newDChunks {
newChunkIds[newChunk.GetFileIdString()] = true
}
for _, newChunk := range newMChunks {
newChunkIds[newChunk.GetFileIdString()] = true newChunkIds[newChunk.GetFileIdString()] = true
} }
for _, oldChunk := range oldEntry.Chunks { oldDChunks, oldMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
oldEntry.Chunks, 0, int64(oldEntry.Size()))
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
newEntry.Chunks, oldEntry.Chunks)
return
}
for _, oldChunk := range oldDChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk) toDelete = append(toDelete, oldChunk)
} }
} }
f.DeleteChunks(toDelete) for _, oldChunk := range oldMChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
f.DeleteChunksNotRecursive(toDelete)
} }