mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge branch 'master' of https://github.com/chrislusf/seaweedfs
This commit is contained in:
commit
a8abab2412
|
@ -28,6 +28,13 @@ cd $GOPATH/src/github.com/chrislusf/seaweedfs/docker
|
||||||
make
|
make
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### S3 cmd
|
||||||
|
|
||||||
|
list
|
||||||
|
```
|
||||||
|
s3cmd --no-ssl --host=127.0.0.1:8333 ls s3://
|
||||||
|
```
|
||||||
|
|
||||||
## Build and push a multiarch build
|
## Build and push a multiarch build
|
||||||
|
|
||||||
Make sure that `docker buildx` is supported (might be an experimental docker feature)
|
Make sure that `docker buildx` is supported (might be an experimental docker feature)
|
||||||
|
|
|
@ -29,7 +29,7 @@ services:
|
||||||
- 8111:8111
|
- 8111:8111
|
||||||
- 8888:8888
|
- 8888:8888
|
||||||
- 18888:18888
|
- 18888:18888
|
||||||
command: '-v=1 filer -master="master:9333" -iam'
|
command: '-v=1 filer -ip.bind=0.0.0.0 -master="master:9333" -iam -iam.ip=filer'
|
||||||
depends_on:
|
depends_on:
|
||||||
- master
|
- master
|
||||||
- volume
|
- volume
|
||||||
|
@ -41,7 +41,7 @@ services:
|
||||||
image: chrislusf/seaweedfs:local
|
image: chrislusf/seaweedfs:local
|
||||||
ports:
|
ports:
|
||||||
- 8333:8333
|
- 8333:8333
|
||||||
command: '-v=1 s3 -filer="filer:8888"'
|
command: '-v=1 s3 -filer="filer:8888" -ip.bind=s3'
|
||||||
depends_on:
|
depends_on:
|
||||||
- master
|
- master
|
||||||
- volume
|
- volume
|
||||||
|
|
|
@ -20,7 +20,7 @@ services:
|
||||||
ports:
|
ports:
|
||||||
- 8888:8888
|
- 8888:8888
|
||||||
- 18888:18888
|
- 18888:18888
|
||||||
command: 'filer -master="master:9333"'
|
command: 'filer -master="master:9333" -ip.bind=0.0.0.0'
|
||||||
depends_on:
|
depends_on:
|
||||||
- master
|
- master
|
||||||
- volume
|
- volume
|
||||||
|
@ -28,7 +28,7 @@ services:
|
||||||
image: chrislusf/seaweedfs:dev # use a remote dev image
|
image: chrislusf/seaweedfs:dev # use a remote dev image
|
||||||
ports:
|
ports:
|
||||||
- 8333:8333
|
- 8333:8333
|
||||||
command: 's3 -filer="filer:8888"'
|
command: 's3 -filer="filer:8888" -ip.bind=0.0.0.0'
|
||||||
depends_on:
|
depends_on:
|
||||||
- master
|
- master
|
||||||
- volume
|
- volume
|
||||||
|
|
|
@ -46,8 +46,6 @@ func NewCluster() *Cluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
|
func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
|
||||||
cluster.filersLock.Lock()
|
|
||||||
defer cluster.filersLock.Unlock()
|
|
||||||
filers, found := cluster.filerGroup2filers[filerGroup]
|
filers, found := cluster.filerGroup2filers[filerGroup]
|
||||||
if !found && createIfNotFound {
|
if !found && createIfNotFound {
|
||||||
filers = &Filers{
|
filers = &Filers{
|
||||||
|
@ -63,6 +61,8 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
|
||||||
filerGroup := FilerGroup(ns)
|
filerGroup := FilerGroup(ns)
|
||||||
switch nodeType {
|
switch nodeType {
|
||||||
case FilerType:
|
case FilerType:
|
||||||
|
cluster.filersLock.Lock()
|
||||||
|
defer cluster.filersLock.Unlock()
|
||||||
filers := cluster.getFilers(filerGroup, true)
|
filers := cluster.getFilers(filerGroup, true)
|
||||||
if existingNode, found := filers.filers[address]; found {
|
if existingNode, found := filers.filers[address]; found {
|
||||||
existingNode.counter++
|
existingNode.counter++
|
||||||
|
@ -115,6 +115,8 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
|
||||||
filerGroup := FilerGroup(ns)
|
filerGroup := FilerGroup(ns)
|
||||||
switch nodeType {
|
switch nodeType {
|
||||||
case FilerType:
|
case FilerType:
|
||||||
|
cluster.filersLock.Lock()
|
||||||
|
defer cluster.filersLock.Unlock()
|
||||||
filers := cluster.getFilers(filerGroup, false)
|
filers := cluster.getFilers(filerGroup, false)
|
||||||
if filers == nil {
|
if filers == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -165,12 +167,12 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
|
||||||
func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
|
func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
|
||||||
switch nodeType {
|
switch nodeType {
|
||||||
case FilerType:
|
case FilerType:
|
||||||
|
cluster.filersLock.RLock()
|
||||||
|
defer cluster.filersLock.RUnlock()
|
||||||
filers := cluster.getFilers(filerGroup, false)
|
filers := cluster.getFilers(filerGroup, false)
|
||||||
if filers == nil {
|
if filers == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cluster.filersLock.RLock()
|
|
||||||
defer cluster.filersLock.RUnlock()
|
|
||||||
for _, node := range filers.filers {
|
for _, node := range filers.filers {
|
||||||
nodes = append(nodes, node)
|
nodes = append(nodes, node)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@ package cluster
|
||||||
import (
|
import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -45,3 +47,35 @@ func TestClusterAddRemoveNodes(t *testing.T) {
|
||||||
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
|
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentAddRemoveNodes(t *testing.T) {
|
||||||
|
c := NewCluster()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
address := strconv.Itoa(i)
|
||||||
|
c.AddClusterNode("", "filer", pb.ServerAddress(address), "23.45")
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
address := strconv.Itoa(i)
|
||||||
|
node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address))
|
||||||
|
|
||||||
|
if len(node) == 0 {
|
||||||
|
t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address)
|
||||||
|
return
|
||||||
|
} else if node[0].ClusterNodeUpdate.Address != address {
|
||||||
|
t.Errorf("TestConcurrentAddRemoveNodes: expect:%s, actual:%s", address, node[0].ClusterNodeUpdate.Address)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
//go:build elastic && ydb && gocdk && hdfs
|
//go:build elastic && ydb && gocdk && tikv
|
||||||
// +build elastic,ydb,gocdk,hdfs
|
// +build elastic,ydb,gocdk,tikv
|
||||||
|
|
||||||
package command
|
package command
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,12 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
entry.Extended = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
|
entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
|
||||||
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
||||||
|
return
|
||||||
|
}
|
||||||
err = s3a.touch(dir, name, entry)
|
err = s3a.touch(dir, name, entry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
||||||
|
@ -252,7 +257,7 @@ func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTaggin
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte) {
|
func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) {
|
||||||
metadata = make(map[string][]byte)
|
metadata = make(map[string][]byte)
|
||||||
|
|
||||||
if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
|
if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
|
||||||
|
@ -277,16 +282,18 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if replaceTagging {
|
if replaceTagging {
|
||||||
if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
|
if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
|
||||||
for _, v := range strings.Split(tags, "&") {
|
parsedTags, err := parseTagsHeader(tags)
|
||||||
tag := strings.Split(v, "=")
|
if err != nil {
|
||||||
if len(tag) == 2 {
|
return nil, err
|
||||||
metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
|
}
|
||||||
} else if len(tag) == 1 {
|
err = ValidateTags(parsedTags)
|
||||||
metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = nil
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
|
}
|
||||||
|
for k, v := range parsedTags {
|
||||||
|
metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -332,6 +332,19 @@ var processMetadataBytesTestCases = []struct {
|
||||||
"X-Amz-Tagging-type": "request",
|
"X-Amz-Tagging-type": "request",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
108,
|
||||||
|
H{
|
||||||
|
"User-Agent": "firefox",
|
||||||
|
"X-Amz-Meta-My-Meta": "request",
|
||||||
|
"X-Amz-Tagging": "A=B&a=b&type=request*",
|
||||||
|
s3_constants.AmzUserMetaDirective: DirectiveReplace,
|
||||||
|
s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
|
||||||
|
},
|
||||||
|
H{},
|
||||||
|
H{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessMetadata(t *testing.T) {
|
func TestProcessMetadata(t *testing.T) {
|
||||||
|
@ -339,7 +352,6 @@ func TestProcessMetadata(t *testing.T) {
|
||||||
reqHeader := transferHToHeader(tc.request)
|
reqHeader := transferHToHeader(tc.request)
|
||||||
existing := transferHToHeader(tc.existing)
|
existing := transferHToHeader(tc.existing)
|
||||||
replaceMeta, replaceTagging := replaceDirective(reqHeader)
|
replaceMeta, replaceTagging := replaceDirective(reqHeader)
|
||||||
|
|
||||||
err := processMetadata(reqHeader, existing, replaceMeta, replaceTagging, func(_ string, _ string) (tags map[string]string, err error) {
|
err := processMetadata(reqHeader, existing, replaceMeta, replaceTagging, func(_ string, _ string) (tags map[string]string, err error) {
|
||||||
return tc.getTags, nil
|
return tc.getTags, nil
|
||||||
}, "", "")
|
}, "", "")
|
||||||
|
@ -367,7 +379,7 @@ func TestProcessMetadataBytes(t *testing.T) {
|
||||||
reqHeader := transferHToHeader(tc.request)
|
reqHeader := transferHToHeader(tc.request)
|
||||||
existing := transferHToBytesArr(tc.existing)
|
existing := transferHToBytesArr(tc.existing)
|
||||||
replaceMeta, replaceTagging := replaceDirective(reqHeader)
|
replaceMeta, replaceTagging := replaceDirective(reqHeader)
|
||||||
extends := processMetadataBytes(reqHeader, existing, replaceMeta, replaceTagging)
|
extends, _ := processMetadataBytes(reqHeader, existing, replaceMeta, replaceTagging)
|
||||||
|
|
||||||
result := transferBytesArrToH(extends)
|
result := transferBytesArrToH(extends)
|
||||||
fmtTagging(result, tc.want)
|
fmtTagging(result, tc.want)
|
||||||
|
|
|
@ -62,23 +62,12 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tags := tagging.ToTags()
|
tags := tagging.ToTags()
|
||||||
if len(tags) > 10 {
|
err = ValidateTags(tags)
|
||||||
glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags))
|
if err != nil {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler ValidateTags error %s: %v", r.URL, err)
|
||||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for k, v := range tags {
|
|
||||||
if len(k) > 128 {
|
|
||||||
glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k)
|
|
||||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(v) > 256 {
|
|
||||||
glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v)
|
|
||||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
|
if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
|
||||||
if err == filer_pb.ErrNotFound {
|
if err == filer_pb.ErrNotFound {
|
||||||
|
|
|
@ -2,6 +2,9 @@ package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Tag struct {
|
type Tag struct {
|
||||||
|
@ -37,3 +40,40 @@ func FromTags(tags map[string]string) (t *Tagging) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseTagsHeader(tags string) (map[string]string, error) {
|
||||||
|
parsedTags := make(map[string]string)
|
||||||
|
for _, v := range strings.Split(tags, "&") {
|
||||||
|
tag := strings.Split(v, "=")
|
||||||
|
if len(tag) == 2 {
|
||||||
|
parsedTags[tag[0]] = tag[1]
|
||||||
|
} else if len(tag) == 1 {
|
||||||
|
parsedTags[tag[0]] = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parsedTags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidateTags(tags map[string]string) error {
|
||||||
|
if len(tags) > 10 {
|
||||||
|
return fmt.Errorf("validate tags: %d tags more than 10", len(tags))
|
||||||
|
}
|
||||||
|
for k, v := range tags {
|
||||||
|
if len(k) > 128 {
|
||||||
|
return fmt.Errorf("validate tags: tag key longer than 128")
|
||||||
|
}
|
||||||
|
validateKey, err := regexp.MatchString(`^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$`, k)
|
||||||
|
if !validateKey || err != nil {
|
||||||
|
return fmt.Errorf("validate tags key %s error, incorrect key", k)
|
||||||
|
}
|
||||||
|
if len(v) > 256 {
|
||||||
|
return fmt.Errorf("validate tags: tag value longer than 256")
|
||||||
|
}
|
||||||
|
validateValue, err := regexp.MatchString(`^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$`, v)
|
||||||
|
if !validateValue || err != nil {
|
||||||
|
return fmt.Errorf("validate tags value %s error, incorrect value", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -50,3 +50,65 @@ func TestXMLMarshall(t *testing.T) {
|
||||||
assert.Equal(t, expected, actual)
|
assert.Equal(t, expected, actual)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TestTags map[string]string
|
||||||
|
|
||||||
|
var ValidateTagsTestCases = []struct {
|
||||||
|
testCaseID int
|
||||||
|
tags TestTags
|
||||||
|
wantErrString string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
1,
|
||||||
|
TestTags{"key-1": "value-1"},
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
2,
|
||||||
|
TestTags{"key-1": "valueOver256R59YI9bahPwAVqvLeKCvM2S1RjzgP8fNDKluCbol0XTTFY6VcMwTBmdnqjsddilXztSGfEoZS1wDAIMBA0rW0CLNSoE2zNg4TT0vDbLHEtZBoZjdZ5E0JNIAqwb9ptIk2VizYmhWjb1G4rJ0CqDGWxcy3usXaQg6Dk6kU8N4hlqwYWeGw7uqdghcQ3ScfF02nHW9QFMN7msLR5fe90mbFBBp3Tjq34i0LEr4By2vxoRa2RqdBhEJhi23Tm"},
|
||||||
|
"validate tags: tag value longer than 256",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
3,
|
||||||
|
TestTags{"keyLenOver128a5aUUGcPexMELsz3RyROzIzfO6BKABeApH2nbbagpOxZh2MgBWYDZtFxQaCuQeP1xR7dUJLwfFfDHguVIyxvTStGDk51BemKETIwZ0zkhR7lhfHBp2y0nFnV": "value-1"},
|
||||||
|
"validate tags: tag key longer than 128",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
4,
|
||||||
|
TestTags{"key-1*": "value-1"},
|
||||||
|
"validate tags key key-1* error, incorrect key",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
5,
|
||||||
|
TestTags{"key-1": "value-1?"},
|
||||||
|
"validate tags value value-1? error, incorrect value",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
6,
|
||||||
|
TestTags{
|
||||||
|
"key-1": "value",
|
||||||
|
"key-2": "value",
|
||||||
|
"key-3": "value",
|
||||||
|
"key-4": "value",
|
||||||
|
"key-5": "value",
|
||||||
|
"key-6": "value",
|
||||||
|
"key-7": "value",
|
||||||
|
"key-8": "value",
|
||||||
|
"key-9": "value",
|
||||||
|
"key-10": "value",
|
||||||
|
"key-11": "value",
|
||||||
|
},
|
||||||
|
"validate tags: 11 tags more than 10",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateTags(t *testing.T) {
|
||||||
|
for _, testCase := range ValidateTagsTestCases {
|
||||||
|
err := ValidateTags(testCase.tags)
|
||||||
|
if testCase.wantErrString == "" {
|
||||||
|
assert.NoErrorf(t, err, "no error")
|
||||||
|
} else {
|
||||||
|
assert.EqualError(t, err, testCase.wantErrString)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue