package weed_server import ( "context" "fmt" "strings" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" ) func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) { // load all mappings mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)) if err != nil { return nil, err } mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content) if err != nil { return nil, err } // find mapping var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation var localMountedDir string for k, loc := range mappings.Mappings { if strings.HasPrefix(req.Directory, k) { localMountedDir, remoteStorageMountedLocation = k, loc } } if localMountedDir == "" { return nil, fmt.Errorf("%s is not mounted", req.Directory) } // find storage configuration storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX)) if err != nil { return nil, err } storageConf := &remote_pb.RemoteConf{} if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil { return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) } // find the entry entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) if err == filer_pb.ErrNotFound { return nil, err } resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{} if entry.Remote == nil || entry.Remote.RemoteSize == 0 { return resp, nil } // detect storage option so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "") if err != nil { return resp, err } assignRequest, altRequest := so.ToAssignRequests(1) // find a good chunk size chunkSize := int64(5 * 1024 * 1024) chunkCount := entry.Remote.RemoteSize/chunkSize + 1 for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 { chunkSize *= 2 chunkCount = entry.Remote.RemoteSize/chunkSize + 1 } dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):]) var chunks []*filer_pb.FileChunk var fetchAndWriteErr error var wg sync.WaitGroup limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { localOffset := offset wg.Add(1) limitedConcurrentExecutor.Execute(func() { defer wg.Done() size := chunkSize if localOffset+chunkSize > entry.Remote.RemoteSize { size = entry.Remote.RemoteSize - localOffset } // assign one volume server assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { fetchAndWriteErr = err return } if assignResult.Error != "" { fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error) return } fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid) if assignResult.Error != "" { fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) return } var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica for _, r := range assignResult.Replicas { replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{ Url: r.Url, PublicUrl: r.PublicUrl, GrpcPort: int32(r.GrpcPort), }) } // tell filer to tell volume server to download into needles assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) var etag string err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), Cookie: uint32(fileId.Cookie), Offset: localOffset, Size: size, Replicas: replicas, Auth: string(assignResult.Auth), RemoteConf: storageConf, RemoteLocation: &remote_pb.RemoteStorageLocation{ Name: remoteStorageMountedLocation.Name, Bucket: remoteStorageMountedLocation.Bucket, Path: string(dest), }, }) if fetchAndWriteErr != nil { return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) } else { etag = resp.ETag } return nil }) if err != nil && fetchAndWriteErr == nil { fetchAndWriteErr = err return } chunks = append(chunks, &filer_pb.FileChunk{ FileId: assignResult.Fid, Offset: localOffset, Size: uint64(size), ModifiedTsNs: time.Now().Unix(), ETag: etag, Fid: &filer_pb.FileId{ VolumeId: uint32(fileId.VolumeId), FileKey: uint64(fileId.Key), Cookie: uint32(fileId.Cookie), }, }) }) } wg.Wait() if fetchAndWriteErr != nil { return nil, fetchAndWriteErr } garbage := entry.GetChunks() newEntry := entry.ShallowClone() newEntry.Chunks = chunks newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry) newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano() // this skips meta data log events if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { fs.filer.DeleteChunks(chunks) return nil, err } fs.filer.DeleteChunks(garbage) fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil) resp.Entry = newEntry.ToProtoEntry() return resp, nil }