read <- remote_storage

This commit is contained in:
Chris Lu 2021-07-31 22:39:38 -07:00
parent f5a69a0e44
commit 9df7d16791
10 changed files with 851 additions and 463 deletions

View file

@ -45,7 +45,10 @@ func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *F
for _, entry := range entries {
if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
rs.loadRemoteStorageMountMapping(entry.Content)
if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
return err
}
continue
}
if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
return nil
@ -75,16 +78,11 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc
}
func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
var storageLocation string
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
mountDir = util.FullPath(string(key))
storageLocation = value.(string)
mountDir = util.FullPath(string(key[:len(key)-1]))
remoteLocation = value.(*filer_pb.RemoteStorageLocation)
return true
})
if storageLocation == "" {
return
}
remoteLocation = remote_storage.ParseLocation(storageLocation)
return
}
@ -118,8 +116,8 @@ func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client
return
}
func AddMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
mappings := &filer_pb.RemoteStorageMapping{
func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
mappings = &filer_pb.RemoteStorageMapping{
Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
}
if len(oldContent) > 0 {
@ -127,6 +125,14 @@ func AddMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteS
glog.Warningf("unmarshal existing mappings: %v", err)
}
}
return
}
func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
if unmarshalErr != nil {
// skip
}
// set the new mapping
mappings.Mappings[dir] = storageLocation

View file

@ -3,17 +3,16 @@ package filer
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
)
func (entry *Entry) IsRemoteOnly() bool {
return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0
}
func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64) error {
func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data[]byte, err error) {
client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
if !found {
return fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
}
mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
@ -26,8 +25,5 @@ func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64)
Path: remoteFullPath,
}
client.ReadFile(sourceLoc, offset, size, func(w io.Writer) error {
return nil
})
return nil
return client.ReadFile(sourceLoc, offset, size)
}

View file

@ -147,3 +147,7 @@ func (fp *FilerConf_PathConf) Key() interface{} {
key, _ := proto.Marshal(fp)
return string(key)
}
func (fp *RemoteStorageLocation) Key() interface{} {
key, _ := proto.Marshal(fp)
return string(key)
}

View file

@ -56,6 +56,8 @@ service VolumeServer {
}
rpc WriteNeedleBlob (WriteNeedleBlobRequest) returns (WriteNeedleBlobResponse) {
}
rpc FetchAndWriteNeedle (FetchAndWriteNeedleRequest) returns (FetchAndWriteNeedleResponse) {
}
rpc VolumeTailSender (VolumeTailSenderRequest) returns (stream VolumeTailSenderResponse) {
}
@ -276,6 +278,23 @@ message WriteNeedleBlobRequest {
}
message WriteNeedleBlobResponse {
}
message FetchAndWriteNeedleRequest {
uint32 volume_id = 1;
uint64 needle_id = 2;
int64 offset = 3;
int64 size = 4;
// remote info
string remote_type = 5;
string remote_name = 6;
string s3_access_key = 8;
string s3_secret_key = 9;
string s3_region = 10;
string s3_endpoint = 11;
string remote_bucket = 12;
string remote_key = 13;
}
message FetchAndWriteNeedleResponse {
}
message VolumeTailSenderRequest {
uint32 volume_id = 1;

File diff suppressed because it is too large Load diff

View file

@ -3,7 +3,6 @@ package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"strings"
"sync"
)
@ -31,7 +30,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64, writeFn func(w io.Writer) error) error
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error)
}
type RemoteStorageClientMaker interface {

View file

@ -7,10 +7,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
)
func init() {
@ -65,7 +65,7 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
for !isLastPage && err == nil {
listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, content := range page.Contents {
key := (*content.Key)
key := *content.Key
if len(pathKey) == 0 {
key = "/" + key
} else {
@ -91,6 +91,23 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
}
return
}
func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64, writeFn func(w io.Writer) error) error {
return nil
func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
u.PartSize = int64(4 * 1024 * 1024)
u.Concurrency = 1
})
dataSlice := make([]byte, int(size))
writerAt := aws.NewWriteAtBuffer(dataSlice)
_, err = downloader.Download(writerAt, &s3.GetObjectInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
})
if err != nil {
return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
}
return writerAt.Bytes(), nil
}

View file

@ -164,10 +164,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return err
}
if entry.IsRemoteOnly() {
err = fs.filer.ReadRemote(writer, entry, offset, size)
var data []byte
data, err = fs.filer.ReadRemote(entry, offset, size)
if err != nil {
glog.Errorf("failed to read remote %s: %v", r.URL, err)
}
_, err = w.Write(data)
} else {
err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
if err != nil {

View file

@ -3,7 +3,9 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@ -36,3 +38,41 @@ func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_
return resp, nil
}
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
remoteConf := &filer_pb.RemoteConf{
Type: req.RemoteType,
Name: req.RemoteName,
S3AccessKey: req.S3AccessKey,
S3SecretKey: req.S3SecretKey,
S3Region: req.S3Region,
S3Endpoint: req.S3Endpoint,
}
client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
if getClientErr != nil {
return nil, fmt.Errorf("get remote client: %v", getClientErr)
}
remoteStorageLocation := &filer_pb.RemoteStorageLocation{
Name: req.RemoteName,
Bucket: req.RemoteBucket,
Path: req.RemoteKey,
}
data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
if ReadRemoteErr != nil {
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
}
if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), data, types.Size(req.Size)); err != nil {
return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err)
}
return resp, nil
}

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
)
@ -49,6 +50,10 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
if *dir == "" {
return c.listExistingRemoteStorageMounts(commandEnv, writer)
}
remoteStorageLocation := remote_storage.ParseLocation(*remote)
// find configuration for remote storage
@ -71,6 +76,34 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) {
// read current mapping
var oldContent []byte
err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)
return err
})
if err != nil {
if err != filer_pb.ErrNotFound {
return fmt.Errorf("read existing mapping: %v", err)
}
}
mappings, unmarshalErr := filer.UnmarshalRemoteStorageMappings(oldContent)
if unmarshalErr != nil {
return unmarshalErr
}
m := jsonpb.Marshaler{
EmitDefaults: false,
Indent: " ",
}
return m.Marshal(writer, mappings)
}
func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {
// read storage configuration data
@ -178,7 +211,7 @@ func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writ
existingEntry.Attributes.Mtime = remoteEntry.LastModifiedAt
_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
Directory: localDir,
Entry: existingEntry,
Entry: existingEntry,
})
return updateErr
}
@ -210,7 +243,7 @@ func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io.
}
// add new mapping
newContent, err = filer.AddMapping(oldContent, dir, remoteStorageLocation)
newContent, err = filer.AddRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
if err != nil {
return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
}