banjiaojuhao 2022-03-08 16:22:43 +08:00
parent d61bea9038
commit f7f2a597dd
3 changed files with 18 additions and 17 deletions

@@ -68,12 +68,12 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
 		manifestChunks = append(manifestChunks, chunk)
 		// recursive
-		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
+		dataChunks, manifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
 		if subErr != nil {
 			return chunks, nil, subErr
 		}
-		dataChunks = append(dataChunks, dchunks...)
-		manifestChunks = append(manifestChunks, mchunks...)
+		dataChunks = append(dataChunks, dataChunks...)
+		manifestChunks = append(manifestChunks, manifestChunks...)
 	}
 	return
 }
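For readers skimming the hunk above: ResolveChunkManifest walks nested chunk manifests recursively, collecting plain data chunks and the manifest chunks themselves into two separate slices. The toy sketch below models only that accumulation pattern; the chunk type, the lookup function, and the offset filtering from the real file are replaced with stand-ins, so treat it as an illustration rather than the repository's code.

package main

import "fmt"

// chunk is a stand-in for filer_pb.FileChunk: a manifest chunk points at further
// chunks. In the real code those children come from a file-id lookup.
type chunk struct {
	id         string
	isManifest bool
	children   []*chunk
}

// resolve mirrors the accumulation pattern in the hunk above: data chunks and
// manifest chunks are gathered into separate slices, recursing into each manifest.
func resolve(chunks []*chunk) (dataChunks, manifestChunks []*chunk) {
	for _, c := range chunks {
		if !c.isManifest {
			dataChunks = append(dataChunks, c)
			continue
		}
		manifestChunks = append(manifestChunks, c)
		// recursive call: distinct names keep the append targets unambiguous
		subData, subManifest := resolve(c.children)
		dataChunks = append(dataChunks, subData...)
		manifestChunks = append(manifestChunks, subManifest...)
	}
	return
}

func main() {
	root := []*chunk{
		{id: "d1"},
		{id: "m1", isManifest: true, children: []*chunk{{id: "d2"}, {id: "d3"}}},
	}
	data, manifests := resolve(root)
	fmt.Println(len(data), len(manifests)) // prints: 3 1
}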

@@ -1,6 +1,7 @@
 package filer
 import (
+	"math"
 	"strings"
 	"time"
@@ -147,33 +148,33 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
 	var toDelete []*filer_pb.FileChunk
 	newChunkIds := make(map[string]bool)
-	newDChunks, newMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
-		newEntry.Chunks, 0, int64(newEntry.Size()))
+	newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+		newEntry.Chunks, 0, math.MaxInt64)
 	if err != nil {
 		glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s",
 			newEntry.Chunks, oldEntry.Chunks)
 		return
 	}
-	for _, newChunk := range newDChunks {
+	for _, newChunk := range newDataChunks {
 		newChunkIds[newChunk.GetFileIdString()] = true
 	}
-	for _, newChunk := range newMChunks {
+	for _, newChunk := range newManifestChunks {
 		newChunkIds[newChunk.GetFileIdString()] = true
 	}
-	oldDChunks, oldMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
-		oldEntry.Chunks, 0, int64(oldEntry.Size()))
+	oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+		oldEntry.Chunks, 0, math.MaxInt64)
	if err != nil {
 		glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
 			newEntry.Chunks, oldEntry.Chunks)
 		return
 	}
-	for _, oldChunk := range oldDChunks {
+	for _, oldChunk := range oldDataChunks {
 		if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
 			toDelete = append(toDelete, oldChunk)
 		}
 	}
-	for _, oldChunk := range oldMChunks {
+	for _, oldChunk := range oldManifestChunks {
 		if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
 			toDelete = append(toDelete, oldChunk)
 		}
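The hunk above also changes the stop offset passed to ResolveChunkManifest from the entry size to math.MaxInt64, so every chunk of the old and new entry is resolved when computing the deletion set, wherever a chunk's offset falls. The sketch below only illustrates the kind of range-overlap filter a resolver applies against [startOffset, stopOffset); the exact check in the repository may differ, so treat the predicate as an assumption.

package main

import (
	"fmt"
	"math"
)

// overlaps reports whether a chunk covering [offset, offset+size) intersects the
// requested range [startOffset, stopOffset). This is an assumed stand-in for the
// filter a chunk resolver applies, shown only to illustrate the effect of MaxInt64.
func overlaps(offset, size, startOffset, stopOffset int64) bool {
	lo := offset
	if startOffset > lo {
		lo = startOffset
	}
	hi := offset + size
	if stopOffset < hi {
		hi = stopOffset
	}
	return lo < hi
}

func main() {
	// A chunk placed beyond the entry's reported size is skipped when the stop
	// offset is that size, but included when the stop offset is math.MaxInt64.
	fmt.Println(overlaps(4096, 1024, 0, 2048))          // false
	fmt.Println(overlaps(4096, 1024, 0, math.MaxInt64)) // true
}

With the full range, presumably no referenced chunk is left out of newChunkIds, which would otherwise risk deleting a chunk the new entry still points to.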

@@ -153,12 +153,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
 		if verbose && entry.Entry.IsDirectory {
 			fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
 		}
-		dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
+		dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
 		if resolveErr != nil {
 			return nil
 		}
-		dChunks = append(dChunks, mChunks...)
-		for _, chunk := range dChunks {
+		dataChunks = append(dataChunks, manifestChunks...)
+		for _, chunk := range dataChunks {
 			outputChan <- &Item{
 				vid:     chunk.Fid.VolumeId,
 				fileKey: chunk.Fid.FileKey,
@@ -332,15 +332,15 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
 		fileKey uint64
 	}
 	return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
-		dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
+		dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
 		if resolveErr != nil {
 			if verbose {
 				fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
 			}
 			return nil
 		}
-		dChunks = append(dChunks, mChunks...)
-		for _, chunk := range dChunks {
+		dataChunks = append(dataChunks, manifestChunks...)
+		for _, chunk := range dataChunks {
 			outputChan <- &Item{
 				vid:     chunk.Fid.VolumeId,
 				fileKey: chunk.Fid.FileKey,
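The two volume-fsck hunks are pure renames: the existing pattern of folding the manifest chunks into the data-chunk slice and emitting one Item per chunk on the output channel is unchanged. Below is a self-contained toy version of that emit pattern, with a stand-in item type instead of the command's real Item struct.

package main

import "fmt"

// item is a stand-in for the Item records emitted in the hunks above.
type item struct {
	vid     uint32
	fileKey uint64
}

// emitAll treats manifest chunks like data chunks and sends one item per chunk,
// mirroring the append-then-range pattern in the diff.
func emitAll(dataChunks, manifestChunks []item, out chan<- item) {
	all := append(dataChunks, manifestChunks...)
	for _, c := range all {
		out <- c
	}
}

func main() {
	out := make(chan item, 3)
	emitAll([]item{{vid: 1, fileKey: 10}, {vid: 1, fileKey: 11}}, []item{{vid: 2, fileKey: 20}}, out)
	close(out)
	for it := range out {
		fmt.Println(it.vid, it.fileKey)
	}
}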