seaweedfs/weed/server/volume_grpc_copy.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

package weed_server

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
	"github.com/chrislusf/seaweedfs/weed/util"
)

const BufferSizeLimit = 1024 * 1024 * 2

// VolumeCopy copies the .idx, .dat and .vif files from the source data node, and mounts the volume
func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
}
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
// copy .dat and .idx files
// read .idx .dat file size and timestamp
// send .idx file
// send .dat file
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
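	// the leading "true" requests streaming mode, i.e. a dedicated grpc connection for this
	// long-running call that is closed when the copy finishes (see the commit note above)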
err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
		if err != nil {
return fmt.Errorf("read volume file status failed, %v", err)
}
diskType := volFileInfoResp.DiskType
if req.DiskType != "" {
diskType = req.DiskType
}
location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
if location == nil {
return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
}
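		// remember the copy source in a .note file next to the data file; the note is
		// removed when the copy completes, or together with the partial files on failure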
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
os.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
defer func() {
if err != nil {
os.Remove(dataBaseFileName + ".dat")
os.Remove(indexBaseFileName + ".idx")
os.Remove(dataBaseFileName + ".vif")
os.Remove(dataBaseFileName + ".note")
}
}()
// println("source:", volFileInfoResp.String())
copyResponse := &volume_server_pb.VolumeCopyResponse{}
reportInterval := int64(1024 * 1024 * 128)
nextReportTarget := reportInterval
var modifiedTsNs int64
var sendErr error
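		// copy the .dat file first, pushing a progress update to the requesting client
		// roughly every reportInterval bytes over the VolumeCopy response stream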
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
if processed > nextReportTarget {
copyResponse.ProcessedBytes = processed
if sendErr = stream.Send(copyResponse); sendErr != nil {
return false
}
nextReportTarget = processed + reportInterval
}
return true
}); err != nil {
return err
}
if sendErr != nil {
return sendErr
}
if modifiedTsNs > 0 {
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
return err
}
if modifiedTsNs > 0 {
os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
os.Remove(dataBaseFileName + ".note")
return nil
})
if err != nil {
return err
}
if dataBaseFileName == "" {
return fmt.Errorf("not found volume %d file", req.VolumeId)
}
idxFileName = indexBaseFileName + ".idx"
datFileName = dataBaseFileName + ".dat"
defer func() {
if err != nil && dataBaseFileName != "" {
os.Remove(idxFileName)
os.Remove(datFileName)
os.Remove(dataBaseFileName + ".vif")
}
}()
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
return err
}
// mount the volume
err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
}
if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
}); err != nil {
glog.Errorf("send response: %v", err)
}
return err
}
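
// doCopyFile streams one volume file (identified by ext, e.g. ".dat", ".idx" or ".vif")
// from the source volume server into baseFileName+ext, throttled to the configured
// compaction byte rate, and returns the source file's modification time in nanoseconds.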
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
Ext: ext,
CompactionRevision: compactRevision,
StopOffset: stopOffset,
Collection: collection,
IsEcVolume: isEcVolume,
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
return modifiedTsNs, nil
}

// checkCopyFiles only verifies that the copied .idx and .dat file sizes match the source.
// TODO: maybe also check the received count and deleted count of the volume.
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
stat, err := os.Stat(idxFileName)
if err != nil {
return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err)
}
if originFileInf.IdxFileSize != uint64(stat.Size()) {
return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",
idxFileName, stat.Size(), originFileInf.IdxFileSize)
}
stat, err = os.Stat(datFileName)
if err != nil {
return fmt.Errorf("get dat file info failed, %v", err)
}
if originFileInf.DatFileSize != uint64(stat.Size()) {
return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
stat.Size(), originFileInf.DatFileSize)
}
return nil
}
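
// writeToFile drains the CopyFile stream into fileName, throttled by wt, and reports the
// cumulative number of bytes written through progressFn; it returns the modification
// timestamp (in ns) reported by the source, if any.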
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if isAppend {
flags = os.O_WRONLY | os.O_CREATE
}
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
		return modifiedTsNs, fmt.Errorf("open %s: %v", fileName, err)
}
defer dst.Close()
var progressedBytes int64
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {
break
}
if resp != nil && resp.ModifiedTsNs != 0 {
modifiedTsNs = resp.ModifiedTsNs
}
if receiveErr != nil {
return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
		if _, writeErr := dst.Write(resp.FileContent); writeErr != nil {
			return modifiedTsNs, fmt.Errorf("writing %s: %v", fileName, writeErr)
		}
progressedBytes += int64(len(resp.FileContent))
if progressFn != nil {
if !progressFn(progressedBytes) {
return modifiedTsNs, fmt.Errorf("interrupted copy operation")
}
}
wt.MaybeSlowdown(int64(len(resp.FileContent)))
}
return modifiedTsNs, nil
}
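
// ReadVolumeFileStatus reports the .dat/.idx file sizes and timestamps, file count,
// compaction revision, collection and disk type of a volume stored on this server.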
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
resp.VolumeId = req.VolumeId
datSize, idxSize, modTime := v.FileStat()
resp.DatFileSize = datSize
resp.IdxFileSize = idxSize
resp.DatFileTimestampSeconds = uint64(modTime.Unix())
resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
resp.DiskType = string(v.DiskType())
return resp, nil
}

// CopyFile lets the client pull a volume-related file from this source server.
// If req.CompactionRevision != math.MaxUint32, it ensures the compaction revision is as expected.
// Copying stops at req.StopOffset; set it to math.MaxUint64 in order to read all data.
func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
var fileName string
if !req.IsEcVolume {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
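		// an EC shard file may live in either the data directory or the index directory
		// of any configured location, so search all of them for the shard base name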
for _, location := range vs.store.Locations {
tName := util.Join(location.Directory, baseFileName)
if util.FileExists(tName) {
fileName = tName
}
tName = util.Join(location.IdxDirectory, baseFileName)
if util.FileExists(tName) {
fileName = tName
}
}
if fileName == "" {
if req.IgnoreSourceFileNotFound {
return nil
}
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
}
}
bytesToRead := int64(req.StopOffset)
file, err := os.Open(fileName)
if err != nil {
if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
return nil
}
return err
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
return err
}
fileModTsNs := fileInfo.ModTime().UnixNano()
buffer := make([]byte, BufferSizeLimit)
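	// stream the file in chunks of up to BufferSizeLimit bytes until req.StopOffset bytes
	// have been sent or EOF is reached; the modification timestamp rides along only on the
	// first chunk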
for bytesToRead > 0 {
bytesread, err := file.Read(buffer)
// println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
if err != nil {
if err != io.EOF {
return err
}
// println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
break
}
if int64(bytesread) > bytesToRead {
bytesread = int(bytesToRead)
}
err = stream.Send(&volume_server_pb.CopyFileResponse{
FileContent: buffer[:bytesread],
ModifiedTsNs: fileModTsNs,
})
if err != nil {
// println("sending", bytesread, "bytes err", err.Error())
return err
}
fileModTsNs = 0 // only send once
bytesToRead -= int64(bytesread)
}
return nil
}