diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go new file mode 100644 index 000000000..11cc64d78 --- /dev/null +++ b/weed/shell/command_fs_verify.go @@ -0,0 +1,177 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" + "math" + "strings" + "time" +) + +func init() { + Commands = append(Commands, &commandFsVerify{}) +} + +type commandFsVerify struct { + env *CommandEnv + volumeIds map[uint32][]pb.ServerAddress + verbose *bool + modifyTimeAgoAtSec int64 + writer io.Writer +} + +func (c *commandFsVerify) Name() string { + return "fs.verify" +} + +func (c *commandFsVerify) Help() string { + return `recursively verify all files under a directory + + fs.verify [-v] [-modifyTimeAgo 1h] /buckets/dir + +` +} + +func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + c.env = commandEnv + c.writer = writer + + fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files") + modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify") + + if err = fsVerifyCommand.Parse(args); err != nil { + return err + } + + path, parseErr := commandEnv.parseUrl(findInputDirectory(fsVerifyCommand.Args())) + if parseErr != nil { + return parseErr + } + + c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds()) + + if err := c.collectVolumeIds(); err != nil { + return parseErr + } + + fCount, eConut, terr := c.verifyTraverseBfs(path) + + if terr == nil { + fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut) + } + + return terr + +} + +func (c *commandFsVerify) collectVolumeIds() error { + c.volumeIds = make(map[uint32][]pb.ServerAddress) + topologyInfo, _, err := collectTopologyInfo(c.env, 0) + if err != nil { + return err + } + eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) { + for _, diskInfo := range nodeInfo.DiskInfos { + for _, vi := range diskInfo.VolumeInfos { + c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], pb.NewServerAddressFromDataNode(nodeInfo)) + } + } + }) + return nil +} + +func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb.ServerAddress) error { + err := operation.WithVolumeServerClient(false, *volumeServer, c.env.option.GrpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeNeedleStatus(context.Background(), + &volume_server_pb.VolumeNeedleStatusRequest{ + VolumeId: fileId.VolumeId, + NeedleId: fileId.FileKey}) + return err + }, + ) + if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) { + return err + } + if *c.verbose { + fmt.Fprintf(c.writer, ".") + } + return nil +} + +type ItemEntry struct { + chunks []*filer_pb.FileChunk + path util.FullPath +} + +func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) { + timeNowAtSec := time.Now().Unix() + return fileCount, errCount, doTraverseBfsAndSaving(c.env, nil, path, false, + func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + if c.modifyTimeAgoAtSec > 0 { + if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime { + return nil + } + } + dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64) + if resolveErr != nil { + return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr) + } + dataChunks = append(dataChunks, manifestChunks...) + if len(dataChunks) > 0 { + outputChan <- &ItemEntry{ + chunks: dataChunks, + path: util.NewFullPath(entry.Dir, entry.Entry.Name), + } + } + return nil + }, + func(outputChan chan interface{}) { + for itemEntry := range outputChan { + i := itemEntry.(*ItemEntry) + fileMsg := fmt.Sprintf("file:%s needle status ", i.path) + if *c.verbose { + fmt.Fprintf(c.writer, fileMsg) + fileMsg = "" + } + for _, chunk := range i.chunks { + if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok { + for _, volumeServer := range volumeIds { + if err = c.verifyEntry(chunk.Fid, &volumeServer); err != nil { + fmt.Fprintf(c.writer, "%sfailed verify %d:%d: %+v\n", + fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + break + } + } + } else { + err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) + fmt.Fprintf(c.writer, "%sfailed verify chunk %d:%d: %+v\n", + fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + break + } + } + + if err != nil { + errCount++ + continue + } + + if *c.verbose { + fmt.Fprintf(c.writer, " verifed\n") + } + fileCount++ + } + }) + +}