[volume.fsck] hotfix apply purging and add option verifyNeedle #3860 (#3861)

* fix apply purging and add verifyNeedle

* common readSourceNeedleBlob

* use consts
Author: Konstantin Lebedev, 2022-10-16 08:38:46 +05:00 (committed by GitHub)
Commit: 7836f7574e (parent: f476cf3403)
2 changed files with 36 additions and 9 deletions
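
The first commit-message bullet ("fix apply purging and add verifyNeedle") comes down to the new probe in oneVolumeFileIdsSubtractFilerFileIds below: with -verifyNeedles enabled, volume.fsck reads the first few bytes of every needle the filer still references, and if that read fails while -forcePurging is set, the entry is left in the per-volume index diff so the purge pass treats it like an orphan. A minimal, self-contained sketch of that flow, using stand-in types and hypothetical helper names (needleIndex, probeNeedle, subtractFilerIDs) rather than the real SeaweedFS packages; only verifyProbeBlobSize, the flag names, and the keep-for-purging behaviour come from the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins so the sketch compiles on its own; the real command uses
// needle_map.MemDb and types.NeedleId from the SeaweedFS packages.
type needleID uint64
type needleIndex map[needleID]int32 // needle id -> size recorded on the volume

const verifyProbeBlobSize = 16 // same constant the commit introduces

// probeNeedle models readSourceNeedleBlob: fetch at most probeSize bytes of the
// blob just to confirm the volume server can still serve it. A fixed set of
// "broken" ids stands in for a failing gRPC ReadNeedleBlob call.
func probeNeedle(id needleID, probeSize int32, broken map[needleID]bool) error {
	_ = probeSize // a real probe would request only the first probeSize bytes
	if broken[id] {
		return errors.New("read needle blob failed")
	}
	return nil
}

// subtractFilerIDs mirrors the fixed callback in oneVolumeFileIdsSubtractFilerFileIds:
// every needle the filer references is normally removed from the volume index, and
// whatever is left over is reported as orphaned. With verifyNeedles enabled, each
// referenced needle is probed first; if the probe fails and forcePurging is set,
// the entry is kept in the index so the purge pass can remove it as well.
func subtractFilerIDs(idx needleIndex, filerIDs []needleID, broken map[needleID]bool, verifyNeedles, forcePurging bool) {
	for _, id := range filerIDs {
		size, ok := idx[id]
		if verifyNeedles && ok && size > 0 {
			if size > verifyProbeBlobSize {
				size = verifyProbeBlobSize // clamp: only the head of the blob is read
			}
			if err := probeNeedle(id, size, broken); err != nil && forcePurging {
				continue // leave it in the index => treated as a purge candidate
			}
		}
		delete(idx, id)
	}
}

func main() {
	idx := needleIndex{1: 1024, 2: 64, 3: 32} // what the volume index reports
	broken := map[needleID]bool{2: true}      // needle 2's blob cannot be read
	subtractFilerIDs(idx, []needleID{1, 2}, broken, true, true)
	fmt.Println(idx) // map[2:64 3:32] -- 2 stays behind for purging, 3 is a true orphan
}
```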

weed/shell/command_volume_check_disk.go

@@ -10,6 +10,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
 	"golang.org/x/exp/slices"
+	"google.golang.org/grpc"
 	"io"
 	"math"
 )
@@ -166,7 +167,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
 	for _, needleValue := range missingNeedles {
-		needleBlob, err := c.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
+		needleBlob, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
 		if err != nil {
 			return hasChanges, err
 		}
@@ -190,9 +191,9 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
 	return
 }

-func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
-	err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
+	err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
 		resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
 			VolumeId: volumeId,
 			Offset:   needleValue.Offset.ToActualOffset(),
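
The hunks above are a straightforward dependency-injection refactor: readSourceNeedleBlob drops its commandVolumeCheckDisk receiver and receives the grpc.DialOption it needs as a parameter, which is what lets volume.fsck reuse it. A small self-contained sketch of the pattern (stand-in types and hypothetical names, not the actual SeaweedFS API):

```go
package main

import "fmt"

// Stand-ins for grpc.DialOption and pb.ServerAddress so the sketch compiles alone.
type dialOption string
type serverAddress string

// readBlob has the shape of readSourceNeedleBlob after the change: no receiver,
// and the dial option arrives as an argument instead of being read from c.env.
func readBlob(opt dialOption, server serverAddress, volumeId uint32, offset, size int64) ([]byte, error) {
	// The real helper opens a volume-server client with the dial option and calls
	// ReadNeedleBlob with the needle's offset and size; here we just fake a payload.
	return []byte(fmt.Sprintf("volume %d @%d+%d on %s via %s", volumeId, offset, size, server, opt)), nil
}

// Two different commands can now share the helper by passing their own option.
type checkDiskCmd struct{ opt dialOption }
type fsckCmd struct{ opt dialOption }

func (c *checkDiskCmd) verify(server serverAddress) ([]byte, error) { return readBlob(c.opt, server, 1, 0, 16) }
func (c *fsckCmd) verify(server serverAddress) ([]byte, error)      { return readBlob(c.opt, server, 2, 0, 16) }

func main() {
	a, _ := (&checkDiskCmd{opt: "insecure"}).verify("volume-a:18080")
	b, _ := (&fsckCmd{opt: "insecure"}).verify("volume-b:18080")
	fmt.Println(string(a))
	fmt.Println(string(b))
}
```

The trade-off is one extra argument at every call site in exchange for a helper that no longer cares which shell command owns it.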

weed/shell/command_volume_fsck.go

@@ -34,6 +34,11 @@ func init() {
 	Commands = append(Commands, &commandVolumeFsck{})
 }

+const (
+	readbufferSize      = 16
+	verifyProbeBlobSize = 16
+)
+
 type commandVolumeFsck struct {
 	env    *CommandEnv
 	writer io.Writer
@@ -44,6 +49,7 @@ type commandVolumeFsck struct {
 	verbose                  *bool
 	forcePurging             *bool
 	findMissingChunksInFiler *bool
+	verifyNeedle             *bool
 }

 func (c *commandVolumeFsck) Name() string {
@@ -82,6 +88,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
 	tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
 	cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
+	c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server")

 	if err = fsckCommand.Parse(args); err != nil {
 		return nil
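
For context, fsckCommand here is a standard *flag.FlagSet, so the new option follows the usual Bool-returns-*bool pattern, which is why commandVolumeFsck stores verifyNeedle as a *bool. A tiny self-contained re-creation (the -forcePurging registration is assumed; only -verifyNeedles appears in this diff), with the switches a user would pass to volume.fsck from weed shell:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// fsckCommand in the diff is a *flag.FlagSet; Bool returns a *bool that is
	// only filled in once Parse runs, hence the *bool fields on the command struct.
	fsckCommand := flag.NewFlagSet("volume.fsck", flag.ContinueOnError)
	verifyNeedle := fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server")
	forcePurging := fsckCommand.Bool("forcePurging", false, "purge unreadable or orphaned chunks (sketch of the existing flag)")

	if err := fsckCommand.Parse([]string{"-verifyNeedles", "-forcePurging"}); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(*verifyNeedle, *forcePurging) // true true
}
```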
@@ -219,7 +226,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
 			return nil
 		},
 		func(outputChan chan interface{}) {
-			buffer := make([]byte, 16)
+			buffer := make([]byte, readbufferSize)
 			for item := range outputChan {
 				i := item.(*Item)
 				if f, ok := files[i.vid]; ok {
@@ -285,7 +292,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
 			if *c.verbose {
 				for _, fid := range orphanFileIds {
-					fmt.Fprintf(c.writer, "%s\n", fid)
+					fmt.Fprintf(c.writer, "%s:%s\n", vinfo.collection, fid)
 				}
 			}
 			isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
@@ -440,7 +447,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
 	defer fp.Close()

 	br := bufio.NewReader(fp)
-	buffer := make([]byte, 16)
+	buffer := make([]byte, readbufferSize)
 	var readSize int
 	var readErr error
 	item := &Item{vid: volumeId}
@@ -452,7 +459,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
 		if readErr != nil {
 			return readErr
 		}
-		if readSize != 16 {
+		if readSize != readbufferSize {
 			return fmt.Errorf("readSize mismatch")
 		}
 		item.fileKey = util.BytesToUint64(buffer[:8])
@@ -541,11 +548,27 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
 		return
 	}

+	voluemAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0)
 	if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) {
+		inUseCount++
+		if *c.verifyNeedle {
+			if v, ok := db.Get(nId); ok && v.Size.IsValid() {
+				newSize := types.Size(verifyProbeBlobSize)
+				if v.Size > newSize {
+					v.Size = newSize
+				}
+				if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, voluemAddr, volumeId, *v); err != nil {
+					fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err)
+					if *c.forcePurging {
+						return
+					}
+				}
+			}
+		}
+
 		if err = db.Delete(nId); err != nil && *c.verbose {
 			fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err)
 		}
-		inUseCount++
 	}); err != nil {
 		err = fmt.Errorf("failed to readFilerFileIdFile %+v", err)
 		return
@@ -553,7 +576,10 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri

 	var orphanFileCount uint64
 	if err = db.AscendingVisit(func(n needle_map.NeedleValue) error {
-		orphanFileIds = append(orphanFileIds, fmt.Sprintf("%s:%d,%s00000000", collection, volumeId, n.Key.String()))
+		if !n.Size.IsValid() {
+			return nil
+		}
+		orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
 		orphanFileCount++
 		orphanDataSize += uint64(n.Size)
 		return nil
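
The final hunk stops counting delete tombstones as orphans: an entry whose Size fails IsValid is skipped before it can inflate the orphan list or be purged a second time, and the collection prefix is dropped from the stored fid (the verbose printer changed above now prepends it instead). A self-contained sketch of the skip, with a simplified Size standing in for types.Size (in the real package a deleted needle no longer carries a valid size):

```go
package main

import "fmt"

// Size is a simplified stand-in for types.Size: IsValid is what separates
// live data from tombstones left behind by deletes.
type Size int32

func (s Size) IsValid() bool { return s > 0 }

func main() {
	// What might remain in the index after all filer-referenced ids are removed:
	// two live orphan needles and one tombstone left by a delete.
	leftovers := map[uint64]Size{101: 2048, 102: -2048, 103: 512}

	var orphanFileIds []string
	var orphanCount, orphanBytes uint64
	for key, size := range leftovers {
		if !size.IsValid() { // the fix: tombstones are no longer reported or purged again
			continue
		}
		orphanFileIds = append(orphanFileIds, fmt.Sprintf("7,%x00000000", key))
		orphanCount++
		orphanBytes += uint64(size)
	}
	fmt.Println(len(orphanFileIds), orphanCount, orphanBytes) // 2 2 2560
}
```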