feat(weed.move): add a speed limit parameter of moving files (#3478)

* feat(weed.move): add a speed limit parameter of moving files

* fix(weed.move): set the default value of ioBytePerSecond to vs.compactionBytePerSecond

Co-authored-by: zhihao.qu <zhihao.qu@ly.com>
This commit is contained in:
qzh 2022-08-22 14:08:31 +08:00 committed by GitHub
parent c4e862e908
commit 74b53729e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 795 additions and 768 deletions

View file

@ -254,6 +254,7 @@ message VolumeCopyRequest {
string ttl = 4;
string source_data_node = 5;
string disk_type = 6;
int64 io_byte_per_second = 7;
}
message VolumeCopyResponse {
uint64 last_append_at_ns = 1;

File diff suppressed because it is too large Load diff

View file

@ -108,7 +108,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget := reportInterval
var modifiedTsNs int64
var sendErr error
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
var ioBytePerSecond int64
if req.IoBytePerSecond <= 0 {
ioBytePerSecond = vs.compactionBytePerSecond
} else {
ioBytePerSecond = req.IoBytePerSecond
}
throttler := util.NewWriteThrottler(ioBytePerSecond)
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
if processed > nextReportTarget {
copyResponse.ProcessedBytes = processed
if sendErr = stream.Send(copyResponse); sendErr != nil {
@ -117,7 +124,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget = processed + reportInterval
}
return true
}); err != nil {
}, throttler); err != nil {
return err
}
if sendErr != nil {
@ -127,14 +134,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil {
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil {
return err
}
if modifiedTsNs > 0 {
@ -184,6 +191,10 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
}
func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@ -198,7 +209,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn)
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
@ -207,7 +218,8 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
}
/**
/*
*
only check the the differ of the file size
todo: maybe should check the received count and deleted count of the volume
*/

View file

@ -330,7 +330,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, false)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, false)
}
return nil
}

View file

@ -53,6 +53,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "")
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0)
return
}

View file

@ -55,6 +55,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
ioBytePerSecond := volMoveCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
if err = volMoveCommand.Parse(args); err != nil {
return nil
}
@ -71,14 +72,14 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, false)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, *ioBytePerSecond, false)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
@ -101,7 +102,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil
}
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@ -142,9 +143,10 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
IoBytePerSecond: ioBytePerSecond,
})
if replicateErr != nil {
return replicateErr

View file

@ -58,6 +58,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
target := tierCommand.String("toDiskType", "", "the target disk type")
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
if err = tierCommand.Parse(args); err != nil {
return nil
}
@ -118,7 +119,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src)
if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
}
}
@ -219,13 +220,13 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
// mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {