fix the problem of metadata and tagging loss when files are copied

by adding processing of metadata and tagging in s3 api CopyObject (judging whether to copy or overwrite according to the directive header)
This commit is contained in:
shichanglin5 2022-05-13 18:14:39 +08:00
parent e41b11b004
commit 1166dead00
2 changed files with 132 additions and 8 deletions

View file

@ -29,9 +29,12 @@ const (
// S3 user-defined metadata
AmzUserMetaPrefix = "X-Amz-Meta-"
AmzUserMetaDirective = "X-Amz-Metadata-Directive"
// S3 object tagging
AmzObjectTagging = "X-Amz-Tagging"
AmzObjectTaggingPrefix = "X-Amz-Tagging-"
AmzObjectTaggingDirective = "X-Amz-Tagging-Directive"
AmzTagCount = "x-amz-tagging-count"
)

View file

@ -3,9 +3,10 @@ package s3api
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
headers "github.com/chrislusf/seaweedfs/weed/s3api/http"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
"modernc.org/strutil"
"net/http"
"net/url"
"strconv"
@ -30,7 +31,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) {
replaceMeta, replaceTagging := replaceDirective(r)
if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dir, name := fullPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
@ -38,7 +41,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
entry.Extended = weed_server.SaveAmzMetaData(r, entry.Extended, isReplace(r))
entry.Extended = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
err = s3a.touch(dir, name, entry)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
@ -80,6 +83,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
defer util.CloseResponse(resp)
tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name)
if tagErr != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body)
@ -182,6 +190,119 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
}
func isReplace(r *http.Request) bool {
return r.Header.Get("X-Amz-Metadata-Directive") == "REPLACE"
func replaceDirective(r *http.Request) (replaceMeta, replaceTagging bool) {
return r.Header.Get(headers.AmzUserMetaDirective) == "REPLACE", r.Header.Get(headers.AmzObjectTaggingDirective) == "REPLACE"
}
func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
if sc := reqHeader.Get(xhttp.AmzStorageClass); len(sc) == 0 {
if sc := existing[xhttp.AmzStorageClass]; len(sc) > 0 {
reqHeader[xhttp.AmzStorageClass] = sc
}
}
if !replaceMeta {
for header, _ := range reqHeader {
if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
delete(reqHeader, header)
}
}
for k, v := range existing {
if strings.HasPrefix(k, xhttp.AmzUserMetaPrefix) {
reqHeader[k] = v
}
}
}
if replaceTagging {
if tags := reqHeader.Get(xhttp.AmzObjectTagging); tags != "" {
for _, v := range strings.Split(tags, "&") {
tag := strings.Split(v, "=")
if len(tag) == 2 {
reqHeader[xhttp.AmzObjectTagging+"-"+tag[0]] = []string{tag[1]}
} else if len(tag) == 1 {
reqHeader[xhttp.AmzObjectTagging+"-"+tag[0]] = []string{}
}
}
}
delete(reqHeader, xhttp.AmzObjectTagging)
} else {
for header, _ := range reqHeader {
if strings.HasPrefix(header, xhttp.AmzObjectTagging) {
delete(reqHeader, header)
}
}
found := false
for k, _ := range existing {
if strings.HasPrefix(k, xhttp.AmzObjectTaggingPrefix) {
found = true
break
}
}
if found {
tags, err := getTags(dir, name)
if err != nil {
return err
}
var tagArr []string
for k, v := range tags {
tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
}
tagStr := strutil.JoinFields(tagArr, "&")
reqHeader.Set(xhttp.AmzObjectTagging, tagStr)
}
}
return
}
func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte) {
metadata = make(map[string][]byte)
if sc := existing[xhttp.AmzStorageClass]; len(sc) > 0 {
metadata[xhttp.AmzStorageClass] = sc
}
if sc := reqHeader.Get(xhttp.AmzStorageClass); len(sc) > 0 {
metadata[xhttp.AmzStorageClass] = []byte(sc)
}
if replaceMeta {
for k, v := range existing {
if strings.HasPrefix(k, xhttp.AmzUserMetaPrefix) {
metadata[k] = v
}
}
} else {
for header, values := range reqHeader {
if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
for _, value := range values {
metadata[header] = []byte(value)
}
}
}
}
if replaceTagging {
if tags := reqHeader.Get(xhttp.AmzObjectTagging); tags != "" {
for _, v := range strings.Split(tags, "&") {
tag := strings.Split(v, "=")
if len(tag) == 2 {
metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
} else if len(tag) == 1 {
metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = nil
}
}
}
} else {
for k, v := range existing {
if strings.HasPrefix(k, xhttp.AmzObjectTagging) {
metadata[k] = v
}
}
delete(metadata, xhttp.AmzTagCount)
}
return
}