shell: volume.fsck checks missing chunks in filer

fix https://github.com/chrislusf/seaweedfs/issues/2154
This commit is contained in:
Chris Lu 2021-06-24 23:56:24 -07:00
parent 9dd09bbb33
commit 1cac2f2278

View file

@ -1,6 +1,7 @@
package shell package shell
import ( import (
"bufio"
"context" "context"
"flag" "flag"
"fmt" "fmt"
@ -44,6 +45,12 @@ func (c *commandVolumeFsck) Help() string {
2. collect all file ids from the filer, as set B 2. collect all file ids from the filer, as set B
3. find out the set A subtract B 3. find out the set A subtract B
If -findMissingChunksInFiler is enabled, this works
in a reverse way:
1. collect all file ids from all volumes, as set A
2. collect all file ids from the filer, as set B
3. find out the set B subtract A
` `
} }
@ -55,6 +62,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := fsckCommand.Bool("v", false, "verbose mode") verbose := fsckCommand.Bool("v", false, "verbose mode")
findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see help volume.fsck")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
if err = fsckCommand.Parse(args); err != nil { if err = fsckCommand.Parse(args); err != nil {
return nil return nil
@ -86,21 +94,105 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
} }
} }
if *findMissingChunksInFiler {
// collect all filer file ids and paths
if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
}
// for each volume, check filer file ids
if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
}
} else {
// collect all filer file ids // collect all filer file ids
if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
return fmt.Errorf("failed to collect file ids from filer: %v", err) return fmt.Errorf("failed to collect file ids from filer: %v", err)
} }
// volume file ids substract filer file ids // volume file ids substract filer file ids
err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, verbose, applyPurging) if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
}
}
return err return nil
} }
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose *bool, applyPurging *bool) (error) { func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
if verbose {
fmt.Fprintf(writer, "checking each file from filer ...\n")
}
files := make(map[uint32]*os.File)
for vid := range volumeIdToServer {
dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if openErr != nil {
return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
}
files[vid] = dst
}
defer func() {
for _, f := range files {
f.Close()
}
}()
type Item struct {
vid uint32
fileKey uint64
cookie uint32
path util.FullPath
}
return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
buffer := make([]byte, 8)
for item := range outputChan {
i := item.(*Item)
if f, ok := files[i.vid]; ok {
util.Uint64toBytes(buffer, i.fileKey)
f.Write(buffer)
util.Uint32toBytes(buffer, i.cookie)
util.Uint32toBytes(buffer[4:], uint32(len(i.path)))
f.Write(buffer)
f.Write([]byte(i.path))
} else {
fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
}
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
if resolveErr != nil {
return nil
}
dChunks = append(dChunks, mChunks...)
for _, chunk := range dChunks {
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,
cookie: chunk.Fid.Cookie,
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
}
}
return nil
})
return nil
}
func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
for volumeId, vinfo := range volumeIdToVInfo {
checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
}
return nil
}
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
for volumeId, vinfo := range volumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo {
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose) inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
if checkErr != nil { if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
} }
@ -108,7 +200,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
totalOrphanChunkCount += uint64(len(orphanFileIds)) totalOrphanChunkCount += uint64(len(orphanFileIds))
totalOrphanDataSize += orphanDataSize totalOrphanDataSize += orphanDataSize
if *verbose { if verbose {
for _, fid := range orphanFileIds { for _, fid := range orphanFileIds {
fmt.Fprintf(writer, "%sxxxxxxxx\n", fid) fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
} }
@ -223,6 +315,59 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
}) })
} }
func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
db := needle_map.NewMemDb()
defer db.Close()
if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
return
}
file := getFilerFileIdFile(tempFolder, volumeId)
fp, err := os.Open(file)
if err != nil {
return
}
defer fp.Close()
type Item struct {
fileKey uint64
cookie uint32
path util.FullPath
}
br := bufio.NewReader(fp)
buffer := make([]byte, 16)
item := &Item{}
var readSize int
for {
readSize, err = br.Read(buffer)
if err != nil || readSize != 16 {
if err == io.EOF {
return nil
} else {
break
}
}
item.fileKey = util.BytesToUint64(buffer[:8])
item.cookie = util.BytesToUint32(buffer[8:12])
pathSize := util.BytesToUint32(buffer[12:16])
pathBytes := make([]byte, int(pathSize))
_, err = br.Read(pathBytes)
item.path = util.FullPath(string(pathBytes))
if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
fmt.Fprintf(writer, "%d,%x%08x in %s not found\n", volumeId, item.fileKey, item.cookie, item.path)
}
}
return
}
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
db := needle_map.NewMemDb() db := needle_map.NewMemDb()