seaweedfs/weed/server/volume_grpc_tier_upload.go

96 lines
2.8 KiB
Go
Raw Permalink Normal View History

2019-11-27 11:09:42 +00:00
package weed_server
import (
"fmt"
"os"
"time"
2019-11-27 11:09:42 +00:00
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
2019-11-27 11:09:42 +00:00
)
// VolumeTierMoveDatToRemote copy dat file to a remote tier
func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
2019-11-27 11:09:42 +00:00
// find existing volume
2019-11-27 11:09:42 +00:00
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("volume %d not found", req.VolumeId)
2019-11-27 11:09:42 +00:00
}
// verify the collection
2019-11-27 11:09:42 +00:00
if v.Collection != req.Collection {
return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
2019-11-27 11:09:42 +00:00
}
// locate the disk file
2019-11-27 11:09:42 +00:00
diskFile, ok := v.DataBackend.(*backend.DiskFile)
if !ok {
return nil // already copied to remove. fmt.Errorf("volume %d is not on local disk", req.VolumeId)
2019-11-27 11:09:42 +00:00
}
// check valid storage backend type
backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
if !found {
var keys []string
for key := range backend.BackendStorages {
keys = append(keys, key)
}
return fmt.Errorf("destination %s not found, supported: %v", req.DestinationBackendName, keys)
}
// check whether the existing backend storage is the same as requested
// if same, skip
backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
2019-12-28 19:21:49 +00:00
for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
}
}
startTime := time.Now()
fn := func(progressed int64, percentage float32) error {
now := time.Now()
if now.Sub(startTime) < time.Second {
return nil
}
startTime = now
return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
Processed: progressed,
ProcessedPercentage: percentage,
})
}
// copy the data file
key, size, err := backendStorage.CopyFile(diskFile.File, fn)
if err != nil {
2019-12-09 03:44:16 +00:00
return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
}
2019-11-27 11:09:42 +00:00
// save the remote file to volume tier info
2019-12-28 19:21:49 +00:00
v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
BackendType: backendType,
BackendId: backendId,
Key: key,
Offset: 0,
FileSize: uint64(size),
ModifiedTime: uint64(time.Now().Unix()),
2019-12-29 05:13:10 +00:00
Extension: ".dat",
})
2019-12-28 19:21:49 +00:00
if err := v.SaveVolumeInfo(); err != nil {
return fmt.Errorf("volume %d failed to save remote file info: %v", v.Id, err)
}
if err := v.LoadRemoteFile(); err != nil {
return fmt.Errorf("volume %d failed to load remote file: %v", v.Id, err)
}
if !req.KeepLocalDatFile {
os.Remove(v.FileName(".dat"))
}
2019-11-27 11:09:42 +00:00
return nil
2019-11-27 20:34:57 +00:00
}