seaweedfs/weed/server/filer_grpc_server_remote.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

194 lines
6.1 KiB
Go

package weed_server
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
)
// CacheRemoteObjectToLocalCluster copies the content of a remote-mounted entry
// into the local cluster and records the resulting chunks on the entry.
//
// It resolves the remote mount covering req.Directory, loads that mount's
// storage configuration, splits the remote object into chunks and asks volume
// servers (at most 8 requests at a time) to fetch each chunk directly from the
// remote storage. On success the entry is updated with the new chunks, the
// previous chunks are deleted, and an update event is sent.
//
// If the entry has no remote part, or its remote size is zero, an empty
// response is returned and nothing is changed. On any failure, chunks that
// were already written are deleted and the error is returned.
func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {

	// load all mappings
	mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
	if err != nil {
		return nil, err
	}
	mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
	if err != nil {
		return nil, err
	}

	// find mapping
	// Map iteration order is random, so when several mounted directories are
	// prefixes of req.Directory pick the longest one to stay deterministic.
	var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
	var localMountedDir string
	for k, loc := range mappings.Mappings {
		if strings.HasPrefix(req.Directory, k) && len(k) > len(localMountedDir) {
			localMountedDir, remoteStorageMountedLocation = k, loc
		}
	}
	if localMountedDir == "" {
		return nil, fmt.Errorf("%s is not mounted", req.Directory)
	}

	// find storage configuration
	storageConfPath := util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX)
	storageConfEntry, err := fs.filer.FindEntry(ctx, storageConfPath)
	if err != nil {
		return nil, err
	}
	storageConf := &remote_pb.RemoteConf{}
	if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
		return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
	}

	// find the entry
	// Every lookup error must abort here: entry is dereferenced right below.
	entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
	if err != nil {
		return nil, err
	}

	resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
	if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
		return resp, nil
	}

	// detect storage option
	so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
	if err != nil {
		return resp, err
	}
	assignRequest, altRequest := so.ToAssignRequests(1)

	// find a good chunk size: start at 5MB and keep doubling while there would
	// be more than 1000 chunks and the size is still below half of MaxMB.
	chunkSize := int64(5 * 1024 * 1024)
	chunkCount := entry.Remote.RemoteSize/chunkSize + 1
	for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
		chunkSize *= 2
		chunkCount = entry.Remote.RemoteSize/chunkSize + 1
	}

	// remote path = mounted remote path + the part of the local path below the mount
	dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])

	var (
		mu               sync.Mutex // guards chunks and fetchAndWriteErr, both written by concurrent workers
		chunks           []*filer_pb.FileChunk
		fetchAndWriteErr error
		wg               sync.WaitGroup
	)
	// recordErr keeps the first error reported by any worker.
	recordErr := func(e error) {
		mu.Lock()
		defer mu.Unlock()
		if fetchAndWriteErr == nil {
			fetchAndWriteErr = e
		}
	}
	// hasFailed reports whether some worker has already recorded an error.
	hasFailed := func() bool {
		mu.Lock()
		defer mu.Unlock()
		return fetchAndWriteErr != nil
	}

	limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
	for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
		localOffset := offset // per-iteration copy captured by the closure

		wg.Add(1)
		limitedConcurrentExecutor.Execute(func() {
			defer wg.Done()

			// The whole call fails once any chunk failed, so do not fetch more data.
			if hasFailed() {
				return
			}

			// the last chunk may be shorter than chunkSize
			size := chunkSize
			if localOffset+chunkSize > entry.Remote.RemoteSize {
				size = entry.Remote.RemoteSize - localOffset
			}

			// assign one volume server
			assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
			if err != nil {
				recordErr(err)
				return
			}
			if assignResult.Error != "" {
				recordErr(fmt.Errorf("assign: %v", assignResult.Error))
				return
			}
			fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
			if parseErr != nil {
				recordErr(fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr))
				return
			}

			var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
			for _, r := range assignResult.Replicas {
				replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
					Url:       r.Url,
					PublicUrl: r.PublicUrl,
					GrpcPort:  int32(r.GrpcPort),
				})
			}

			// tell filer to tell volume server to download into needles
			assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
			err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				_, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
					VolumeId:   uint32(fileId.VolumeId),
					NeedleId:   uint64(fileId.Key),
					Cookie:     uint32(fileId.Cookie),
					Offset:     localOffset,
					Size:       size,
					Replicas:   replicas,
					Auth:       string(assignResult.Auth),
					RemoteConf: storageConf,
					RemoteLocation: &remote_pb.RemoteStorageLocation{
						Name:   remoteStorageMountedLocation.Name,
						Bucket: remoteStorageMountedLocation.Bucket,
						Path:   string(dest),
					},
				})
				if fetchErr != nil {
					return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchErr)
				}
				return nil
			})
			if err != nil {
				// A failed fetch must never be recorded as a chunk, even when
				// another worker has already reported an error.
				recordErr(err)
				return
			}

			chunk := &filer_pb.FileChunk{
				FileId: assignResult.Fid,
				Offset: localOffset,
				Size:   uint64(size),
				Mtime:  time.Now().Unix(),
				Fid: &filer_pb.FileId{
					VolumeId: uint32(fileId.VolumeId),
					FileKey:  uint64(fileId.Key),
					Cookie:   uint32(fileId.Cookie),
				},
			}
			mu.Lock()
			chunks = append(chunks, chunk)
			mu.Unlock()
		})
	}

	// All workers are done after Wait, so chunks and fetchAndWriteErr can be
	// read without holding the mutex.
	wg.Wait()
	if fetchAndWriteErr != nil {
		// chunks written before the failure are not referenced by any entry
		fs.filer.DeleteChunks(chunks)
		return nil, fetchAndWriteErr
	}

	garbage := entry.Chunks

	newEntry := entry.ShallowClone()
	newEntry.Chunks = chunks
	newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
	newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()

	// this skips meta data log events
	if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
		fs.filer.DeleteChunks(chunks)
		return nil, err
	}
	fs.filer.DeleteChunks(garbage)

	fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)

	resp.Entry = newEntry.ToProtoEntry()

	return resp, nil
}