From da3d330616663e1c843514bd7a7a2c1833bf42d1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 7 Mar 2022 02:00:14 -0800 Subject: [PATCH] s3 and filer transport using unix domain socket instead of tcp --- weed/command/filer.go | 21 +++++++++++++++++++++ weed/command/s3.go | 2 ++ weed/command/server.go | 2 ++ weed/s3api/s3api_object_handlers.go | 15 ++------------- weed/s3api/s3api_server.go | 18 ++++++++++++++++++ 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index f886f1258..07fedd72d 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "net" "net/http" "os" "time" @@ -51,6 +52,7 @@ type FilerOptions struct { concurrentUploadLimitMB *int debug *bool debugPort *int + localSocket *string } func init() { @@ -76,6 +78,7 @@ func init() { f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") + f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-.sock") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -139,11 +142,14 @@ func runFiler(cmd *Command, args []string) bool { if *filerStartS3 { filerS3Options.filer = &filerAddress filerS3Options.bindIp = f.bindIp + filerS3Options.localFilerSocket = f.localSocket go func() { time.Sleep(startDelay * time.Second) filerS3Options.startS3Server() }() startDelay++ + } else { + f.localSocket = nil } if *filerStartWebDav { @@ -230,6 +236,18 @@ func (fo *FilerOptions) startFiler() { glog.Fatalf("Filer listener error: %v", e) } + // start on local unix socket + if *fo.localSocket == "" { + *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port) + if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error()) + } + } + filerSocketListener, err := net.Listen("unix", *fo.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err) + } + // starting grpc server grpcPort := *fo.portGrpc grpcL, err := util.NewListener(util.JoinHostPort(*fo.bindIp, grpcPort), 0) @@ -242,6 +260,9 @@ func (fo *FilerOptions) startFiler() { go grpcS.Serve(grpcL) httpS := &http.Server{Handler: defaultMux} + go func() { + httpS.Serve(filerSocketListener) + }() if err := httpS.Serve(filerListener); err != nil { glog.Fatalf("Filer Fail to serve: %v", e) } diff --git a/weed/command/s3.go b/weed/command/s3.go index ee726fcec..3ce48ccb8 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -34,6 +34,7 @@ type S3Options struct { metricsHttpPort *int allowEmptyFolder *bool auditLogConfig *string + localFilerSocket *string } func init() { @@ -184,6 +185,7 @@ func (s3opt *S3Options) startS3Server() bool { BucketsPath: filerBucketsPath, GrpcDialOption: grpcDialOption, AllowEmptyFolder: *s3opt.allowEmptyFolder, + LocalFilerSocket: s3opt.localFilerSocket, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/server.go b/weed/command/server.go index 1c0927c76..45fb80b7a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -112,6 +112,7 @@ func init() { filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") + filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-.sock") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") @@ -245,6 +246,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingS3 { go func() { time.Sleep(2 * time.Second) + s3Options.localFilerSocket = filerOptions.localSocket s3Options.startS3Server() }() } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index fb330b471..6bcf2266f 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -27,17 +27,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - client *http.Client -) - -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} -} - func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -335,7 +324,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des // ensure that the Authorization header is overriding any previous // Authorization header which might be already present in proxyReq s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite) - resp, postErr := client.Do(proxyReq) + resp, postErr := s3a.client.Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) @@ -401,7 +390,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader // ensure that the Authorization header is overriding any previous // Authorization header which might be already present in proxyReq s3a.maybeAddFilerJwtAuthorization(proxyReq, true) - resp, postErr := client.Do(proxyReq) + resp, postErr := s3a.client.Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index fc5c47c43..fe069595d 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -1,7 +1,9 @@ package s3api import ( + "context" "fmt" + "net" "net/http" "strings" "time" @@ -24,6 +26,7 @@ type S3ApiServerOption struct { BucketsPath string GrpcDialOption grpc.DialOption AllowEmptyFolder bool + LocalFilerSocket *string } type S3ApiServer struct { @@ -31,6 +34,7 @@ type S3ApiServer struct { iam *IdentityAccessManagement randomClientId int32 filerGuard *security.Guard + client *http.Client } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -49,6 +53,20 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer randomClientId: util.RandomInt32(), filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), } + if option.LocalFilerSocket == nil { + s3ApiServer.client = &http.Client{Transport: &http.Transport{ + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + }} + } else { + s3ApiServer.client = &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", *option.LocalFilerSocket) + }, + }, + } + } s3ApiServer.registerRouter(router)