mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
[volume.fsck] check needles status from volume server (#3926)
check needles status from volume server
This commit is contained in:
parent
bd459db5fb
commit
c0deaa4948
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -35,8 +37,7 @@ func init() {
|
|||
}
|
||||
|
||||
const (
|
||||
readbufferSize = 16
|
||||
verifyProbeBlobSize = 16
|
||||
readbufferSize = 16
|
||||
)
|
||||
|
||||
type commandVolumeFsck struct {
|
||||
|
@ -88,7 +89,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
|
|||
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
|
||||
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
|
||||
cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
|
||||
c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server")
|
||||
c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server")
|
||||
|
||||
if err = fsckCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
|
@ -263,7 +264,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
|
|||
serverReplicas := make(map[uint32][]pb.ServerAddress)
|
||||
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
|
||||
for volumeId, vinfo := range volumeIdToVInfo {
|
||||
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, vinfo.collection)
|
||||
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo)
|
||||
if checkErr != nil {
|
||||
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
|
||||
}
|
||||
|
@ -533,36 +534,35 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, collection string) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
|
||||
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
|
||||
|
||||
db := needle_map.NewMemDb()
|
||||
defer db.Close()
|
||||
volumeFileIdDb := needle_map.NewMemDb()
|
||||
defer volumeFileIdDb.Close()
|
||||
|
||||
if err = db.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
|
||||
if err = volumeFileIdDb.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
|
||||
err = fmt.Errorf("failed to LoadFromIdx %+v", err)
|
||||
return
|
||||
}
|
||||
|
||||
volumeAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0)
|
||||
if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) {
|
||||
if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) {
|
||||
inUseCount++
|
||||
if *c.verifyNeedle {
|
||||
if v, ok := db.Get(nId); ok && v.Size.IsValid() {
|
||||
newSize := types.Size(verifyProbeBlobSize)
|
||||
if v.Size > newSize {
|
||||
v.Size = newSize
|
||||
}
|
||||
if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, volumeAddr, volumeId, *v); err != nil {
|
||||
fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err)
|
||||
if *c.forcePurging {
|
||||
return
|
||||
if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() {
|
||||
if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil {
|
||||
// files may be deleted during copying filesIds
|
||||
if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
|
||||
fmt.Fprintf(c.writer, "failed to read %d:%s needle status of file %s: %+v\n",
|
||||
volumeId, filerNeedleId.String(), itemPath, err)
|
||||
if *c.forcePurging {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = db.Delete(nId); err != nil && *c.verbose {
|
||||
fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err)
|
||||
if err = volumeFileIdDb.Delete(filerNeedleId); err != nil && *c.verbose {
|
||||
fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, filerNeedleId, err)
|
||||
}
|
||||
}); err != nil {
|
||||
err = fmt.Errorf("failed to readFilerFileIdFile %+v", err)
|
||||
|
@ -570,8 +570,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
|
|||
}
|
||||
|
||||
var orphanFileCount uint64
|
||||
if err = db.AscendingVisit(func(n needle_map.NeedleValue) error {
|
||||
if !n.Size.IsValid() {
|
||||
if err = volumeFileIdDb.AscendingVisit(func(n needle_map.NeedleValue) error {
|
||||
if n.Size.IsDeleted() {
|
||||
return nil
|
||||
}
|
||||
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
|
||||
|
|
|
@ -169,3 +169,18 @@ func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddres
|
|||
)
|
||||
return
|
||||
}
|
||||
|
||||
func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
|
||||
err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
|
||||
VolumeId: volumeId,
|
||||
NeedleId: uint64(needleValue.Key),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue