From 17e91d29179a41131e2a707ac2c5e63be4a21530 Mon Sep 17 00:00:00 2001 From: SmsS4 <36403983+SmsS4@users.noreply.github.com> Date: Tue, 16 May 2023 20:09:43 +0330 Subject: [PATCH] Use filerGroup for s3 buckets collection prefix (#4465) * Use filerGroup for s3 buckets collection prefix * Fix templates * Remove flags * Remove s3CollectionPrefix --- k8s/charts/seaweedfs/templates/filer-statefulset.yaml | 3 +++ k8s/charts/seaweedfs/values.yaml | 1 + weed/command/s3.go | 3 +++ weed/s3api/filer_util.go | 7 +++++++ weed/s3api/s3api_bucket_handlers.go | 6 +++--- weed/s3api/s3api_object_copy_handlers.go | 4 ++-- weed/s3api/s3api_object_handlers.go | 10 ++++++++-- weed/s3api/s3api_object_handlers_postpolicy.go | 2 +- weed/s3api/s3api_object_multipart_handlers.go | 2 +- weed/s3api/s3api_server.go | 1 + weed/shell/command_s3_bucket_delete.go | 2 +- weed/shell/command_s3_bucket_list.go | 2 +- weed/shell/command_s3_bucket_quota_check.go | 2 +- weed/shell/commands.go | 7 +++++++ 14 files changed, 40 insertions(+), 12 deletions(-) diff --git a/k8s/charts/seaweedfs/templates/filer-statefulset.yaml b/k8s/charts/seaweedfs/templates/filer-statefulset.yaml index 33e882f85..f6a6d1940 100644 --- a/k8s/charts/seaweedfs/templates/filer-statefulset.yaml +++ b/k8s/charts/seaweedfs/templates/filer-statefulset.yaml @@ -148,6 +148,9 @@ spec: -encryptVolumeData \ {{- end }} -ip=${POD_IP} \ + {{- if .Values.filer.filerGroup}} + -filerGroup={{ .Values.filer.filerGroup}} \ + {{- end }} {{- if .Values.filer.s3.enabled }} -s3 \ -s3.port={{ .Values.filer.s3.port }} \ diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index 1e2801e42..2a6a136fc 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -273,6 +273,7 @@ filer: grpcPort: 18888 metricsPort: 9327 loggingOverrideLevel: null + filerGroup: "" # replication type is XYZ: # X number of replica in other data centers # Y number of replica in other racks in the same data center diff --git a/weed/command/s3.go b/weed/command/s3.go index 39d1c6fce..8f82ac946 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -155,6 +155,7 @@ func (s3opt *S3Options) startS3Server() bool { filerAddress := pb.ServerAddress(*s3opt.filer) filerBucketsPath := "/buckets" + filerGroup := "" grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") @@ -169,6 +170,7 @@ func (s3opt *S3Options) startS3Server() bool { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } filerBucketsPath = resp.DirBuckets + filerGroup = resp.FilerGroup metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec) glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath) return nil @@ -200,6 +202,7 @@ func (s3opt *S3Options) startS3Server() bool { AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, LocalFilerSocket: localFilerSocket, DataCenter: *s3opt.dataCenter, + FilerGroup: filerGroup, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index c2276b89a..8ae8f780a 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -107,6 +107,13 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_ return err } +func (s3a *S3ApiServer) getCollectionName(bucket string) string { + if s3a.option.FilerGroup != "" { + return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket) + } + return bucket +} + func objectKey(key *string) *string { if strings.HasPrefix(*key, "/") { t := (*key)[1:] diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 9a430b337..d4d81905d 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -104,7 +104,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) return fmt.Errorf("list collections: %v", err) } else { for _, c := range resp.Collections { - if bucket == c.Name { + if s3a.getCollectionName(bucket) == c.Name { errCode = s3err.ErrBucketAlreadyExists break } @@ -174,7 +174,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque // delete collection deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ - Collection: bucket, + Collection: s3a.getCollectionName(bucket), } glog.V(1).Infof("delete collection: %v", deleteCollectionRequest) @@ -304,7 +304,7 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - ttls := fc.GetCollectionTtls(bucket) + ttls := fc.GetCollectionTtls(s3a.getCollectionName(bucket)) if len(ttls) == 0 { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration) return diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go index 881d840ee..8dc33f213 100644 --- a/weed/s3api/s3api_object_copy_handlers.go +++ b/weed/s3api/s3api_object_copy_handlers.go @@ -100,7 +100,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) - etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination) + etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination, dstBucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -185,7 +185,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) - etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination) + etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination, dstBucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 059ee41b1..33de2a13e 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -115,7 +115,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) dataReader = mimeDetect(r, dataReader) } - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "") + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -457,7 +457,7 @@ func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (s return statusCode } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string) (etag string, code s3err.ErrorCode) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string) (etag string, code s3err.ErrorCode) { hash := md5.New() var body = io.TeeReader(dataReader, hash) @@ -474,6 +474,12 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination) } + if s3a.option.FilerGroup != "" { + query := proxyReq.URL.Query() + query.Add("collection", s3a.getCollectionName(bucket)) + proxyReq.URL.RawQuery = query.Encode() + } + for header, values := range r.Header { for _, value := range values { proxyReq.Header.Add(header, value) diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index 5acd61ff4..8dd3900ed 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -115,7 +115,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object)) - etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "") + etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 24d7656b5..187022079 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -255,7 +255,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination) + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index a8816424d..66d176010 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -31,6 +31,7 @@ type S3ApiServerOption struct { AllowDeleteBucketNotEmpty bool LocalFilerSocket string DataCenter string + FilerGroup string } type S3ApiServer struct { diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index d0b4cb505..3b2a9c6e9 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -54,7 +54,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer // delete the collection directly first err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ - Name: *bucketName, + Name: getCollectionName(commandEnv, *bucketName), }) return err }) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 3b607a799..bf21a3a29 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -57,7 +57,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i if !entry.IsDirectory { return nil } - collection := entry.Name + collection := getCollectionName(commandEnv, entry.Name) var collectionSize, fileCount float64 if collectionInfo, found := collectionInfos[collection]; found { collectionSize = collectionInfo.Size diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go index 27f0aaee5..bc0d838f7 100644 --- a/weed/shell/command_s3_bucket_quota_check.go +++ b/weed/shell/command_s3_bucket_quota_check.go @@ -65,7 +65,7 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, if !entry.IsDirectory { return nil } - collection := entry.Name + collection := getCollectionName(commandEnv, entry.Name) var collectionSize float64 if collectionInfo, found := collectionInfos[collection]; found { collectionSize = collectionInfo.Size diff --git a/weed/shell/commands.go b/weed/shell/commands.go index e67394e42..b1722edfb 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -184,3 +184,10 @@ func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.Serv ) return } + +func getCollectionName(commandEnv *CommandEnv, bucket string) string { + if *commandEnv.option.FilerGroup != "" { + return fmt.Sprintf("%s_%s", *commandEnv.option.FilerGroup, bucket) + } + return bucket +}