seaweedfs/weed/server/filer_grpc_server_remote.go

199 lines
6.2 KiB
Go
Raw Permalink Normal View History

2021-08-09 21:35:18 +00:00
package weed_server
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
2022-08-17 19:05:07 +00:00
"google.golang.org/protobuf/proto"
2021-08-09 21:35:18 +00:00
)
2021-10-31 02:27:25 +00:00
func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {
2021-08-09 21:35:18 +00:00
// load all mappings
mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
if err != nil {
return nil, err
}
mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
if err != nil {
return nil, err
}
// find mapping
2021-08-26 22:18:34 +00:00
var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
2021-08-09 21:35:18 +00:00
var localMountedDir string
for k, loc := range mappings.Mappings {
if strings.HasPrefix(req.Directory, k) {
localMountedDir, remoteStorageMountedLocation = k, loc
}
}
if localMountedDir == "" {
return nil, fmt.Errorf("%s is not mounted", req.Directory)
}
// find storage configuration
storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
if err != nil {
return nil, err
}
2021-08-26 22:18:34 +00:00
storageConf := &remote_pb.RemoteConf{}
2021-08-09 21:35:18 +00:00
if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
}
// find the entry
entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
if err == filer_pb.ErrNotFound {
return nil, err
}
2021-10-31 02:27:25 +00:00
resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
2021-08-09 21:35:18 +00:00
if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
return resp, nil
}
// detect storage option
so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
2021-08-09 21:35:18 +00:00
if err != nil {
return resp, err
}
assignRequest, altRequest := so.ToAssignRequests(1)
// find a good chunk size
chunkSize := int64(5 * 1024 * 1024)
chunkCount := entry.Remote.RemoteSize/chunkSize + 1
2021-08-09 22:08:53 +00:00
for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
2021-08-09 21:35:18 +00:00
chunkSize *= 2
chunkCount = entry.Remote.RemoteSize/chunkSize + 1
}
dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
var chunks []*filer_pb.FileChunk
2021-08-14 22:41:13 +00:00
var fetchAndWriteErr error
var wg sync.WaitGroup
2021-08-09 21:35:18 +00:00
2021-08-14 22:41:13 +00:00
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
2021-08-09 21:35:18 +00:00
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
2021-08-14 22:41:13 +00:00
localOffset := offset
2021-08-09 21:35:18 +00:00
wg.Add(1)
2021-08-14 22:41:13 +00:00
limitedConcurrentExecutor.Execute(func() {
defer wg.Done()
2021-08-14 22:41:13 +00:00
size := chunkSize
if localOffset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - localOffset
}
2021-08-09 21:35:18 +00:00
2021-08-14 22:41:13 +00:00
// assign one volume server
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
fetchAndWriteErr = err
return
}
if assignResult.Error != "" {
fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
return
}
fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
if assignResult.Error != "" {
fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
return
2021-08-09 21:35:18 +00:00
}
2021-09-07 01:30:44 +00:00
var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
for _, r := range assignResult.Replicas {
replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
Url: r.Url,
PublicUrl: r.PublicUrl,
2021-09-14 17:37:06 +00:00
GrpcPort: int32(r.GrpcPort),
2021-09-07 01:30:44 +00:00
})
}
2021-08-14 22:41:13 +00:00
// tell filer to tell volume server to download into needles
assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
var etag string
err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
2021-09-01 09:45:42 +00:00
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
Offset: localOffset,
Size: size,
2021-09-07 01:30:44 +00:00
Replicas: replicas,
Auth: string(assignResult.Auth),
2021-08-26 22:18:34 +00:00
RemoteConf: storageConf,
RemoteLocation: &remote_pb.RemoteStorageLocation{
Name: remoteStorageMountedLocation.Name,
Bucket: remoteStorageMountedLocation.Bucket,
Path: string(dest),
},
2021-08-14 22:41:13 +00:00
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
} else {
etag = resp.ETag
2021-08-14 22:41:13 +00:00
}
return nil
})
2021-08-09 21:35:18 +00:00
2021-08-26 22:18:34 +00:00
if err != nil && fetchAndWriteErr == nil {
2021-08-14 22:41:13 +00:00
fetchAndWriteErr = err
return
}
2021-08-09 21:35:18 +00:00
2021-08-14 22:41:13 +00:00
chunks = append(chunks, &filer_pb.FileChunk{
FileId: assignResult.Fid,
Offset: localOffset,
Size: uint64(size),
ModifiedTsNs: time.Now().UnixNano(),
ETag: etag,
2021-08-14 22:41:13 +00:00
Fid: &filer_pb.FileId{
VolumeId: uint32(fileId.VolumeId),
FileKey: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
},
})
})
2021-08-09 21:35:18 +00:00
}
wg.Wait()
2021-08-16 06:20:46 +00:00
if fetchAndWriteErr != nil {
return nil, fetchAndWriteErr
}
garbage := entry.GetChunks()
2021-08-09 21:35:18 +00:00
newEntry := entry.ShallowClone()
newEntry.Chunks = chunks
newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
2021-08-15 04:46:34 +00:00
newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
2021-08-09 21:35:18 +00:00
// this skips meta data log events
if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
fs.filer.DeleteChunks(chunks)
2021-08-09 21:35:18 +00:00
return nil, err
}
fs.filer.DeleteChunks(garbage)
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
2021-08-10 05:11:57 +00:00
resp.Entry = newEntry.ToProtoEntry()
2021-08-09 21:35:18 +00:00
return resp, nil
}