seaweedfs/weed/s3api/s3api_bucket_handlers.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

312 lines
8.7 KiB
Go

package s3api
import (
"context"
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"math"
"net/http"
"time"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"`
Owner *s3.Owner
Buckets []*s3.Bucket `xml:"Buckets>Bucket"`
}
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
glog.V(3).Infof("ListBucketsHandler")
var identity *Identity
var s3Err s3err.ErrorCode
if s3a.iam.isEnabled() {
identity, s3Err = s3a.iam.authUser(r)
if s3Err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, s3Err)
return
}
}
var response ListAllMyBucketsResult
entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
identityId := r.Header.Get(xhttp.AmzIdentityId)
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{
Name: aws.String(entry.Name),
CreationDate: aws.Time(time.Unix(entry.Attributes.Crtime, 0).UTC()),
})
}
}
response = ListAllMyBucketsResult{
Owner: &s3.Owner{
ID: aws.String(identityId),
DisplayName: aws.String(identityId),
},
Buckets: buckets,
}
writeSuccessResponseXML(w, r, response)
}
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := xhttp.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketHandler %s", bucket)
// avoid duplicated buckets
errCode := s3err.ErrNone
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
IncludeNormalVolumes: true,
}); err != nil {
glog.Errorf("list collection: %v", err)
return fmt.Errorf("list collections: %v", err)
} else {
for _, c := range resp.Collections {
if bucket == c.Name {
errCode = s3err.ErrBucketAlreadyExists
break
}
}
}
return nil
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
errCode = s3err.ErrBucketAlreadyExists
}
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
if s3a.iam.isEnabled() {
if _, errCode = s3a.iam.authRequest(r, s3_constants.ACTION_ADMIN); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
}
fn := func(entry *filer_pb.Entry) {
if identityId := r.Header.Get(xhttp.AmzIdentityId); identityId != "" {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[xhttp.AmzIdentityId] = []byte(identityId)
}
}
// create the folder for bucket, but lazily create actual collection
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, fn); err != nil {
glog.Errorf("PutBucketHandler mkdir: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := xhttp.GetBucketAndObject(r)
glog.V(3).Infof("DeleteBucketHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
Collection: bucket,
}
glog.V(1).Infof("delete collection: %v", deleteCollectionRequest)
if _, err := client.DeleteCollection(context.Background(), deleteCollectionRequest); err != nil {
return fmt.Errorf("delete collection %s: %v", bucket, err)
}
return nil
})
err = s3a.rm(s3a.option.BucketsPath, bucket, false, true)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := xhttp.GetBucketAndObject(r)
glog.V(3).Infof("HeadBucketHandler %s", bucket)
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if entry == nil || err == filer_pb.ErrNotFound {
return s3err.ErrNoSuchBucket
}
if !s3a.hasAccess(r, entry) {
return s3err.ErrAccessDenied
}
return s3err.ErrNone
}
func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
isAdmin := r.Header.Get(xhttp.AmzIsAdmin) != ""
if isAdmin {
return true
}
if entry.Extended == nil {
return true
}
identityId := r.Header.Get(xhttp.AmzIdentityId)
if id, ok := entry.Extended[xhttp.AmzIdentityId]; ok {
if identityId != string(id) {
return false
}
}
return true
}
// GetBucketAclHandler Get Bucket ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html
func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := xhttp.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketAclHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
response := AccessControlPolicy{}
for _, ident := range s3a.iam.identities {
if len(ident.Credentials) == 0 {
continue
}
for _, action := range ident.Actions {
if !action.overBucket(bucket) || action.getPermission() == "" {
continue
}
id := ident.Credentials[0].AccessKey
if response.Owner.DisplayName == "" && action.isOwner(bucket) && len(ident.Credentials) > 0 {
response.Owner.DisplayName = ident.Name
response.Owner.ID = id
}
response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{
Grantee: Grantee{
ID: id,
DisplayName: ident.Name,
Type: "CanonicalUser",
XMLXSI: "CanonicalUser",
XMLNS: "http://www.w3.org/2001/XMLSchema-instance"},
Permission: action.getPermission(),
})
}
}
writeSuccessResponseXML(w, r, response)
}
// GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html
func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := xhttp.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketAclHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
ttls := fc.GetCollectionTtls(bucket)
if len(ttls) == 0 {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
return
}
response := Lifecycle{}
for prefix, internalTtl := range ttls {
ttl, _ := needle.ReadTTL(internalTtl)
days := int(ttl.Minutes() / 60 / 24)
if days == 0 {
continue
}
response.Rules = append(response.Rules, Rule{
Status: Enabled, Filter: Filter{
Prefix: Prefix{string: prefix, set: true},
set: true,
},
Expiration: Expiration{Days: days, set: true},
})
}
writeSuccessResponseXML(w, r, response)
}
// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
}
// DeleteBucketMetricsConfiguration Delete Bucket Lifecycle
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}