mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
s3: sync bucket info from filer (#3759)
This commit is contained in:
parent
5e9039d728
commit
3de1e19780
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) {
|
func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
|
||||||
|
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
|
||||||
|
|
||||||
_ = s3a.onIamConfigUpdate(dir, fileName, content)
|
_ = s3a.onIamConfigUpdate(dir, fileName, content)
|
||||||
_ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
|
_ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
|
||||||
|
_ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -35,7 +36,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
|
||||||
var clientEpoch int32
|
var clientEpoch int32
|
||||||
util.RetryForever("followIamChanges", func() error {
|
util.RetryForever("followIamChanges", func() error {
|
||||||
clientEpoch++
|
clientEpoch++
|
||||||
return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, nil, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
|
return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, directoriesToWatch, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
||||||
return true
|
return true
|
||||||
|
@ -63,3 +64,17 @@ func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, conte
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//reload bucket metadata
|
||||||
|
func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
|
||||||
|
if dir == s3a.option.BucketsPath {
|
||||||
|
if newEntry != nil {
|
||||||
|
s3a.bucketRegistry.LoadBucketMetadata(newEntry)
|
||||||
|
glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
|
||||||
|
} else {
|
||||||
|
s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
|
||||||
|
glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
213
weed/s3api/bucket_metadata.go
Normal file
213
weed/s3api/bucket_metadata.go
Normal file
|
@ -0,0 +1,213 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
|
|
||||||
|
//"github.com/seaweedfs/seaweedfs/weed/s3api"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
|
||||||
|
entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buildBucketMetadata(entry), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BucketMetaData struct {
|
||||||
|
_ struct{} `type:"structure"`
|
||||||
|
|
||||||
|
Name string
|
||||||
|
|
||||||
|
//By default, when another AWS account uploads an object to S3 bucket,
|
||||||
|
//that account (the object writer) owns the object, has access to it, and
|
||||||
|
//can grant other users access to it through ACLs. You can use Object Ownership
|
||||||
|
//to change this default behavior so that ACLs are disabled and you, as the
|
||||||
|
//bucket owner, automatically own every object in your bucket.
|
||||||
|
ObjectOwnership string
|
||||||
|
|
||||||
|
// Container for the bucket owner's display name and ID.
|
||||||
|
Owner *s3.Owner `type:"structure"`
|
||||||
|
|
||||||
|
// A list of grants for access controls.
|
||||||
|
Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type BucketRegistry struct {
|
||||||
|
metadataCache map[string]*BucketMetaData
|
||||||
|
metadataCacheLock sync.RWMutex
|
||||||
|
|
||||||
|
notFound map[string]struct{}
|
||||||
|
notFoundLock sync.RWMutex
|
||||||
|
s3a *S3ApiServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry {
|
||||||
|
br := &BucketRegistry{
|
||||||
|
metadataCache: make(map[string]*BucketMetaData),
|
||||||
|
notFound: make(map[string]struct{}),
|
||||||
|
s3a: s3a,
|
||||||
|
}
|
||||||
|
err := br.init()
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal("init bucket registry failed", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return br
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) init() error {
|
||||||
|
err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||||
|
r.LoadBucketMetadata(entry)
|
||||||
|
return nil
|
||||||
|
}, "", false, math.MaxUint32)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
|
||||||
|
bucketMetadata := buildBucketMetadata(entry)
|
||||||
|
r.metadataCacheLock.Lock()
|
||||||
|
defer r.metadataCacheLock.Unlock()
|
||||||
|
r.metadataCache[entry.Name] = bucketMetadata
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
|
||||||
|
entryJson, _ := json.Marshal(entry)
|
||||||
|
glog.V(3).Infof("build bucket metadata,entry=%s", entryJson)
|
||||||
|
bucketMetadata := &BucketMetaData{
|
||||||
|
Name: entry.Name,
|
||||||
|
|
||||||
|
//Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled
|
||||||
|
ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
|
||||||
|
|
||||||
|
// Default owner: `AccountAdmin`
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if entry.Extended != nil {
|
||||||
|
//ownership control
|
||||||
|
ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey]
|
||||||
|
if ok {
|
||||||
|
ownership := string(ownership)
|
||||||
|
valid := s3_constants.ValidateOwnership(ownership)
|
||||||
|
if valid {
|
||||||
|
bucketMetadata.ObjectOwnership = ownership
|
||||||
|
} else {
|
||||||
|
glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//access control policy
|
||||||
|
acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey]
|
||||||
|
if ok {
|
||||||
|
var acp s3.AccessControlPolicy
|
||||||
|
err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes))
|
||||||
|
if err == nil {
|
||||||
|
//validate owner
|
||||||
|
if acp.Owner != nil && acp.Owner.ID != nil {
|
||||||
|
bucketMetadata.Owner = acp.Owner
|
||||||
|
} else {
|
||||||
|
glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
//acl
|
||||||
|
bucketMetadata.Acl = acp.Grants
|
||||||
|
} else {
|
||||||
|
glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bucketMetadata
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
|
||||||
|
r.removeMetadataCache(entry.Name)
|
||||||
|
r.unMarkNotFound(entry.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
|
||||||
|
r.metadataCacheLock.RLock()
|
||||||
|
bucketMetadata, ok := r.metadataCache[bucketName]
|
||||||
|
r.metadataCacheLock.RUnlock()
|
||||||
|
if ok {
|
||||||
|
return bucketMetadata, s3err.ErrNone
|
||||||
|
}
|
||||||
|
|
||||||
|
r.notFoundLock.RLock()
|
||||||
|
_, ok = r.notFound[bucketName]
|
||||||
|
r.notFoundLock.RUnlock()
|
||||||
|
if ok {
|
||||||
|
return nil, s3err.ErrNoSuchBucket
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName)
|
||||||
|
if errCode != s3err.ErrNone {
|
||||||
|
return nil, errCode
|
||||||
|
}
|
||||||
|
|
||||||
|
r.setMetadataCache(bucketMetadata)
|
||||||
|
r.unMarkNotFound(bucketName)
|
||||||
|
return bucketMetadata, s3err.ErrNone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
|
||||||
|
r.notFoundLock.Lock()
|
||||||
|
defer r.notFoundLock.Unlock()
|
||||||
|
|
||||||
|
//check if already exists
|
||||||
|
r.metadataCacheLock.RLock()
|
||||||
|
bucketMetaData, ok := r.metadataCache[bucketName]
|
||||||
|
r.metadataCacheLock.RUnlock()
|
||||||
|
if ok {
|
||||||
|
return bucketMetaData, s3err.ErrNone
|
||||||
|
}
|
||||||
|
|
||||||
|
//if not exists, load from filer
|
||||||
|
bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
if err == filer_pb.ErrNotFound {
|
||||||
|
// The bucket doesn't actually exist and should no longer loaded from the filer
|
||||||
|
r.notFound[bucketName] = struct{}{}
|
||||||
|
return nil, s3err.ErrNoSuchBucket
|
||||||
|
}
|
||||||
|
return nil, s3err.ErrInternalError
|
||||||
|
}
|
||||||
|
return bucketMetadata, s3err.ErrNone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
|
||||||
|
r.metadataCacheLock.Lock()
|
||||||
|
defer r.metadataCacheLock.Unlock()
|
||||||
|
r.metadataCache[metadata.Name] = metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) removeMetadataCache(bucket string) {
|
||||||
|
r.metadataCacheLock.Lock()
|
||||||
|
defer r.metadataCacheLock.Unlock()
|
||||||
|
delete(r.metadataCache, bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) markNotFound(bucket string) {
|
||||||
|
r.notFoundLock.Lock()
|
||||||
|
defer r.notFoundLock.Unlock()
|
||||||
|
r.notFound[bucket] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *BucketRegistry) unMarkNotFound(bucket string) {
|
||||||
|
r.notFoundLock.Lock()
|
||||||
|
defer r.notFoundLock.Unlock()
|
||||||
|
delete(r.notFound, bucket)
|
||||||
|
}
|
236
weed/s3api/bucket_metadata_test.go
Normal file
236
weed/s3api/bucket_metadata_test.go
Normal file
|
@ -0,0 +1,236 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BucketMetadataTestCase struct {
|
||||||
|
filerEntry *filer_pb.Entry
|
||||||
|
expectBucketMetadata *BucketMetaData
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
//bad entry
|
||||||
|
badEntry = &filer_pb.Entry{
|
||||||
|
Name: "badEntry",
|
||||||
|
}
|
||||||
|
|
||||||
|
//good entry
|
||||||
|
goodEntryAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Grants: s3_constants.PublicRead,
|
||||||
|
})
|
||||||
|
goodEntry = &filer_pb.Entry{
|
||||||
|
Name: "entryWithValidAcp",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
|
||||||
|
s3_constants.ExtAcpKey: goodEntryAcp,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//ownership is ""
|
||||||
|
ownershipEmptyStr = &filer_pb.Entry{
|
||||||
|
Name: "ownershipEmptyStr",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtOwnershipKey: []byte(""),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//ownership valid
|
||||||
|
ownershipValid = &filer_pb.Entry{
|
||||||
|
Name: "ownershipValid",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//acp is ""
|
||||||
|
acpEmptyStr = &filer_pb.Entry{
|
||||||
|
Name: "acpEmptyStr",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtAcpKey: []byte(""),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//acp is empty object
|
||||||
|
acpEmptyObjectAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
|
||||||
|
Owner: nil,
|
||||||
|
Grants: nil,
|
||||||
|
})
|
||||||
|
acpEmptyObject = &filer_pb.Entry{
|
||||||
|
Name: "acpEmptyObject",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtAcpKey: acpEmptyObjectAcp,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//acp owner is nil
|
||||||
|
acpOwnerNilAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
|
||||||
|
Owner: nil,
|
||||||
|
Grants: make([]*s3.Grant, 1),
|
||||||
|
})
|
||||||
|
acpOwnerNil = &filer_pb.Entry{
|
||||||
|
Name: "acpOwnerNil",
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtAcpKey: acpOwnerNilAcp,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
//load filer is
|
||||||
|
loadFilerBucket = make(map[string]int, 1)
|
||||||
|
//override `loadBucketMetadataFromFiler` to avoid really load from filer
|
||||||
|
)
|
||||||
|
|
||||||
|
var tcs = []*BucketMetadataTestCase{
|
||||||
|
{
|
||||||
|
badEntry, &BucketMetaData{
|
||||||
|
Name: badEntry.Name,
|
||||||
|
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
goodEntry, &BucketMetaData{
|
||||||
|
Name: goodEntry.Name,
|
||||||
|
ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: s3_constants.PublicRead,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ownershipEmptyStr, &BucketMetaData{
|
||||||
|
Name: ownershipEmptyStr.Name,
|
||||||
|
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ownershipValid, &BucketMetaData{
|
||||||
|
Name: ownershipValid.Name,
|
||||||
|
ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
acpEmptyStr, &BucketMetaData{
|
||||||
|
Name: acpEmptyStr.Name,
|
||||||
|
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
acpEmptyObject, &BucketMetaData{
|
||||||
|
Name: acpEmptyObject.Name,
|
||||||
|
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
acpOwnerNil, &BucketMetaData{
|
||||||
|
Name: acpOwnerNil.Name,
|
||||||
|
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
|
||||||
|
Owner: &s3.Owner{
|
||||||
|
DisplayName: &AccountAdmin.Name,
|
||||||
|
ID: &AccountAdmin.Id,
|
||||||
|
},
|
||||||
|
Acl: make([]*s3.Grant, 0),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildBucketMetadata(t *testing.T) {
|
||||||
|
for _, tc := range tcs {
|
||||||
|
resultBucketMetadata := buildBucketMetadata(tc.filerEntry)
|
||||||
|
if !reflect.DeepEqual(resultBucketMetadata, tc.expectBucketMetadata) {
|
||||||
|
t.Fatalf("result is unexpect: \nresult: %v, \nexpect: %v", resultBucketMetadata, tc.expectBucketMetadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetBucketMetadata(t *testing.T) {
|
||||||
|
loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
loadFilerBucket[bucketName] = loadFilerBucket[bucketName] + 1
|
||||||
|
return &BucketMetaData{
|
||||||
|
Name: bucketName,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
br := &BucketRegistry{
|
||||||
|
metadataCache: make(map[string]*BucketMetaData),
|
||||||
|
notFound: make(map[string]struct{}),
|
||||||
|
s3a: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
//start 40 goroutine for
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
closeCh := make(chan struct{})
|
||||||
|
for i := 0; i < 40; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
outLoop:
|
||||||
|
for {
|
||||||
|
for j := 0; j < 5; j++ {
|
||||||
|
select {
|
||||||
|
case <-closeCh:
|
||||||
|
break outLoop
|
||||||
|
default:
|
||||||
|
reqBucket := fmt.Sprintf("%c", 67+j)
|
||||||
|
_, errCode := br.GetBucketMetadata(reqBucket)
|
||||||
|
if errCode != s3err.ErrNone {
|
||||||
|
close(closeCh)
|
||||||
|
t.Error("not expect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
close(closeCh)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
//Each bucket is loaded from the filer only once
|
||||||
|
for bucketName, loadCount := range loadFilerBucket {
|
||||||
|
if loadCount != 1 {
|
||||||
|
t.Fatalf("lock is uneffict: %s, %d", bucketName, loadCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
65
weed/s3api/s3_constants/acp_canned_acl.go
Normal file
65
weed/s3api/s3_constants/acp_canned_acl.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
CannedAclPrivate = "private"
|
||||||
|
CannedAclPublicRead = "public-read"
|
||||||
|
CannedAclPublicReadWrite = "public-read-write"
|
||||||
|
CannedAclAuthenticatedRead = "authenticated-read"
|
||||||
|
CannedAclLogDeliveryWrite = "log-delivery-write"
|
||||||
|
CannedAclBucketOwnerRead = "bucket-owner-read"
|
||||||
|
CannedAclBucketOwnerFullControl = "bucket-owner-full-control"
|
||||||
|
CannedAclAwsExecRead = "aws-exec-read"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
PublicRead = []*s3.Grant{
|
||||||
|
{
|
||||||
|
Grantee: &s3.Grantee{
|
||||||
|
Type: &GrantTypeGroup,
|
||||||
|
URI: &GranteeGroupAllUsers,
|
||||||
|
},
|
||||||
|
Permission: &PermissionRead,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
PublicReadWrite = []*s3.Grant{
|
||||||
|
{
|
||||||
|
Grantee: &s3.Grantee{
|
||||||
|
Type: &GrantTypeGroup,
|
||||||
|
URI: &GranteeGroupAllUsers,
|
||||||
|
},
|
||||||
|
Permission: &PermissionRead,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Grantee: &s3.Grantee{
|
||||||
|
Type: &GrantTypeGroup,
|
||||||
|
URI: &GranteeGroupAllUsers,
|
||||||
|
},
|
||||||
|
Permission: &PermissionWrite,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
AuthenticatedRead = []*s3.Grant{
|
||||||
|
{
|
||||||
|
Grantee: &s3.Grantee{
|
||||||
|
Type: &GrantTypeGroup,
|
||||||
|
URI: &GranteeGroupAuthenticatedUsers,
|
||||||
|
},
|
||||||
|
Permission: &PermissionRead,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
LogDeliveryWrite = []*s3.Grant{
|
||||||
|
{
|
||||||
|
Grantee: &s3.Grantee{
|
||||||
|
Type: &GrantTypeGroup,
|
||||||
|
URI: &GranteeGroupLogDelivery,
|
||||||
|
},
|
||||||
|
Permission: &PermissionWrite,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
8
weed/s3api/s3_constants/acp_grantee_group.go
Normal file
8
weed/s3api/s3_constants/acp_grantee_group.go
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
//Amazon S3 predefined groups
|
||||||
|
var (
|
||||||
|
GranteeGroupAllUsers = "http://acs.amazonaws.com/groups/global/AllUsers"
|
||||||
|
GranteeGroupAuthenticatedUsers = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
|
||||||
|
GranteeGroupLogDelivery = "http://acs.amazonaws.com/groups/s3/LogDelivery"
|
||||||
|
)
|
7
weed/s3api/s3_constants/acp_grantee_type.go
Normal file
7
weed/s3api/s3_constants/acp_grantee_type.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
var (
|
||||||
|
GrantTypeCanonicalUser = "CanonicalUser"
|
||||||
|
GrantTypeAmazonCustomerByEmail = "AmazonCustomerByEmail"
|
||||||
|
GrantTypeGroup = "Group"
|
||||||
|
)
|
18
weed/s3api/s3_constants/acp_ownership.go
Normal file
18
weed/s3api/s3_constants/acp_ownership.go
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
var (
|
||||||
|
OwnershipBucketOwnerPreferred = "BucketOwnerPreferred"
|
||||||
|
OwnershipObjectWriter = "ObjectWriter"
|
||||||
|
OwnershipBucketOwnerEnforced = "BucketOwnerEnforced"
|
||||||
|
|
||||||
|
DefaultOwnershipForCreate = OwnershipObjectWriter
|
||||||
|
DefaultOwnershipForExists = OwnershipBucketOwnerEnforced
|
||||||
|
)
|
||||||
|
|
||||||
|
func ValidateOwnership(ownership string) bool {
|
||||||
|
if ownership == "" || (ownership != OwnershipBucketOwnerPreferred && ownership != OwnershipObjectWriter && ownership != OwnershipBucketOwnerEnforced) {
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
9
weed/s3api/s3_constants/acp_permisson.go
Normal file
9
weed/s3api/s3_constants/acp_permisson.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
var (
|
||||||
|
PermissionFullControl = "FULL_CONTROL"
|
||||||
|
PermissionRead = "READ"
|
||||||
|
PermissionWrite = "WRITE"
|
||||||
|
PermissionReadAcp = "READ_ACP"
|
||||||
|
PermissionWriteAcp = "WRITE_ACP"
|
||||||
|
)
|
6
weed/s3api/s3_constants/extend_key.go
Normal file
6
weed/s3api/s3_constants/extend_key.go
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
package s3_constants
|
||||||
|
|
||||||
|
const (
|
||||||
|
ExtAcpKey = "Seaweed-X-Amz-Acp"
|
||||||
|
ExtOwnershipKey = "Seaweed-X-Amz-Ownership"
|
||||||
|
)
|
|
@ -41,6 +41,7 @@ type S3ApiServer struct {
|
||||||
filerGuard *security.Guard
|
filerGuard *security.Guard
|
||||||
client *http.Client
|
client *http.Client
|
||||||
accountManager *AccountManager
|
accountManager *AccountManager
|
||||||
|
bucketRegistry *BucketRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
|
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
|
||||||
|
@ -61,6 +62,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
|
||||||
cb: NewCircuitBreaker(option),
|
cb: NewCircuitBreaker(option),
|
||||||
}
|
}
|
||||||
s3ApiServer.accountManager = NewAccountManager(s3ApiServer)
|
s3ApiServer.accountManager = NewAccountManager(s3ApiServer)
|
||||||
|
s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
|
||||||
if option.LocalFilerSocket == "" {
|
if option.LocalFilerSocket == "" {
|
||||||
s3ApiServer.client = &http.Client{Transport: &http.Transport{
|
s3ApiServer.client = &http.Client{Transport: &http.Transport{
|
||||||
MaxIdleConns: 1024,
|
MaxIdleConns: 1024,
|
||||||
|
@ -78,7 +80,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
|
||||||
|
|
||||||
s3ApiServer.registerRouter(router)
|
s3ApiServer.registerRouter(router)
|
||||||
|
|
||||||
go s3ApiServer.subscribeMetaEvents("s3", filer.DirectoryEtcRoot, time.Now().UnixNano())
|
go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath})
|
||||||
return s3ApiServer, nil
|
return s3ApiServer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package weed_server
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
//"github.com/seaweedfs/seaweedfs/weed/s3api"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -374,6 +375,12 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//acp
|
||||||
|
acp := r.Header.Get(s3_constants.ExtAcpKey)
|
||||||
|
if len(acp) > 0 {
|
||||||
|
metadata[s3_constants.ExtAcpKey] = []byte(acp)
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue