2020-03-24 09:18:13 +00:00
package shell
import (
2021-06-25 06:56:24 +00:00
"bufio"
2022-09-10 22:29:17 +00:00
"bytes"
2020-03-24 09:18:13 +00:00
"context"
2022-09-10 22:29:17 +00:00
"errors"
2020-03-25 05:38:33 +00:00
"flag"
2020-03-24 09:18:13 +00:00
"fmt"
2022-07-29 07:17:28 +00:00
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
2022-09-10 22:29:17 +00:00
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
2022-07-29 07:17:28 +00:00
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
2022-03-31 16:36:10 +00:00
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
2020-03-24 09:18:13 +00:00
)
func init ( ) {
Commands = append ( Commands , & commandVolumeFsck { } )
}
type commandVolumeFsck struct {
2022-04-25 17:59:46 +00:00
env * CommandEnv
2022-04-25 18:10:01 +00:00
forcePurging * bool
2020-03-24 09:18:13 +00:00
}
// Name returns the command name, "volume.fsck".
func (c *commandVolumeFsck) Name() string {
	const commandName = "volume.fsck"
	return commandName
}
// Help returns the usage text of the command. The text describes the two
// modes: the default mode reports file ids present on volumes but not in the
// filer (set A minus set B), and the -findMissingChunksInFiler mode reports
// the reverse (set B minus set A).
func ( c * commandVolumeFsck ) Help ( ) string {
return ` check all volumes to find entries not used by the filer
Important assumption ! ! !
the system is all used by one filer .
This command works this way :
1. collect all file ids from all volumes , as set A
2. collect all file ids from the filer , as set B
3. find out the set A subtract B
2021-06-25 06:56:24 +00:00
If - findMissingChunksInFiler is enabled , this works
in a reverse way :
1. collect all file ids from all volumes , as set A
2. collect all file ids from the filer , as set B
3. find out the set B subtract A
2020-03-24 09:18:13 +00:00
`
}
func ( c * commandVolumeFsck ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2020-03-25 05:38:33 +00:00
fsckCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
verbose := fsckCommand . Bool ( "v" , false , "verbose mode" )
2021-06-28 06:32:57 +00:00
findMissingChunksInFiler := fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
findMissingChunksInFilerPath := fsckCommand . String ( "findMissingChunksInFilerPath" , "/" , "used together with findMissingChunksInFiler" )
2022-03-31 14:10:06 +00:00
findMissingChunksInVolumeId := fsckCommand . Int ( "findMissingChunksInVolumeId" , 0 , "used together with findMissingChunksInFiler" )
2022-05-15 18:07:04 +00:00
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup." )
2022-04-25 18:11:56 +00:00
c . forcePurging = fsckCommand . Bool ( "forcePurging" , false , "delete missing data from volumes in one replica used together with applyPurging" )
2022-02-12 23:53:35 +00:00
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
2022-03-31 14:10:06 +00:00
tempPath := fsckCommand . String ( "tempPath" , path . Join ( os . TempDir ( ) ) , "path for temporary idx files" )
2022-09-10 22:29:17 +00:00
cutoffTimeAgo := fsckCommand . Duration ( "cutoffTimeAgo" , 5 * time . Minute , "only include entries on volume servers before this cutoff time to check orphan chunks" )
2022-03-31 14:10:06 +00:00
2020-03-25 05:38:33 +00:00
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
}
2021-12-10 21:24:38 +00:00
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
2021-09-14 05:13:34 +00:00
return
}
2020-03-24 09:18:13 +00:00
c . env = commandEnv
// create a temp folder
2022-03-31 14:10:06 +00:00
tempFolder , err := os . MkdirTemp ( * tempPath , "sw_fsck" )
2020-03-24 09:18:13 +00:00
if err != nil {
return fmt . Errorf ( "failed to create temp folder: %v" , err )
}
2020-03-25 05:38:33 +00:00
if * verbose {
fmt . Fprintf ( writer , "working directory: %s\n" , tempFolder )
}
defer os . RemoveAll ( tempFolder )
2020-03-24 09:18:13 +00:00
2020-03-25 07:56:47 +00:00
// collect all volume id locations
2022-03-31 14:10:06 +00:00
dataNodeVolumeIdToVInfo , err := c . collectVolumeIds ( commandEnv , * verbose , writer )
2020-03-25 07:56:47 +00:00
if err != nil {
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
}
2022-03-31 08:35:58 +00:00
isBucketsPath := false
var fillerBucketsPath string
2022-03-31 14:10:06 +00:00
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "/" {
2022-03-31 08:35:58 +00:00
fillerBucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
if strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath ) {
isBucketsPath = true
}
}
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
2022-03-31 16:36:10 +00:00
collectMtime := time . Now ( ) . Unix ( )
2020-03-24 09:18:13 +00:00
// collect each volume file ids
2022-03-31 14:10:06 +00:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
if * findMissingChunksInVolumeId > 0 && uint32 ( * findMissingChunksInVolumeId ) != volumeId {
delete ( volumeIdToVInfo , volumeId )
continue
}
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
delete ( volumeIdToVInfo , volumeId )
continue
}
2022-09-10 22:29:17 +00:00
cutoffFrom := time . Now ( ) . Add ( - * cutoffTimeAgo ) . UnixNano ( )
err = c . collectOneVolumeFileIds ( tempFolder , dataNodeId , volumeId , vinfo , * verbose , writer , uint64 ( cutoffFrom ) )
2022-03-31 14:10:06 +00:00
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
}
2020-03-24 09:18:13 +00:00
}
}
2021-06-25 06:56:24 +00:00
if * findMissingChunksInFiler {
// collect all filer file ids and paths
2022-03-31 16:36:10 +00:00
if err = c . collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo , tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent , collectMtime ) ; err != nil {
2021-06-25 06:56:24 +00:00
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
}
2022-03-31 14:10:06 +00:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , dataNodeId , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
}
2021-06-25 06:56:24 +00:00
}
} else {
// collect all filer file ids
2022-03-31 14:10:06 +00:00
if err = c . collectFilerFileIds ( dataNodeVolumeIdToVInfo , tempFolder , writer , * verbose ) ; err != nil {
2021-06-25 06:56:24 +00:00
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
}
2022-01-21 05:38:34 +00:00
// volume file ids subtract filer file ids
2022-03-31 14:10:06 +00:00
if err = c . findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo , tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
2021-06-25 06:56:24 +00:00
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
}
}
return nil
}
2022-03-31 16:36:10 +00:00
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool , collectMtime int64 ) error {
2021-06-25 06:56:24 +00:00
if verbose {
fmt . Fprintf ( writer , "checking each file from filer ...\n" )
2020-03-24 09:18:13 +00:00
}
2021-06-25 06:56:24 +00:00
files := make ( map [ uint32 ] * os . File )
2022-03-31 14:10:06 +00:00
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
2022-04-01 05:17:09 +00:00
if _ , ok := files [ vid ] ; ok {
continue
}
2022-03-31 14:10:06 +00:00
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
2022-04-01 05:17:09 +00:00
files [ vid ] = dst
2021-06-25 06:56:24 +00:00
}
}
defer func ( ) {
for _ , f := range files {
f . Close ( )
}
} ( )
type Item struct {
vid uint32
fileKey uint64
cookie uint32
path util . FullPath
}
2022-01-21 20:08:58 +00:00
return doTraverseBfsAndSaving ( c . env , nil , filerPath , false , func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
2021-06-28 06:32:57 +00:00
if verbose && entry . Entry . IsDirectory {
fmt . Fprintf ( writer , "checking directory %s\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) )
}
2022-03-08 08:22:43 +00:00
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . Chunks , 0 , math . MaxInt64 )
2021-06-25 06:56:24 +00:00
if resolveErr != nil {
return nil
}
2022-03-08 08:22:43 +00:00
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
2022-03-31 16:36:10 +00:00
if chunk . Mtime > collectMtime {
continue
}
2021-06-25 06:56:24 +00:00
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
cookie : chunk . Fid . Cookie ,
path : util . NewFullPath ( entry . Dir , entry . Entry . Name ) ,
}
}
return nil
2022-01-21 20:08:58 +00:00
} , func ( outputChan chan interface { } ) {
buffer := make ( [ ] byte , 16 )
for item := range outputChan {
i := item . ( * Item )
if f , ok := files [ i . vid ] ; ok {
util . Uint64toBytes ( buffer , i . fileKey )
util . Uint32toBytes ( buffer [ 8 : ] , i . cookie )
util . Uint32toBytes ( buffer [ 12 : ] , uint32 ( len ( i . path ) ) )
f . Write ( buffer )
f . Write ( [ ] byte ( i . path ) )
// fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
} else {
fmt . Fprintf ( writer , "%d,%x%08x %s volume not found\n" , i . vid , i . fileKey , i . cookie , i . path )
2022-02-06 22:46:52 +00:00
if purgeAbsent {
fmt . Printf ( "deleting path %s after volume not found" , i . path )
2022-02-06 22:22:04 +00:00
c . httpDelete ( i . path , verbose )
}
2022-01-21 20:08:58 +00:00
}
}
2021-06-25 06:56:24 +00:00
} )
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , dataNodeId string , writer io . Writer , verbose bool , applyPurging bool ) error {
2021-06-25 00:22:53 +00:00
2021-06-25 06:56:24 +00:00
for volumeId , vinfo := range volumeIdToVInfo {
2022-03-31 14:10:06 +00:00
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , dataNodeId , volumeId , writer , verbose , applyPurging )
2021-06-25 06:56:24 +00:00
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
}
return nil
2021-06-25 00:22:53 +00:00
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
2022-01-21 05:38:34 +00:00
2020-03-25 05:38:33 +00:00
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
2022-04-01 09:45:41 +00:00
volumeIdOrphanFileIds := make ( map [ uint32 ] map [ string ] bool )
isSeveralReplicas := make ( map [ uint32 ] bool )
isEcVolumeReplicas := make ( map [ uint32 ] bool )
isReadOnlyReplicas := make ( map [ uint32 ] bool )
serverReplicas := make ( map [ uint32 ] [ ] pb . ServerAddress )
2022-03-31 14:10:06 +00:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , dataNodeId , volumeId , writer , verbose )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
2020-10-22 06:48:07 +00:00
}
2022-04-01 09:45:41 +00:00
isSeveralReplicas [ volumeId ] = false
if _ , found := volumeIdOrphanFileIds [ volumeId ] ; ! found {
volumeIdOrphanFileIds [ volumeId ] = make ( map [ string ] bool )
} else {
isSeveralReplicas [ volumeId ] = true
}
for _ , fid := range orphanFileIds {
if isSeveralReplicas [ volumeId ] {
if _ , found := volumeIdOrphanFileIds [ volumeId ] [ fid ] ; ! found {
continue
}
}
volumeIdOrphanFileIds [ volumeId ] [ fid ] = isSeveralReplicas [ volumeId ]
}
2022-03-31 14:10:06 +00:00
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
2020-10-22 06:48:07 +00:00
2022-02-06 22:22:04 +00:00
if verbose {
2022-03-31 14:10:06 +00:00
for _ , fid := range orphanFileIds {
fmt . Fprintf ( writer , "%s\n" , fid )
}
2020-03-25 09:41:22 +00:00
}
2022-04-01 09:45:41 +00:00
isEcVolumeReplicas [ volumeId ] = vinfo . isEcVolume
if isReadOnly , found := isReadOnlyReplicas [ volumeId ] ; ! ( found && isReadOnly ) {
isReadOnlyReplicas [ volumeId ] = vinfo . isReadOnly
}
serverReplicas [ volumeId ] = append ( serverReplicas [ volumeId ] , vinfo . server )
}
2022-01-21 05:38:34 +00:00
2022-04-01 09:45:41 +00:00
for volumeId , orphanReplicaFileIds := range volumeIdOrphanFileIds {
if ! ( applyPurging && len ( orphanReplicaFileIds ) > 0 ) {
continue
}
orphanFileIds := [ ] string { }
for fid , foundInAllReplicas := range orphanReplicaFileIds {
2022-04-25 18:10:01 +00:00
if ! isSeveralReplicas [ volumeId ] || * c . forcePurging || ( isSeveralReplicas [ volumeId ] && foundInAllReplicas ) {
2022-04-01 09:45:41 +00:00
orphanFileIds = append ( orphanFileIds , fid )
2021-07-12 18:22:00 +00:00
}
2022-04-01 09:45:41 +00:00
}
if ! ( len ( orphanFileIds ) > 0 ) {
continue
}
if verbose {
2022-04-25 17:59:46 +00:00
fmt . Fprintf ( writer , "purging process for volume %d.\n" , volumeId )
2022-04-01 09:45:41 +00:00
}
2022-01-21 05:38:34 +00:00
2022-04-01 09:45:41 +00:00
if isEcVolumeReplicas [ volumeId ] {
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
continue
}
for _ , server := range serverReplicas [ volumeId ] {
2022-03-31 14:10:06 +00:00
needleVID := needle . VolumeId ( volumeId )
2020-03-24 09:18:13 +00:00
2022-04-01 09:45:41 +00:00
if isReadOnlyReplicas [ volumeId ] {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , true )
2022-03-31 14:10:06 +00:00
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
}
2022-01-21 05:38:34 +00:00
2022-04-01 09:45:41 +00:00
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , false )
2022-02-06 22:22:04 +00:00
2022-04-01 09:45:41 +00:00
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , server )
}
2022-03-31 14:10:06 +00:00
if verbose {
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
}
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds , writer ) ; err != nil {
return fmt . Errorf ( "purging volume %d: %v" , volumeId , err )
}
2020-03-25 09:21:15 +00:00
}
}
2020-03-24 09:18:13 +00:00
}
2022-02-06 22:22:04 +00:00
if ! applyPurging {
2020-03-25 09:21:15 +00:00
pct := float64 ( totalOrphanChunkCount * 100 ) / ( float64 ( totalOrphanChunkCount + totalInUseCount ) )
fmt . Fprintf ( writer , "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
totalOrphanChunkCount + totalInUseCount , totalOrphanChunkCount , pct , totalOrphanDataSize )
2020-03-25 05:38:33 +00:00
2020-03-25 09:21:15 +00:00
fmt . Fprintf ( writer , "This could be normal if multiple filers or no filers are used.\n" )
2020-03-25 05:38:33 +00:00
}
2022-01-21 05:38:34 +00:00
if totalOrphanChunkCount == 0 {
fmt . Fprintf ( writer , "no orphan data\n" )
//return nil
}
2020-03-24 09:18:13 +00:00
return nil
}
2022-09-10 22:29:17 +00:00
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer , cutoffFrom uint64 ) error {
2020-03-25 07:56:47 +00:00
if verbose {
fmt . Fprintf ( writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
}
2020-03-24 09:18:13 +00:00
2021-12-26 08:15:03 +00:00
return operation . WithVolumeServerClient ( false , vinfo . server , c . env . option . GrpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2020-03-24 09:18:13 +00:00
2020-04-02 05:10:09 +00:00
ext := ".idx"
if vinfo . isEcVolume {
ext = ".ecx"
}
2020-03-24 09:18:13 +00:00
copyFileClient , err := volumeServerClient . CopyFile ( context . Background ( ) , & volume_server_pb . CopyFileRequest {
VolumeId : volumeId ,
2020-04-02 05:10:09 +00:00
Ext : ext ,
2020-03-24 09:18:13 +00:00
CompactionRevision : math . MaxUint32 ,
StopOffset : math . MaxInt64 ,
Collection : vinfo . collection ,
IsEcVolume : vinfo . isEcVolume ,
IgnoreSourceFileNotFound : false ,
} )
if err != nil {
2020-11-27 11:17:10 +00:00
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
2020-03-24 09:18:13 +00:00
}
2022-09-10 22:29:17 +00:00
var buf bytes . Buffer
for {
resp , err := copyFileClient . Recv ( )
if errors . Is ( err , io . EOF ) {
break
}
if err != nil {
return err
}
buf . Write ( resp . FileContent )
}
if vinfo . isReadOnly == false {
index , err := idx . FirstInvalidIndex ( buf . Bytes ( ) , func ( key types . NeedleId , offset types . Offset , size types . Size ) ( bool , error ) {
resp , err := volumeServerClient . ReadNeedleMeta ( context . Background ( ) , & volume_server_pb . ReadNeedleMetaRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( key ) ,
Offset : offset . ToActualOffset ( ) ,
Size : int32 ( size ) ,
} )
if err != nil {
return false , fmt . Errorf ( "to read needle meta with id %d from volume %d with error %v" , key , volumeId , err )
}
return resp . LastModified <= cutoffFrom , nil
} )
if err != nil {
fmt . Fprintf ( writer , "Failed to search for last vilad index on volume %d with error %v" , volumeId , err )
}
buf . Truncate ( index * types . NeedleMapEntrySize )
}
idxFilename := getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId )
err = writeToFile ( buf . Bytes ( ) , idxFilename )
2020-03-24 09:18:13 +00:00
if err != nil {
2020-11-27 11:17:10 +00:00
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
2020-03-24 09:18:13 +00:00
}
return nil
} )
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) collectFilerFileIds ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
2020-03-25 07:56:47 +00:00
if verbose {
fmt . Fprintf ( writer , "collecting file ids from filer ...\n" )
}
2020-03-24 09:18:13 +00:00
files := make ( map [ uint32 ] * os . File )
2022-03-31 14:10:06 +00:00
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
files [ vid ] = dst
2020-03-24 09:18:13 +00:00
}
}
defer func ( ) {
for _ , f := range files {
f . Close ( )
}
} ( )
type Item struct {
vid uint32
fileKey uint64
}
2022-01-21 20:08:58 +00:00
return doTraverseBfsAndSaving ( c . env , nil , "/" , false , func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
2022-03-08 08:22:43 +00:00
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . Chunks , 0 , math . MaxInt64 )
2020-07-21 05:02:05 +00:00
if resolveErr != nil {
2021-08-13 09:57:14 +00:00
if verbose {
fmt . Fprintf ( writer , "resolving manifest chunks in %s: %v\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) , resolveErr )
}
2020-07-21 05:02:05 +00:00
return nil
}
2022-03-08 08:22:43 +00:00
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
2020-03-24 09:18:13 +00:00
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
}
}
return nil
2022-01-21 20:08:58 +00:00
} , func ( outputChan chan interface { } ) {
buffer := make ( [ ] byte , 8 )
for item := range outputChan {
i := item . ( * Item )
util . Uint64toBytes ( buffer , i . fileKey )
files [ i . vid ] . Write ( buffer )
}
2020-03-24 09:18:13 +00:00
} )
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
2021-06-25 06:56:24 +00:00
2021-06-28 06:32:57 +00:00
if verbose {
2022-03-31 16:36:10 +00:00
fmt . Fprintf ( writer , "find missing file chunks in dataNodeId %s volume %d ...\n" , dataNodeId , volumeId )
2021-06-28 06:32:57 +00:00
}
2021-06-25 06:56:24 +00:00
db := needle_map . NewMemDb ( )
defer db . Close ( )
2022-03-31 14:10:06 +00:00
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
2021-06-25 06:56:24 +00:00
return
}
file := getFilerFileIdFile ( tempFolder , volumeId )
fp , err := os . Open ( file )
if err != nil {
return
}
defer fp . Close ( )
type Item struct {
fileKey uint64
cookie uint32
path util . FullPath
}
br := bufio . NewReader ( fp )
buffer := make ( [ ] byte , 16 )
item := & Item { }
var readSize int
for {
2021-06-28 06:32:57 +00:00
readSize , err = io . ReadFull ( br , buffer )
2021-06-25 06:56:24 +00:00
if err != nil || readSize != 16 {
2022-01-21 05:38:34 +00:00
break
2021-06-25 06:56:24 +00:00
}
item . fileKey = util . BytesToUint64 ( buffer [ : 8 ] )
item . cookie = util . BytesToUint32 ( buffer [ 8 : 12 ] )
pathSize := util . BytesToUint32 ( buffer [ 12 : 16 ] )
pathBytes := make ( [ ] byte , int ( pathSize ) )
2021-06-28 06:32:57 +00:00
n , err := io . ReadFull ( br , pathBytes )
if err != nil {
fmt . Fprintf ( writer , "%d,%x%08x in unexpected error: %v\n" , volumeId , item . fileKey , item . cookie , err )
}
if n != int ( pathSize ) {
fmt . Fprintf ( writer , "%d,%x%08x %d unexpected file name size %d\n" , volumeId , item . fileKey , item . cookie , pathSize , n )
}
2021-06-25 06:56:24 +00:00
item . path = util . FullPath ( string ( pathBytes ) )
2022-01-21 05:38:34 +00:00
needleId := types . NeedleId ( item . fileKey )
if _ , found := db . Get ( needleId ) ; ! found {
fmt . Fprintf ( writer , "%s\n" , item . path )
2022-02-06 22:22:04 +00:00
if applyPurging {
2022-01-21 05:38:34 +00:00
// defining the URL this way automatically escapes complex path names
2022-02-06 22:22:04 +00:00
c . httpDelete ( item . path , verbose )
}
2021-06-25 06:56:24 +00:00
}
2022-02-06 22:22:04 +00:00
}
return nil
}
2021-06-25 06:56:24 +00:00
2022-02-06 22:22:04 +00:00
func ( c * commandVolumeFsck ) httpDelete ( path util . FullPath , verbose bool ) {
req , err := http . NewRequest ( http . MethodDelete , "" , nil )
2021-06-25 06:56:24 +00:00
2022-02-06 22:22:04 +00:00
req . URL = & url . URL {
Scheme : "http" ,
Host : c . env . option . FilerAddress . ToHttpAddress ( ) ,
Path : string ( path ) ,
2021-06-25 06:56:24 +00:00
}
2022-02-06 22:22:04 +00:00
if verbose {
fmt . Printf ( "full HTTP delete request to be sent: %v\n" , req )
}
if err != nil {
fmt . Errorf ( "HTTP delete request error: %v\n" , err )
}
2022-01-21 05:38:34 +00:00
2022-02-06 22:22:04 +00:00
client := & http . Client { }
2021-06-25 06:56:24 +00:00
2022-02-06 22:22:04 +00:00
resp , err := client . Do ( req )
if err != nil {
fmt . Errorf ( "DELETE fetch error: %v\n" , err )
}
defer resp . Body . Close ( )
2021-06-25 06:56:24 +00:00
2022-02-06 22:22:04 +00:00
_ , err = ioutil . ReadAll ( resp . Body )
if err != nil {
fmt . Errorf ( "DELETE response error: %v\n" , err )
}
2021-06-25 06:56:24 +00:00
2022-02-06 22:22:04 +00:00
if verbose {
fmt . Println ( "delete response Status : " , resp . Status )
fmt . Println ( "delete response Headers : " , resp . Header )
2022-01-21 05:38:34 +00:00
}
2021-06-25 06:56:24 +00:00
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
2020-03-24 09:18:13 +00:00
db := needle_map . NewMemDb ( )
defer db . Close ( )
2022-03-31 14:10:06 +00:00
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
2020-03-24 09:18:13 +00:00
return
}
2021-10-14 04:27:58 +00:00
filerFileIdsData , err := os . ReadFile ( getFilerFileIdFile ( tempFolder , volumeId ) )
2020-03-24 09:18:13 +00:00
if err != nil {
return
}
dataLen := len ( filerFileIdsData )
if dataLen % 8 != 0 {
2020-03-25 09:21:15 +00:00
return 0 , nil , 0 , fmt . Errorf ( "filer data is corrupted" )
2020-03-24 09:18:13 +00:00
}
for i := 0 ; i < len ( filerFileIdsData ) ; i += 8 {
fileKey := util . BytesToUint64 ( filerFileIdsData [ i : i + 8 ] )
db . Delete ( types . NeedleId ( fileKey ) )
2020-03-25 05:38:33 +00:00
inUseCount ++
2020-03-24 09:18:13 +00:00
}
2020-03-25 09:21:15 +00:00
var orphanFileCount uint64
2020-03-24 09:18:13 +00:00
db . AscendingVisit ( func ( n needle_map . NeedleValue ) error {
2020-03-24 09:34:28 +00:00
// fmt.Printf("%d,%x\n", volumeId, n.Key)
2021-07-11 06:16:06 +00:00
orphanFileIds = append ( orphanFileIds , fmt . Sprintf ( "%d,%s00000000" , volumeId , n . Key . String ( ) ) )
2020-03-25 09:21:15 +00:00
orphanFileCount ++
2020-03-24 09:18:13 +00:00
orphanDataSize += uint64 ( n . Size )
return nil
} )
2020-03-25 09:21:15 +00:00
if orphanFileCount > 0 {
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
2022-03-31 14:10:06 +00:00
fmt . Fprintf ( writer , "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
dataNodeId , volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
2020-03-24 09:34:28 +00:00
}
2020-03-24 09:18:13 +00:00
return
}
type VInfo struct {
2021-09-13 05:47:52 +00:00
server pb . ServerAddress
2020-03-24 09:18:13 +00:00
collection string
isEcVolume bool
2022-01-06 17:52:28 +00:00
isReadOnly bool
2020-03-24 09:18:13 +00:00
}
2022-03-31 14:10:06 +00:00
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
2020-03-25 07:56:47 +00:00
if verbose {
fmt . Fprintf ( writer , "collecting volume id and locations from master ...\n" )
}
2020-03-24 09:18:13 +00:00
2022-03-31 14:10:06 +00:00
volumeIdToServer = make ( map [ string ] map [ uint32 ] VInfo )
2021-02-22 08:28:42 +00:00
// collect topology information
2022-02-08 08:53:55 +00:00
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
2020-03-24 09:18:13 +00:00
if err != nil {
return
}
2021-02-22 08:28:42 +00:00
eachDataNode ( topologyInfo , func ( dc string , rack RackId , t * master_pb . DataNodeInfo ) {
2021-02-18 04:57:08 +00:00
for _ , diskInfo := range t . DiskInfos {
2022-03-31 14:10:06 +00:00
dataNodeId := t . GetId ( )
volumeIdToServer [ dataNodeId ] = make ( map [ uint32 ] VInfo )
2021-02-16 10:47:02 +00:00
for _ , vi := range diskInfo . VolumeInfos {
2022-03-31 14:10:06 +00:00
volumeIdToServer [ dataNodeId ] [ vi . Id ] = VInfo {
2021-09-13 05:47:52 +00:00
server : pb . NewServerAddressFromDataNode ( t ) ,
2021-02-16 10:47:02 +00:00
collection : vi . Collection ,
isEcVolume : false ,
2022-01-06 17:52:28 +00:00
isReadOnly : vi . ReadOnly ,
2021-02-16 10:47:02 +00:00
}
2020-03-24 09:18:13 +00:00
}
2021-02-16 10:47:02 +00:00
for _ , ecShardInfo := range diskInfo . EcShardInfos {
2022-03-31 14:10:06 +00:00
volumeIdToServer [ dataNodeId ] [ ecShardInfo . Id ] = VInfo {
2021-09-13 05:47:52 +00:00
server : pb . NewServerAddressFromDataNode ( t ) ,
2021-02-16 10:47:02 +00:00
collection : ecShardInfo . Collection ,
isEcVolume : true ,
2022-01-06 17:52:28 +00:00
isReadOnly : true ,
2021-02-16 10:47:02 +00:00
}
2020-03-24 09:18:13 +00:00
}
}
} )
2020-03-25 07:56:47 +00:00
if verbose {
fmt . Fprintf ( writer , "collected %d volumes and locations.\n" , len ( volumeIdToServer ) )
}
2020-03-24 09:18:13 +00:00
return
}
2020-03-25 09:21:15 +00:00
func ( c * commandVolumeFsck ) purgeFileIdsForOneVolume ( volumeId uint32 , fileIds [ ] string , writer io . Writer ) ( err error ) {
fmt . Fprintf ( writer , "purging orphan data for volume %d...\n" , volumeId )
locations , found := c . env . MasterClient . GetLocations ( volumeId )
if ! found {
return fmt . Errorf ( "failed to find volume %d locations" , volumeId )
}
resultChan := make ( chan [ ] * volume_server_pb . DeleteResult , len ( locations ) )
var wg sync . WaitGroup
for _ , location := range locations {
wg . Add ( 1 )
2021-09-13 05:47:52 +00:00
go func ( server pb . ServerAddress , fidList [ ] string ) {
2020-03-25 09:21:15 +00:00
defer wg . Done ( )
if deleteResults , deleteErr := operation . DeleteFilesAtOneVolumeServer ( server , c . env . option . GrpcDialOption , fidList , false ) ; deleteErr != nil {
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
}
2021-09-13 05:47:52 +00:00
} ( location . ServerAddress ( ) , fileIds )
2020-03-25 09:21:15 +00:00
}
wg . Wait ( )
close ( resultChan )
for results := range resultChan {
for _ , result := range results {
if result . Error != "" {
fmt . Fprintf ( writer , "purge error: %s\n" , result . Error )
}
}
}
return
}
2022-03-31 14:10:06 +00:00
// getVolumeFileIdFile returns the path, inside tempFolder, of the index file
// for volume vid as copied from data node dataNodeid:
// "<dataNodeid>_<vid>.idx".
func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
	fileName := fmt.Sprintf("%s_%d.idx", dataNodeid, vid)
	return filepath.Join(tempFolder, fileName)
}
// getFilerFileIdFile returns the path, inside tempFolder, of the file that
// holds the filer's file ids for volume vid: "<vid>.fid".
func getFilerFileIdFile(tempFolder string, vid uint32) string {
	fileName := fmt.Sprintf("%d.fid", vid)
	return filepath.Join(tempFolder, fileName)
}
2022-09-10 22:29:17 +00:00
// writeToFile writes bytes to fileName, creating the file with mode 0644 or
// truncating it when it already exists. Any failure to open, write or close
// the file is returned to the caller.
func writeToFile(bytes []byte, fileName string) error {
	return os.WriteFile(fileName, bytes, 0644)
}