mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
volume copy: stream out copying progress and avoid grpc request timeout
fix https://github.com/chrislusf/seaweedfs/issues/2386
This commit is contained in:
parent
3be3c17f59
commit
5435027ff0
|
@ -47,7 +47,7 @@ service VolumeServer {
|
|||
}
|
||||
|
||||
// copy the .idx .dat files, and mount this volume
|
||||
rpc VolumeCopy (VolumeCopyRequest) returns (VolumeCopyResponse) {
|
||||
rpc VolumeCopy (VolumeCopyRequest) returns (stream VolumeCopyResponse) {
|
||||
}
|
||||
rpc ReadVolumeFileStatus (ReadVolumeFileStatusRequest) returns (ReadVolumeFileStatusResponse) {
|
||||
}
|
||||
|
@ -252,6 +252,7 @@ message VolumeCopyRequest {
|
|||
}
|
||||
message VolumeCopyResponse {
|
||||
uint64 last_append_at_ns = 1;
|
||||
int64 processed_bytes = 2;
|
||||
}
|
||||
|
||||
message CopyFileRequest {
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -22,7 +22,7 @@ import (
|
|||
const BufferSizeLimit = 1024 * 1024 * 2
|
||||
|
||||
// VolumeCopy copy the .idx .dat .vif files, and mount the volume
|
||||
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
|
||||
func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v != nil {
|
||||
|
@ -31,7 +31,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
|||
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
|
||||
return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
|
||||
|
@ -79,22 +79,38 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
|||
}()
|
||||
|
||||
// println("source:", volFileInfoResp.String())
|
||||
copyResponse := &volume_server_pb.VolumeCopyResponse{}
|
||||
reportInterval := int64(1024*1024*128)
|
||||
nextReportTarget := reportInterval
|
||||
var modifiedTsNs int64
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
|
||||
var sendErr error
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
|
||||
if processed > nextReportTarget {
|
||||
copyResponse.ProcessedBytes = processed
|
||||
if sendErr = stream.Send(copyResponse); sendErr != nil {
|
||||
return false
|
||||
}
|
||||
nextReportTarget = processed + reportInterval
|
||||
}
|
||||
return true
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if sendErr != nil {
|
||||
return sendErr
|
||||
}
|
||||
if modifiedTsNs > 0 {
|
||||
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
|
||||
}
|
||||
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if modifiedTsNs > 0 {
|
||||
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
|
||||
}
|
||||
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
|
||||
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if modifiedTsNs > 0 {
|
||||
|
@ -107,10 +123,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if dataBaseFileName == "" {
|
||||
return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
|
||||
return fmt.Errorf("not found volume %d file", req.VolumeId)
|
||||
}
|
||||
|
||||
idxFileName = indexBaseFileName + ".idx"
|
||||
|
@ -125,21 +141,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
|||
}()
|
||||
|
||||
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// mount the volume
|
||||
err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
|
||||
return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
return &volume_server_pb.VolumeCopyResponse{
|
||||
if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
|
||||
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
|
||||
}, err
|
||||
}); err != nil {
|
||||
glog.Errorf("send response: %v", err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) {
|
||||
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
|
||||
|
||||
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: vid,
|
||||
|
@ -154,7 +174,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
|
|||
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
|
||||
}
|
||||
|
||||
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
|
||||
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
|
||||
if err != nil {
|
||||
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
|
||||
}
|
||||
|
@ -188,7 +208,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
|
|||
return nil
|
||||
}
|
||||
|
||||
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) {
|
||||
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
|
||||
glog.V(4).Infof("writing to %s", fileName)
|
||||
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
|
||||
if isAppend {
|
||||
|
@ -200,6 +220,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
|
|||
}
|
||||
defer dst.Close()
|
||||
|
||||
var progressedBytes int64
|
||||
for {
|
||||
resp, receiveErr := client.Recv()
|
||||
if receiveErr == io.EOF {
|
||||
|
@ -212,6 +233,12 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
|
|||
return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
|
||||
}
|
||||
dst.Write(resp.FileContent)
|
||||
progressedBytes += int64(len(resp.FileContent))
|
||||
if progressFn != nil {
|
||||
if !progressFn(progressedBytes) {
|
||||
return modifiedTsNs, fmt.Errorf("interrupted copy operation")
|
||||
}
|
||||
}
|
||||
wt.MaybeSlowdown(int64(len(resp.FileContent)))
|
||||
}
|
||||
return modifiedTsNs, nil
|
||||
|
|
|
@ -130,7 +130,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
|||
|
||||
// copy ec data slices
|
||||
for _, shardId := range req.ShardIds {
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
|||
if req.CopyEcxFile {
|
||||
|
||||
// copy ecx file
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -146,14 +146,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
|||
|
||||
if req.CopyEcjFile {
|
||||
// copy ecj file
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if req.CopyVifFile {
|
||||
// copy vif file
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
|
||||
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
|
|||
return fmt.Errorf("source and target volume servers are the same!")
|
||||
}
|
||||
|
||||
_, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, "")
|
||||
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "")
|
||||
return
|
||||
}
|
||||
|
|
|
@ -255,13 +255,27 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
|||
}
|
||||
|
||||
err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
VolumeId: replica.info.Id,
|
||||
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
|
||||
})
|
||||
if replicateErr != nil {
|
||||
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
|
||||
}
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return recvErr
|
||||
}
|
||||
}
|
||||
if resp.ProcessedBytes > 0 {
|
||||
fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
|
|||
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
|
||||
|
||||
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
|
||||
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
|
||||
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
|
|||
return nil
|
||||
}
|
||||
|
||||
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
|
||||
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
|
||||
|
||||
// check to see if the volume is already read-only and if its not then we need
|
||||
// to mark it as read-only and then before we return we need to undo what we
|
||||
|
@ -141,15 +141,31 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
|
|||
}
|
||||
|
||||
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
SourceDataNode: string(sourceVolumeServer),
|
||||
DiskType: diskType,
|
||||
})
|
||||
if replicateErr == nil {
|
||||
lastAppendAtNs = resp.LastAppendAtNs
|
||||
if replicateErr != nil {
|
||||
return replicateErr
|
||||
}
|
||||
return replicateErr
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return recvErr
|
||||
}
|
||||
}
|
||||
if resp.LastAppendAtNs != 0 {
|
||||
lastAppendAtNs = resp.LastAppendAtNs
|
||||
} else {
|
||||
fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
|
|
Loading…
Reference in a new issue