volume server: read all files in a volume

This commit is contained in:
Chris Lu 2021-09-27 01:45:32 -07:00
parent 4a1d4d7462
commit c4d7ee6c5c
3 changed files with 1269 additions and 1001 deletions

View file

@ -58,6 +58,8 @@ service VolumeServer {
} }
rpc WriteNeedleBlob (WriteNeedleBlobRequest) returns (WriteNeedleBlobResponse) { rpc WriteNeedleBlob (WriteNeedleBlobRequest) returns (WriteNeedleBlobResponse) {
} }
rpc ReadAllNeedles (ReadAllNeedlesRequest) returns (stream ReadAllNeedlesResponse) {
}
rpc VolumeTailSender (VolumeTailSenderRequest) returns (stream VolumeTailSenderResponse) { rpc VolumeTailSender (VolumeTailSenderRequest) returns (stream VolumeTailSenderResponse) {
} }
@ -284,6 +286,14 @@ message WriteNeedleBlobRequest {
message WriteNeedleBlobResponse { message WriteNeedleBlobResponse {
} }
message ReadAllNeedlesRequest {
uint32 volume_id = 1;
}
message ReadAllNeedlesResponse {
uint64 needle_id = 1;
bytes needle_blob = 2;
}
message VolumeTailSenderRequest { message VolumeTailSenderRequest {
uint32 volume_id = 1; uint32 volume_id = 1;
uint64 since_ns = 2; uint64 since_ns = 2;

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,50 @@
package weed_server
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
scanner := &VolumeFileScanner4ReadAll{
stream: stream,
}
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, super_block.SuperBlockSize, scanner)
return err
}
type VolumeFileScanner4ReadAll struct {
stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
}
func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
sendErr := scanner.stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
NeedleId: uint64(n.Id),
NeedleBlob: needleBody,
})
if sendErr != nil {
return sendErr
}
return nil
}