From 9711a6ffaabdac8516317a5539316e9a2bb83faf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 18 Nov 2019 19:24:34 -0800 Subject: [PATCH] WIP --- weed/command/scaffold.go | 8 ++ weed/notification/aws_sqs/aws_sqs_pub.go | 6 +- weed/replication/sink/s3sink/s3_sink.go | 6 +- weed/replication/sub/notification_aws_sqs.go | 6 +- weed/storage/backend/backend.go | 4 + weed/storage/backend/s3_backend/s3_backend.go | 120 ++++++++++++++++++ .../storage/backend/s3_backend/s3_sessions.go | 54 ++++++++ 7 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 weed/storage/backend/s3_backend/s3_backend.go create mode 100644 weed/storage/backend/s3_backend/s3_sessions.go diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index ed7df359a..13091764e 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -356,5 +356,13 @@ type = memory # Choose [memory|etcd] type for storing the file id sequence sequencer_etcd_urls = http://127.0.0.1:2379 +[storage.backend.s3] +enabled = true +aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +region = "us-east-2" +bucket = "your_bucket_name" # an existing bucket +directory = "/" # destination directory + ` ) diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go index c1af7f27a..4c1302abb 100644 --- a/weed/notification/aws_sqs/aws_sqs_pub.go +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -38,13 +38,13 @@ func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { ) } -func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { +func (k *AwsSqsPub) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) { config := &aws.Config{ Region: aws.String(region), } - if awsAccessKeyId != "" && aswSecretAccessKey != "" { - config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") } sess, err := session.NewSession(config) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index d5cad3541..4cff341d0 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -56,7 +56,7 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) { s3sink.filerSource = s } -func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, bucket, dir string) error { +func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string) error { s3sink.region = region s3sink.bucket = bucket s3sink.dir = dir @@ -64,8 +64,8 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, buc config := &aws.Config{ Region: aws.String(s3sink.region), } - if awsAccessKeyId != "" && aswSecretAccessKey != "" { - config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") } sess, err := session.NewSession(config) diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index f0100f4de..bed26c79c 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -38,13 +38,13 @@ func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { ) } -func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { +func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) { config := &aws.Config{ Region: aws.String(region), } - if awsAccessKeyId != "" && aswSecretAccessKey != "" { - config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") } sess, err := session.NewSession(config) diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go index ae0f84216..3c297f20b 100644 --- a/weed/storage/backend/backend.go +++ b/weed/storage/backend/backend.go @@ -13,3 +13,7 @@ type DataStorageBackend interface { GetStat() (datSize int64, modTime time.Time, err error) String() string } + +var ( + StorageBackends []DataStorageBackend +) diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go new file mode 100644 index 000000000..0ff7eca21 --- /dev/null +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -0,0 +1,120 @@ +package s3_backend + +import ( + "fmt" + "strings" + "time" + + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + _ backend.DataStorageBackend = &S3Backend{} +) + +func init() { + backend.StorageBackends = append(backend.StorageBackends, &S3Backend{}) +} + +type S3Backend struct { + conn s3iface.S3API + region string + bucket string + dir string + vid needle.VolumeId + key string +} + +func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) { + bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1) + getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{ + Bucket: &s3backend.bucket, + Key: &s3backend.key, + Range: &bytesRange, + }) + + if getObjectErr != nil { + return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr) + } + defer getObjectOutput.Body.Close() + + return getObjectOutput.Body.Read(p) + +} + +func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) { + panic("implement me") +} + +func (s3backend S3Backend) Truncate(off int64) error { + panic("implement me") +} + +func (s3backend S3Backend) Close() error { + return nil +} + +func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) { + + headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{ + Bucket: &s3backend.bucket, + Key: &s3backend.key, + }) + + if headObjectErr != nil { + return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr) + } + + datSize = int64(*headObjectOutput.ContentLength) + modTime = *headObjectOutput.LastModified + + return +} + +func (s3backend S3Backend) String() string { + return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key) +} + +func (s3backend *S3Backend) GetName() string { + return "s3" +} + +func (s3backend *S3Backend) GetSinkToDirectory() string { + return s3backend.dir +} + +func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error { + glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region")) + glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket")) + glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory")) + + return s3backend.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("bucket"), + configuration.GetString("directory"), + vid, + ) +} + +func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string, + vid needle.VolumeId) (err error) { + s3backend.region = region + s3backend.bucket = bucket + s3backend.dir = dir + s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region) + + s3backend.vid = vid + s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid) + if strings.HasPrefix(s3backend.key, "/") { + s3backend.key = s3backend.key[1:] + } + + return err +} diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go new file mode 100644 index 000000000..ef4f8c137 --- /dev/null +++ b/weed/storage/backend/s3_backend/s3_sessions.go @@ -0,0 +1,54 @@ +package s3_backend + +import ( + "fmt" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" +) + +var ( + s3Sessions = make(map[string]s3iface.S3API) + sessionsLock sync.RWMutex +) + +func getSession(region string) (s3iface.S3API, bool) { + sessionsLock.RLock() + defer sessionsLock.RUnlock() + + sess, found := s3Sessions[region] + return sess, found +} + +func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S3API, error) { + + sessionsLock.Lock() + defer sessionsLock.Unlock() + + if t, found := s3Sessions[region]; found { + return t, nil + } + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return nil, fmt.Errorf("create aws session in region %s: %v", region, err) + } + + t:= s3.New(sess) + + s3Sessions[region] = t + + return t, nil + +} \ No newline at end of file