This commit is contained in:
chrislu 2022-09-10 15:36:02 -07:00
commit 4957d8eec6
3 changed files with 126 additions and 15 deletions

View file

@ -2,7 +2,9 @@ package shell
import (
"bufio"
"bytes"
"context"
"errors"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -11,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@ -72,6 +75,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
if err = fsckCommand.Parse(args); err != nil {
return nil
@ -126,7 +130,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer)
cutoffFrom := time.Now().Add(-*cutoffTimeAgo).UnixNano()
err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer, uint64(cutoffFrom))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@ -351,7 +356,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer, cutoffFrom uint64) error {
if verbose {
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@ -377,13 +382,42 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI
return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId))
var buf bytes.Buffer
for {
resp, err := copyFileClient.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return err
}
buf.Write(resp.FileContent)
}
if vinfo.isReadOnly == false {
index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
VolumeId: volumeId,
NeedleId: uint64(key),
Offset: offset.ToActualOffset(),
Size: int32(size),
})
if err != nil {
return false, fmt.Errorf("to read needle meta with id %d from volume %d with error %v", key, volumeId, err)
}
return resp.LastModified <= cutoffFrom, nil
})
if err != nil {
fmt.Fprintf(writer, "Failed to search for last vilad index on volume %d with error %v", volumeId, err)
}
buf.Truncate(index * types.NeedleMapEntrySize)
}
idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)
err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
}
return nil
})
}
@ -673,7 +707,7 @@ func getFilerFileIdFile(tempFolder string, vid uint32) string {
return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
}
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
func writeToFile(bytes []byte, fileName string) error {
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
@ -681,15 +715,6 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
defer dst.Close()
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {
break
}
if receiveErr != nil {
return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
}
dst.Write(bytes)
return nil
}

View file

@ -0,0 +1,29 @@
package idx
import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// firstInvalidIndex find the first index the failed lessThanOrEqualToFn function's requirement.
func FirstInvalidIndex(bytes []byte, lessThanOrEqualToFn func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error)) (int, error) {
left, right := 0, len(bytes)/types.NeedleMapEntrySize-1
index := right + 1
for left <= right {
mid := left + (right-left)>>1
loc := mid * types.NeedleMapEntrySize
key := types.BytesToNeedleId(bytes[loc : loc+types.NeedleIdSize])
offset := types.BytesToOffset(bytes[loc+types.NeedleIdSize : loc+types.NeedleIdSize+types.OffsetSize])
size := types.BytesToSize(bytes[loc+types.NeedleIdSize+types.OffsetSize : loc+types.NeedleIdSize+types.OffsetSize+types.SizeSize])
res, err := lessThanOrEqualToFn(key, offset, size)
if err != nil {
return -1, err
}
if res {
left = mid + 1
} else {
index = mid
right = mid - 1
}
}
return index, nil
}

View file

@ -0,0 +1,57 @@
package storage
import (
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/stretchr/testify/assert"
"os"
"testing"
)
func TestFirstInvalidIndex(t *testing.T) {
dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
type WriteInfo struct {
offset int64
size int32
}
// initialize 20 needles then update first 10 needles
for i := 1; i <= 30; i++ {
n := newRandomNeedle(uint64(i))
n.Flags = 0x08
_, _, _, err := v.writeNeedle2(n, true, false)
if err != nil {
t.Fatalf("write needle %d: %v", i, err)
}
}
b, err := os.ReadFile(v.IndexFileName() + ".idx")
// base case every record is valid -> nothing is filtered
index, err := idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
return true, nil
})
if err != nil {
t.Fatalf("failed to complete binary search %v", err)
}
assert.Equal(t, 30, index, "when every record is valid nothing should be filtered from binary search")
index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
return false, nil
})
assert.Equal(t, 0, index, "when every record is invalid everything should be filtered from binary search")
index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
return key < 20, nil
})
// needle key range from 1 to 30 so < 20 means 19 keys are valid and cutoff the bytes at 19 * 16 = 304
assert.Equal(t, 19, index, "when every record is invalid everything should be filtered from binary search")
index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
return key <= 1, nil
})
// needle key range from 1 to 30 so <=1 1 means 1 key is valid and cutoff the bytes at 1 * 16 = 16
assert.Equal(t, 1, index, "when every record is invalid everything should be filtered from binary search")
}