s3: add s3 copy

fix https://github.com/chrislusf/seaweedfs/issues/1190
This commit is contained in:
Chris Lu 2020-01-31 00:11:08 -08:00
parent 3b043ead49
commit a80ecbfe84
6 changed files with 225 additions and 6 deletions

View file

@ -41,6 +41,8 @@ const (
ErrInvalidPartNumberMarker
ErrInvalidPart
ErrInternalError
ErrInvalidCopyDest
ErrInvalidCopySource
ErrNotImplemented
)
@ -118,6 +120,18 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidCopyDest: {
Code: "InvalidRequest",
Description: "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidCopySource: {
Code: "InvalidArgument",
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNotImplemented: {
Code: "NotImplemented",
Description: "A header you provided implies functionality that is not implemented",

View file

@ -0,0 +1,152 @@
package s3api
import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
dstBucket := vars["bucket"]
dstObject := getObject(vars)
// Copy source path.
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
if err != nil {
// Save unescaped string as is.
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
writeErrorResponse(w, ErrInvalidCopySource, r.URL)
return
}
if srcBucket == dstBucket && srcObject == dstObject {
writeErrorResponse(w, ErrInvalidCopySource, r.URL)
return
}
dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
s3a.option.Filer, s3a.option.BucketsPath, dstBucket, dstObject, dstBucket)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject)
_, _, dataReader, err := util.DownloadFile(srcUrl)
if err != nil {
writeErrorResponse(w, ErrInvalidCopySource, r.URL)
return
}
etag, errCode := s3a.putToFiler(r, dstUrl, dataReader)
println("srcUrl:", srcUrl)
println("dstUrl:", dstUrl)
if errCode != ErrNone {
writeErrorResponse(w, errCode, r.URL)
return
}
setEtag(w, etag)
response := CopyObjectResult{
ETag: etag,
LastModified: time.Now(),
}
writeSuccessResponseXML(w, encodeResponse(response))
}
func pathToBucketAndObject(path string) (bucket, object string) {
path = strings.TrimPrefix(path, "/")
parts := strings.SplitN(path, "/", 2)
if len(parts) == 2 {
return parts[0], "/" + parts[1]
}
return parts[0], "/"
}
type CopyPartResult struct {
LastModified time.Time `xml:"LastModified"`
ETag string `xml:"ETag"`
}
func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
vars := mux.Vars(r)
dstBucket := vars["bucket"]
// dstObject := getObject(vars)
// Copy source path.
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
if err != nil {
// Save unescaped string as is.
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
writeErrorResponse(w, ErrInvalidCopySource, r.URL)
return
}
uploadID := r.URL.Query().Get("uploadId")
partIDString := r.URL.Query().Get("partNumber")
partID, err := strconv.Atoi(partIDString)
if err != nil {
writeErrorResponse(w, ErrInvalidPart, r.URL)
return
}
// check partID with maximum part ID for multipart objects
if partID > 10000 {
writeErrorResponse(w, ErrInvalidMaxParts, r.URL)
return
}
rangeHeader := r.Header.Get("x-amz-copy-source-range")
dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
s3a.option.Filer, s3a.genUploadsFolder(dstBucket), uploadID, partID-1, dstBucket)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject)
dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, rangeHeader)
if err != nil {
writeErrorResponse(w, ErrInvalidCopySource, r.URL)
return
}
etag, errCode := s3a.putToFiler(r, dstUrl, dataReader)
if errCode != ErrNone {
writeErrorResponse(w, errCode, r.URL)
return
}
setEtag(w, etag)
response := CopyPartResult{
ETag: etag,
LastModified: time.Now(),
}
writeSuccessResponseXML(w, encodeResponse(response))
}

View file

@ -9,9 +9,10 @@ import (
"net/http"
"strings"
"github.com/gorilla/mux"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/gorilla/mux"
)
var (

View file

@ -44,6 +44,8 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// HeadBucket
bucket.Methods("HEAD").HandlerFunc(s3a.HeadBucketHandler)
// CopyObjectPart
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(s3a.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// PutObjectPart
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
@ -57,6 +59,8 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// ListMultipartUploads
bucket.Methods("GET").HandlerFunc(s3a.ListMultipartUploadsHandler).Queries("uploads", "")
// CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(s3a.CopyObjectHandler)
// PutObject
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
// PutBucket
@ -77,11 +81,6 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// DeleteMultipleObjects
bucket.Methods("POST").HandlerFunc(s3a.DeleteMultipleObjectsHandler).Queries("delete", "")
/*
// CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(s3a.CopyObjectHandler)
// CopyObjectPart
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(s3a.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// not implemented
// GetBucketLocation

32
weed/s3api/s3api_test.go Normal file
View file

@ -0,0 +1,32 @@
package s3api
import (
"testing"
"time"
)
func TestCopyObjectResponse(t *testing.T) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
response := CopyObjectResult{
ETag: "12345678",
LastModified: time.Now(),
}
println(string(encodeResponse(response)))
}
func TestCopyPartResponse(t *testing.T) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
response := CopyPartResult{
ETag: "12345678",
LastModified: time.Now(),
}
println(string(encodeResponse(response)))
}

View file

@ -286,3 +286,24 @@ func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte
}
}
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
return nil, err
}
if rangeHeader != "" {
req.Header.Add("Range", rangeHeader)
}
r, err := client.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
return r.Body, nil
}