partially done

This commit is contained in:
Chris Lu 2020-02-10 20:23:04 -08:00
parent 29945fad51
commit ac3fc92256
24 changed files with 1095 additions and 121 deletions

5
go.mod
View file

@ -9,6 +9,7 @@ require (
github.com/DataDog/zstd v1.4.1 // indirect github.com/DataDog/zstd v1.4.1 // indirect
github.com/Shopify/sarama v1.23.1 github.com/Shopify/sarama v1.23.1
github.com/aws/aws-sdk-go v1.23.13 github.com/aws/aws-sdk-go v1.23.13
github.com/chrislusf/glog v0.0.0-20180213072942-e58d74c04389
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92 github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92
github.com/coreos/etcd v3.3.15+incompatible // indirect github.com/coreos/etcd v3.3.15+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
@ -36,7 +37,8 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect
github.com/karlseguin/ccache v2.0.3+incompatible github.com/karlseguin/ccache v2.0.3+incompatible
github.com/karlseguin/expect v1.0.1 // indirect github.com/karlseguin/expect v1.0.1 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/compress v1.9.8 // indirect
github.com/klauspost/cpuid v1.2.3 // indirect
github.com/klauspost/crc32 v1.2.0 github.com/klauspost/crc32 v1.2.0
github.com/klauspost/reedsolomon v1.9.2 github.com/klauspost/reedsolomon v1.9.2
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
@ -76,6 +78,7 @@ require (
github.com/uber-go/atomic v1.4.0 // indirect github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.17.0+incompatible // indirect github.com/uber/jaeger-client-go v2.17.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/valyala/fasthttp v1.9.0
github.com/willf/bitset v1.1.10 // indirect github.com/willf/bitset v1.1.10 // indirect
github.com/willf/bloom v2.0.3+incompatible github.com/willf/bloom v2.0.3+incompatible
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect

14
go.sum
View file

@ -64,6 +64,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4= github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chrislusf/glog v0.0.0-20180213072942-e58d74c04389 h1:05igzs+7KihLMjeY2TiINo4eE1kqgEKGpL+5XhFXtHs=
github.com/chrislusf/glog v0.0.0-20180213072942-e58d74c04389/go.mod h1:LBseU6dj13CwRT1XVQQKuT+dRXuzClILSvalxNpFao0=
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92 h1:lM9SFsh0EPXkyJyrTJqLZPAIJBtNFP6LNkYXu2MnSZI= github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92 h1:lM9SFsh0EPXkyJyrTJqLZPAIJBtNFP6LNkYXu2MnSZI=
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92/go.mod h1:4jyiUCD5y548+yKW+oiHtccBiMaLCCbFBpK2t7X4eUo= github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92/go.mod h1:4jyiUCD5y548+yKW+oiHtccBiMaLCCbFBpK2t7X4eUo=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
@ -282,9 +284,15 @@ github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHY
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/crc32 v1.2.0 h1:0VuyqOCruD33/lJ/ojXNvzVyl8Zr5zdTmj9l9qLZ86I= github.com/klauspost/crc32 v1.2.0 h1:0VuyqOCruD33/lJ/ojXNvzVyl8Zr5zdTmj9l9qLZ86I=
github.com/klauspost/crc32 v1.2.0/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/crc32 v1.2.0/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/reedsolomon v1.9.2 h1:E9CMS2Pqbv+C7tsrYad4YC9MfhnMVWhMRsTi7U0UB18= github.com/klauspost/reedsolomon v1.9.2 h1:E9CMS2Pqbv+C7tsrYad4YC9MfhnMVWhMRsTi7U0UB18=
@ -545,6 +553,11 @@ github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d h1:ggUgChAeyge4NZ4
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.9.0 h1:hNpmUdy/+ZXYpGy0OBfm7K0UQTzb73W0T0U4iJIVrMw=
github.com/valyala/fasthttp v1.9.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA=
@ -626,6 +639,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc= golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

View file

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/valyala/fasthttp"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
@ -165,6 +166,8 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.fileSizeLimitMB, *v.fileSizeLimitMB,
) )
go fasthttp.ListenAndServe(":8081", volumeServer.HandleFastHTTP)
// starting grpc server // starting grpc server
grpcS := v.startGrpcService(volumeServer) grpcS := v.startGrpcService(volumeServer)

View file

@ -1,12 +1,15 @@
package security package security
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"strings" "strings"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
) )
@ -62,7 +65,7 @@ func NewGuard(whiteList []string, signingKey string, expiresAfterSec int, readSi
return g return g
} }
func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { func (g *Guard) OldWhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isWriteActive { if !g.isWriteActive {
//if no security needed, just skip all checking //if no security needed, just skip all checking
return f return f
@ -76,6 +79,20 @@ func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w
} }
} }
func (g *Guard) WhiteList(f func(ctx *fasthttp.RequestCtx)) func(ctx *fasthttp.RequestCtx) {
if !g.isWriteActive {
//if no security needed, just skip all checking
return f
}
return func(ctx *fasthttp.RequestCtx) {
if err := g.fastCheckWhiteList(ctx); err != nil {
ctx.SetStatusCode(http.StatusUnauthorized)
return
}
f(ctx)
}
}
func GetActualRemoteHost(r *http.Request) (host string, err error) { func GetActualRemoteHost(r *http.Request) (host string, err error) {
host = r.Header.Get("HTTP_X_FORWARDED_FOR") host = r.Header.Get("HTTP_X_FORWARDED_FOR")
if host == "" { if host == "" {
@ -90,6 +107,22 @@ func GetActualRemoteHost(r *http.Request) (host string, err error) {
return return
} }
func FastGetActualRemoteHost(ctx *fasthttp.RequestCtx) (theHost string, err error) {
host := ctx.Request.Header.Peek("HTTP_X_FORWARDED_FOR")
if host == nil {
host = ctx.Request.Header.Peek("X-FORWARDED-FOR")
}
commaIndex := bytes.IndexByte(host, ',')
if commaIndex >= 0 {
host = host[0:commaIndex]
}
if host == nil {
shost, _, serr := net.SplitHostPort(ctx.RemoteAddr().String())
return shost, serr
}
return string(host), nil
}
func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error {
if len(g.whiteList) == 0 { if len(g.whiteList) == 0 {
return nil return nil
@ -125,3 +158,39 @@ func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error {
glog.V(0).Infof("Not in whitelist: %s", r.RemoteAddr) glog.V(0).Infof("Not in whitelist: %s", r.RemoteAddr)
return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr) return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr)
} }
func (g *Guard) fastCheckWhiteList(ctx *fasthttp.RequestCtx) error {
if len(g.whiteList) == 0 {
return nil
}
host, err := FastGetActualRemoteHost(ctx)
if err == nil {
for _, ip := range g.whiteList {
// If the whitelist entry contains a "/" it
// is a CIDR range, and we should check the
// remote host is within it
if strings.Contains(ip, "/") {
_, cidrnet, err := net.ParseCIDR(ip)
if err != nil {
panic(err)
}
remote := net.ParseIP(host)
if cidrnet.Contains(remote) {
return nil
}
}
//
// Otherwise we're looking for a literal match.
//
if ip == host {
return nil
}
}
}
glog.V(0).Infof("Not in whitelist: %s", ctx.RemoteAddr())
return fmt.Errorf("Not in whitelis: %s", ctx.RemoteAddr())
}

View file

@ -1,13 +1,16 @@
package security package security
import ( import (
"bytes"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/dgrijalva/jwt-go"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
jwt "github.com/dgrijalva/jwt-go"
) )
type EncodedJwt string type EncodedJwt string
@ -39,7 +42,7 @@ func GenJwt(signingKey SigningKey, expiresAfterSec int, fileId string) EncodedJw
return EncodedJwt(encoded) return EncodedJwt(encoded)
} }
func GetJwt(r *http.Request) EncodedJwt { func OldGetJwt(r *http.Request) EncodedJwt {
// Get token from query params // Get token from query params
tokenStr := r.URL.Query().Get("jwt") tokenStr := r.URL.Query().Get("jwt")
@ -55,6 +58,22 @@ func GetJwt(r *http.Request) EncodedJwt {
return EncodedJwt(tokenStr) return EncodedJwt(tokenStr)
} }
func GetJwt(ctx *fasthttp.RequestCtx) EncodedJwt {
// Get token from query params
tokenStr := ctx.FormValue("jwt")
// Get token from authorization header
if tokenStr == nil {
bearer := ctx.Request.Header.Peek("Authorization")
if len(bearer) > 7 && string(bytes.ToUpper(bearer[0:6])) == "BEARER" {
tokenStr = bearer[7:]
}
}
return EncodedJwt(tokenStr)
}
func DecodeJwt(signingKey SigningKey, tokenString EncodedJwt) (token *jwt.Token, err error) { func DecodeJwt(signingKey SigningKey, tokenString EncodedJwt) (token *jwt.Token, err error) {
// check exp, nbf // check exp, nbf
return jwt.ParseWithClaims(string(tokenString), &SeaweedFileIdClaims{}, func(token *jwt.Token) (interface{}, error) { return jwt.ParseWithClaims(string(tokenString), &SeaweedFileIdClaims{}, func(token *jwt.Token) (interface{}, error) {

View file

@ -11,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/valyala/fasthttp"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -35,7 +36,47 @@ func init() {
statikFS, _ = statik.New() statikFS, _ = statik.New()
} }
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
func writeJson(ctx *fasthttp.RequestCtx, httpStatus int, obj interface{}) (err error) {
var bytes []byte
if ctx.FormValue("pretty") != nil {
bytes, err = json.MarshalIndent(obj, "", " ")
} else {
bytes, err = json.Marshal(obj)
}
if err != nil {
return
}
callback := ctx.FormValue("callback")
if callback == nil {
ctx.Response.Header.Set("Content-Type", "application/json")
ctx.SetStatusCode(httpStatus)
if httpStatus == http.StatusNotModified {
return
}
_, err = ctx.Response.BodyWriter().Write(bytes)
} else {
ctx.Response.Header.Set("Content-Type", "application/javascript")
ctx.SetStatusCode(httpStatus)
if httpStatus == http.StatusNotModified {
return
}
if _, err = ctx.Response.BodyWriter().Write([]uint8(callback)); err != nil {
return
}
if _, err = ctx.Response.BodyWriter().Write([]uint8("(")); err != nil {
return
}
fmt.Fprint(ctx.Response.BodyWriter(), string(bytes))
if _, err = ctx.Response.BodyWriter().Write([]uint8(")")); err != nil {
return
}
}
return
}
func oldWriteJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte var bytes []byte
if r.FormValue("pretty") != "" { if r.FormValue("pretty") != "" {
bytes, err = json.MarshalIndent(obj, "", " ") bytes, err = json.MarshalIndent(obj, "", " ")
@ -74,17 +115,32 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
return return
} }
// wrapper for writeJson - just logs errors // wrapper for oldWriteJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { func oldWriteJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil { if err := oldWriteJson(w, r, httpStatus, obj); err != nil {
glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err) glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err)
glog.V(1).Infof("JSON content: %+v", obj) glog.V(1).Infof("JSON content: %+v", obj)
} }
} }
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
// wrapper for oldWriteJson - just logs errors
func writeJsonQuiet(ctx *fasthttp.RequestCtx, httpStatus int, obj interface{}) {
if err := writeJson(ctx, httpStatus, obj); err != nil {
glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err)
glog.V(1).Infof("JSON content: %+v", obj)
}
}
func oldWriteJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["error"] = err.Error() m["error"] = err.Error()
writeJsonQuiet(w, r, httpStatus, m) oldWriteJsonQuiet(w, r, httpStatus, m)
}
func writeJsonError(ctx *fasthttp.RequestCtx, httpStatus int, err error) {
m := make(map[string]interface{})
m["error"] = err.Error()
writeJsonQuiet(ctx, httpStatus, m)
} }
func debug(params ...interface{}) { func debug(params ...interface{}) {
@ -94,14 +150,14 @@ func debug(params ...interface{}) {
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) { func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{}) m := make(map[string]interface{})
if r.Method != "POST" { if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) oldWriteJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
return return
} }
debug("parsing upload file...") debug("parsing upload file...")
fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r, 256*1024*1024) fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.OldParseUpload(r, 256*1024*1024)
if pe != nil { if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe) oldWriteJsonError(w, r, http.StatusBadRequest, pe)
return return
} }
@ -111,7 +167,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
if r.FormValue("count") != "" { if r.FormValue("count") != "" {
count, pe = strconv.ParseUint(r.FormValue("count"), 10, 32) count, pe = strconv.ParseUint(r.FormValue("count"), 10, 32)
if pe != nil { if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe) oldWriteJsonError(w, r, http.StatusBadRequest, pe)
return return
} }
} }
@ -124,7 +180,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
} }
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil { if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae) oldWriteJsonError(w, r, http.StatusInternalServerError, ae)
return return
} }
@ -136,7 +192,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
debug("upload file to store", url) debug("upload file to store", url)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth) uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
@ -145,7 +201,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
m["size"] = originalDataSize m["size"] = originalDataSize
m["eTag"] = uploadResult.ETag m["eTag"] = uploadResult.ETag
writeJsonQuiet(w, r, http.StatusCreated, m) oldWriteJsonQuiet(w, r, http.StatusCreated, m)
return return
} }
@ -185,20 +241,20 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b
func statsHealthHandler(w http.ResponseWriter, r *http.Request) { func statsHealthHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func statsCounterHandler(w http.ResponseWriter, r *http.Request) { func statsCounterHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Counters"] = serverStats m["Counters"] = serverStats
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Memory"] = stats.MemStat() m["Memory"] = stats.MemStat()
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func handleStaticResources(defaultMux *http.ServeMux) { func handleStaticResources(defaultMux *http.ServeMux) {

View file

@ -69,7 +69,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
if r.Method == "HEAD" { if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
setEtag(w, filer2.ETag(entry.Chunks)) oldSetEtag(w, filer2.ETag(entry.Chunks))
return return
} }
@ -122,7 +122,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
resp, do_err := util.Do(request) resp, do_err := util.Do(request)
if do_err != nil { if do_err != nil {
glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
writeJsonError(w, r, http.StatusInternalServerError, do_err) oldWriteJsonError(w, r, http.StatusInternalServerError, do_err)
return return
} }
defer func() { defer func() {
@ -150,7 +150,7 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque
if mimeType != "" { if mimeType != "" {
w.Header().Set("Content-Type", mimeType) w.Header().Set("Content-Type", mimeType)
} }
setEtag(w, filer2.ETag(entry.Chunks)) oldSetEtag(w, filer2.ETag(entry.Chunks))
totalSize := int64(filer2.TotalSize(entry.Chunks)) totalSize := int64(filer2.TotalSize(entry.Chunks))

View file

@ -52,7 +52,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries)) glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))
if r.Header.Get("Accept") == "application/json" { if r.Header.Get("Accept") == "application/json" {
writeJsonQuiet(w, r, http.StatusOK, struct { oldWriteJsonQuiet(w, r, http.StatusOK, struct {
Path string Path string
Entries interface{} Entries interface{}
Limit int Limit int

View file

@ -65,7 +65,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil { if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae) glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae) oldWriteJsonError(w, r, http.StatusInternalServerError, ae)
err = ae err = ae
return return
} }
@ -135,8 +135,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
Fid: fileId, Fid: fileId,
Url: urlLocation, Url: urlLocation,
} }
setEtag(w, ret.ETag) oldSetEtag(w, ret.ETag)
writeJsonQuiet(w, r, http.StatusCreated, reply) oldWriteJsonQuiet(w, r, http.StatusCreated, reply)
} }
// update metadata in filer store // update metadata in filer store
@ -196,7 +196,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks) fs.filer.DeleteChunks(entry.Chunks)
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, dbErr) oldWriteJsonError(w, r, http.StatusInternalServerError, dbErr)
err = dbErr err = dbErr
return return
} }
@ -228,7 +228,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
resp, doErr := util.Do(request) resp, doErr := util.Do(request)
if doErr != nil { if doErr != nil {
glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
writeJsonError(w, r, http.StatusInternalServerError, doErr) oldWriteJsonError(w, r, http.StatusInternalServerError, doErr)
err = doErr err = doErr
return return
} }
@ -240,7 +240,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
respBody, raErr := ioutil.ReadAll(resp.Body) respBody, raErr := ioutil.ReadAll(resp.Body)
if raErr != nil { if raErr != nil {
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
writeJsonError(w, r, http.StatusInternalServerError, raErr) oldWriteJsonError(w, r, http.StatusInternalServerError, raErr)
err = raErr err = raErr
return return
} }
@ -248,14 +248,14 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
unmarshalErr := json.Unmarshal(respBody, &ret) unmarshalErr := json.Unmarshal(respBody, &ret)
if unmarshalErr != nil { if unmarshalErr != nil {
glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) oldWriteJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
err = unmarshalErr err = unmarshalErr
return return
} }
if ret.Error != "" { if ret.Error != "" {
err = errors.New(ret.Error) err = errors.New(ret.Error)
glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
// find correct final path // find correct final path
@ -267,7 +267,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
err = fmt.Errorf("can not to write to folder %s without a file name", path) err = fmt.Errorf("can not to write to folder %s without a file name", path)
fs.filer.DeleteFileByFileId(fileId) fs.filer.DeleteFileByFileId(fileId)
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
} }
@ -299,7 +299,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
if err == filer2.ErrNotFound { if err == filer2.ErrNotFound {
httpStatus = http.StatusNotFound httpStatus = http.StatusNotFound
} }
writeJsonError(w, r, httpStatus, err) oldWriteJsonError(w, r, httpStatus, err)
return return
} }

View file

@ -57,9 +57,9 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter) reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil { } else if reply != nil {
writeJsonQuiet(w, r, http.StatusCreated, reply) oldWriteJsonQuiet(w, r, http.StatusCreated, reply)
} }
return true return true
} }

View file

@ -107,17 +107,17 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
handleStaticResources2(r) handleStaticResources2(r)
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler) r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.OldWhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler)) r.HandleFunc("/dir/lookup", ms.guard.OldWhiteList(ms.dirLookupHandler))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.OldWhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.OldWhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.OldWhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.OldWhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.OldWhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/submit", ms.guard.OldWhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) r.HandleFunc("/stats/health", ms.guard.OldWhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) r.HandleFunc("/stats/counter", ms.guard.OldWhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) r.HandleFunc("/stats/memory", ms.guard.OldWhiteList(statsMemoryHandler))
r.HandleFunc("/{fileId}", ms.redirectHandler) r.HandleFunc("/{fileId}", ms.redirectHandler)
} }
@ -157,7 +157,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
defer func() { <-ms.bounedLeaderChan }() defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, oldWriteJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
return return
} }
@ -175,7 +175,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.ServeHTTP(w, r) proxy.ServeHTTP(w, r)
} else { } else {
// drop it to the floor // drop it to the floor
// writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) // oldWriteJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
} }
} }
} }

View file

@ -55,7 +55,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
isRead := forRead == "yes" isRead := forRead == "yes"
ms.maybeAddJwtAuthorization(w, fileId, !isRead) ms.maybeAddJwtAuthorization(w, fileId, !isRead)
} }
writeJsonQuiet(w, r, httpStatus, location) oldWriteJsonQuiet(w, r, httpStatus, location)
} }
// findVolumeLocation finds the volume location from master topo if it is leader, // findVolumeLocation finds the volume location from master topo if it is leader,
@ -107,20 +107,20 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
option, err := ms.getVolumeGrowOption(r) option, err := ms.getVolumeGrowOption(r)
if err != nil { if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) oldWriteJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
return return
} }
if !ms.Topo.HasWritableVolume(option) { if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 { if ms.Topo.FreeSpace() <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"}) oldWriteJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
return return
} }
ms.vgLock.Lock() ms.vgLock.Lock()
defer ms.vgLock.Unlock() defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) { if !ms.Topo.HasWritableVolume(option) {
if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil { if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil {
writeJsonError(w, r, http.StatusInternalServerError, oldWriteJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err)) fmt.Errorf("Cannot grow volume group! %v", err))
return return
} }
@ -129,9 +129,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil { if err == nil {
ms.maybeAddJwtAuthorization(w, fid, true) ms.maybeAddJwtAuthorization(w, fid, true)
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) oldWriteJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else { } else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) oldWriteJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
} }
} }

View file

@ -21,7 +21,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
collectionName := r.FormValue("collection") collectionName := r.FormValue("collection")
collection, ok := ms.Topo.FindCollection(collectionName) collection, ok := ms.Topo.FindCollection(collectionName)
if !ok { if !ok {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName)) oldWriteJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return return
} }
for _, server := range collection.ListVolumeServers() { for _, server := range collection.ListVolumeServers() {
@ -32,7 +32,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return deleteErr return deleteErr
}) })
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
} }
@ -46,7 +46,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request)
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Topology"] = ms.Topo.ToMap() m["Topology"] = ms.Topo.ToMap()
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
@ -57,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold, err = strconv.ParseFloat(gcString, 32) gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil { if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err) glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString)) oldWriteJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
return return
} }
} }
@ -70,7 +70,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
count := 0 count := 0
option, err := ms.getVolumeGrowOption(r) option, err := ms.getVolumeGrowOption(r)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusNotAcceptable, err) oldWriteJsonError(w, r, http.StatusNotAcceptable, err)
return return
} }
@ -85,9 +85,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
} }
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusNotAcceptable, err) oldWriteJsonError(w, r, http.StatusNotAcceptable, err)
} else { } else {
writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count}) oldWriteJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
} }
} }
@ -95,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Volumes"] = ms.Topo.ToVolumeMap() m["Volumes"] = ms.Topo.ToVolumeMap()
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
@ -112,7 +112,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} }
http.Redirect(w, r, url, http.StatusMovedPermanently) http.Redirect(w, r, url, http.StatusMovedPermanently)
} else { } else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) oldWriteJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
} }
} }
@ -128,7 +128,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
} else { } else {
masterUrl, err := ms.Topo.Leader() masterUrl, err := ms.Topo.Leader()
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) oldWriteJsonError(w, r, http.StatusInternalServerError, err)
} else { } else {
submitForClientHandler(w, r, masterUrl, ms.grpcDialOption) submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
} }

View file

@ -18,5 +18,5 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
if leader, e := s.topo.Leader(); e == nil { if leader, e := s.topo.Leader(); e == nil {
ret.Leader = leader ret.Leader = leader
} }
writeJsonQuiet(w, r, http.StatusOK, ret) oldWriteJsonQuiet(w, r, http.StatusOK, ret)
} }

View file

@ -76,16 +76,16 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
if signingKey == "" || enableUiAccess { if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments // only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) adminMux.HandleFunc("/status", vs.guard.OldWhiteList(vs.statusHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.OldWhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.OldWhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.OldWhiteList(vs.statsDiskHandler))
} }
adminMux.HandleFunc("/", vs.privateStoreHandler) adminMux.HandleFunc("/", vs.oldPrivateStoreHandler)
if publicMux != adminMux { if publicMux != adminMux {
// separated admin and public port // separated admin and public port
handleStaticResources(publicMux) handleStaticResources(publicMux)
publicMux.HandleFunc("/", vs.publicReadOnlyHandler) publicMux.HandleFunc("/", vs.oldPublicReadOnlyHandler)
} }
go vs.heartbeat() go vs.heartbeat()

View file

@ -0,0 +1,81 @@
package weed_server
import (
"strings"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
)
func (vs *VolumeServer) HandleFastHTTP(ctx *fasthttp.RequestCtx) {
switch string(ctx.Method()) {
case "GET", "HEAD":
vs.fastGetOrHeadHandler(ctx)
case "DELETE":
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(ctx)
case "PUT", "POST":
stats.WriteRequest()
vs.guard.WhiteList(vs.fastPostHandler)(ctx)
}
}
func (vs *VolumeServer) publicReadOnlyHandler(ctx *fasthttp.RequestCtx) {
switch string(ctx.Method()) {
case "GET":
stats.ReadRequest()
vs.fastGetOrHeadHandler(ctx)
case "HEAD":
stats.ReadRequest()
vs.fastGetOrHeadHandler(ctx)
}
}
func (vs *VolumeServer) maybeCheckJwtAuthorization(ctx *fasthttp.RequestCtx, vid, fid string, isWrite bool) bool {
var signingKey security.SigningKey
if isWrite {
if len(vs.guard.SigningKey) == 0 {
return true
} else {
signingKey = vs.guard.SigningKey
}
} else {
if len(vs.guard.ReadSigningKey) == 0 {
return true
} else {
signingKey = vs.guard.ReadSigningKey
}
}
tokenStr := security.GetJwt(ctx)
if tokenStr == "" {
glog.V(1).Infof("missing jwt from %s", ctx.RemoteAddr())
return false
}
token, err := security.DecodeJwt(signingKey, tokenStr)
if err != nil {
glog.V(1).Infof("jwt verification error from %s: %v", ctx.RemoteAddr(), err)
return false
}
if !token.Valid {
glog.V(1).Infof("jwt invalid from %s: %v", ctx.RemoteAddr(), tokenStr)
return false
}
if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok {
if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
fid = fid[:sepIndex]
}
return sc.Fid == vid+","+fid
}
glog.V(1).Infof("unexpected jwt from %s: %v", ctx.RemoteAddr(), tokenStr)
return false
}

View file

@ -0,0 +1,366 @@
package weed_server
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) fastGetOrHeadHandler(ctx *fasthttp.RequestCtx) {
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
requestPath := string(ctx.Path())
n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(requestPath)
if !vs.maybeCheckJwtAuthorization(ctx, vid, fid, false) {
writeJsonError(ctx, http.StatusUnauthorized, errors.New("wrong jwt"))
return
}
volumeId, err := needle.NewVolumeId(vid)
if err != nil {
glog.V(2).Infof("parsing volumd id %s error: %v", err, requestPath)
ctx.SetStatusCode(http.StatusBadRequest)
return
}
err = n.ParsePath(fid)
if err != nil {
glog.V(2).Infof("parsing fid %s error: %v", err, requestPath)
ctx.SetStatusCode(http.StatusBadRequest)
return
}
// glog.V(4).Infoln("volume", volumeId, "reading", n)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, requestPath)
ctx.SetStatusCode(http.StatusNotFound)
return
}
lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := ctx.FormValue("collection"); c != nil {
arg.Set("collection", string(c))
}
u.RawQuery = arg.Encode()
ctx.Redirect(u.String(), http.StatusMovedPermanently)
} else {
glog.V(2).Infof("lookup %s error: %v", requestPath, err)
ctx.SetStatusCode(http.StatusNotFound)
}
return
}
cookie := n.Cookie
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
}
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
glog.V(0).Infof("read %s isNormalVolume %v error: %v", requestPath, hasVolume, err)
ctx.SetStatusCode(http.StatusNotFound)
return
}
if n.Cookie != cookie {
glog.V(0).Infof("request %s with cookie:%x expected:%x agent %s", requestPath, cookie, n.Cookie, string(ctx.UserAgent()))
ctx.SetStatusCode(http.StatusNotFound)
return
}
if n.LastModified != 0 {
ctx.Response.Header.Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
if ctx.Response.Header.Peek("If-Modified-Since") != nil {
if t, parseError := time.Parse(http.TimeFormat, string(ctx.Response.Header.Peek("If-Modified-Since"))); parseError == nil {
if t.Unix() >= int64(n.LastModified) {
ctx.SetStatusCode(http.StatusNotModified)
return
}
}
}
}
if inm := ctx.Response.Header.Peek("If-None-Match"); inm != nil && string(inm) == "\""+n.Etag()+"\"" {
ctx.SetStatusCode(http.StatusNotModified)
return
}
eTagMd5 := ctx.Response.Header.Peek("ETag-MD5")
if eTagMd5 != nil && string(eTagMd5) == "True" {
fastSetEtag(ctx, n.MD5())
} else {
fastSetEtag(ctx, n.Etag())
}
if n.HasPairs() {
pairMap := make(map[string]string)
err = json.Unmarshal(n.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range pairMap {
ctx.Response.Header.Set(k, v)
}
}
if vs.fastTryHandleChunkedFile(n, filename, ctx) {
return
}
if n.NameSize > 0 && filename == "" {
filename = string(n.Name)
if ext == "" {
ext = path.Ext(filename)
}
}
mtype := ""
if n.MimeSize > 0 {
mt := string(n.Mime)
if !strings.HasPrefix(mt, "application/octet-stream") {
mtype = mt
}
}
if ext != ".gz" {
if n.IsGzipped() {
acceptEncoding := ctx.Request.Header.Peek("Accept-Encoding")
if acceptEncoding != nil && strings.Contains(string(acceptEncoding), "gzip") {
ctx.Response.Header.Set("Content-Encoding", "gzip")
} else {
if n.Data, err = util.UnGzipData(n.Data); err != nil {
glog.V(0).Infoln("ungzip error:", err, requestPath)
}
}
}
}
rs := fastConditionallyResizeImages(bytes.NewReader(n.Data), ext, ctx)
if e := fastWriteResponseContent(filename, mtype, rs, ctx); e != nil {
glog.V(2).Infoln("response write error:", e)
}
}
func (vs *VolumeServer) fastTryHandleChunkedFile(n *needle.Needle, fileName string, ctx *fasthttp.RequestCtx) (processed bool) {
if !n.IsChunkedManifest() || string(ctx.FormValue("cm")) == "false" {
return false
}
chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
if e != nil {
glog.V(0).Infof("load chunked manifest (%s) error: %v", string(ctx.Path()), e)
return false
}
if fileName == "" && chunkManifest.Name != "" {
fileName = chunkManifest.Name
}
ext := path.Ext(fileName)
mType := ""
if chunkManifest.Mime != "" {
mt := chunkManifest.Mime
if !strings.HasPrefix(mt, "application/octet-stream") {
mType = mt
}
}
ctx.Response.Header.Set("X-File-Store", "chunked")
chunkedFileReader := &operation.ChunkedFileReader{
Manifest: chunkManifest,
Master: vs.GetMaster(),
}
defer chunkedFileReader.Close()
rs := fastConditionallyResizeImages(chunkedFileReader, ext, ctx)
if e := fastWriteResponseContent(fileName, mType, rs, ctx); e != nil {
glog.V(2).Infoln("response write error:", e)
}
return true
}
func fastConditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, ctx *fasthttp.RequestCtx) io.ReadSeeker {
rs := originalDataReaderSeeker
if len(ext) > 0 {
ext = strings.ToLower(ext)
}
if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
width, height := 0, 0
formWidth, formHeight := ctx.FormValue("width"), ctx.FormValue("height")
if formWidth != nil {
width, _ = strconv.Atoi(string(formWidth))
}
if formHeight != nil {
height, _ = strconv.Atoi(string(formHeight))
}
formMode := ctx.FormValue("mode")
rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, string(formMode))
}
return rs
}
func fastWriteResponseContent(filename, mimeType string, rs io.ReadSeeker, ctx *fasthttp.RequestCtx) error {
totalSize, e := rs.Seek(0, 2)
if mimeType == "" {
if ext := path.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
ctx.Response.Header.Set("Content-Type", mimeType)
}
if filename != "" {
contentDisposition := "inline"
dlFormValue := ctx.FormValue("dl")
if dlFormValue != nil {
if dl, _ := strconv.ParseBool(string(dlFormValue)); dl {
contentDisposition = "attachment"
}
}
ctx.Response.Header.Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
}
ctx.Response.Header.Set("Accept-Ranges", "bytes")
if ctx.IsHead() {
ctx.Response.Header.Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
rangeReq := ctx.FormValue("Range")
if rangeReq == nil {
ctx.Response.Header.Set("Content-Length", strconv.FormatInt(totalSize, 10))
if _, e = rs.Seek(0, 0); e != nil {
return e
}
_, e = io.Copy(ctx.Response.BodyWriter(), rs)
return e
}
//the rest is dealing with partial content request
//mostly copy from src/pkg/net/http/fs.go
ranges, err := parseRange(string(rangeReq), totalSize)
if err != nil {
ctx.Response.SetStatusCode(http.StatusRequestedRangeNotSatisfiable)
ctxError(ctx, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return nil
}
if sumRangesSize(ranges) > totalSize {
// The total number of bytes in all the ranges
// is larger than the size of the file by
// itself, so this is probably an attack, or a
// dumb client. Ignore the range request.
return nil
}
if len(ranges) == 0 {
return nil
}
if len(ranges) == 1 {
// RFC 2616, Section 14.16:
// "When an HTTP message includes the content of a single
// range (for example, a response to a request for a
// single range, or to a request for a set of ranges
// that overlap without any holes), this content is
// transmitted with a Content-Range header, and a
// Content-Length header showing the number of bytes
// actually transferred.
// ...
// A response to a request for a single range MUST NOT
// be sent using the multipart/byteranges media type."
ra := ranges[0]
ctx.Response.Header.Set("Content-Length", strconv.FormatInt(ra.length, 10))
ctx.Response.Header.Set("Content-Range", ra.contentRange(totalSize))
ctx.Response.SetStatusCode(http.StatusPartialContent)
if _, e = rs.Seek(ra.start, 0); e != nil {
return e
}
_, e = io.CopyN(ctx.Response.BodyWriter(), rs, ra.length)
return e
}
// process multiple ranges
for _, ra := range ranges {
if ra.start > totalSize {
ctxError(ctx, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return nil
}
}
sendSize := rangesMIMESize(ranges, mimeType, totalSize)
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
ctx.Response.Header.Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
sendContent := pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
if e != nil {
pw.CloseWithError(e)
return
}
if _, e = rs.Seek(ra.start, 0); e != nil {
pw.CloseWithError(e)
return
}
if _, e = io.CopyN(part, rs, ra.length); e != nil {
pw.CloseWithError(e)
return
}
}
mw.Close()
pw.Close()
}()
if ctx.Response.Header.Peek("Content-Encoding") == nil {
ctx.Response.Header.Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
ctx.Response.Header.SetStatusCode(http.StatusPartialContent)
_, e = io.CopyN(ctx.Response.BodyWriter(), sendContent, sendSize)
return e
}
func fastSetEtag(ctx *fasthttp.RequestCtx, etag string) {
if etag != "" {
if strings.HasPrefix(etag, "\"") {
ctx.Response.Header.Set("ETag", etag)
} else {
ctx.Response.Header.Set("ETag", "\""+etag+"\"")
}
}
}
func ctxError(ctx *fasthttp.RequestCtx, error string, code int) {
ctx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8")
ctx.Response.Header.Set("X-Content-Type-Options", "nosniff")
ctx.Response.SetStatusCode(code)
fmt.Fprintln(ctx.Response.BodyWriter(), error)
}

View file

@ -0,0 +1,156 @@
package weed_server
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
func (vs *VolumeServer) fastPostHandler(ctx *fasthttp.RequestCtx) {
stats.VolumeServerRequestCounter.WithLabelValues("post").Inc()
start := time.Now()
defer func() {
stats.VolumeServerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}()
requestPath := string(ctx.Path())
vid, fid, _, _, _ := parseURLPath(requestPath)
volumeId, ve := needle.NewVolumeId(vid)
if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve)
writeJsonError(ctx, http.StatusBadRequest, ve)
return
}
if !vs.maybeCheckJwtAuthorization(ctx, vid, fid, true) {
writeJsonError(ctx, http.StatusUnauthorized, errors.New("wrong jwt"))
return
}
needle, originalSize, ne := needle.CreateNeedleFromRequest(ctx, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(ctx, http.StatusBadRequest, ne)
return
}
ret := operation.UploadResult{}
_, isUnchanged, writeError := topology.OldReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
// http 304 status code does not allow body
if writeError == nil && isUnchanged {
ctx.SetStatusCode(http.StatusNotModified)
return
}
httpStatus := http.StatusCreated
if writeError != nil {
httpStatus = http.StatusInternalServerError
ret.Error = writeError.Error()
}
if needle.HasName() {
ret.Name = string(needle.Name)
}
ret.Size = uint32(originalSize)
ret.ETag = needle.Etag()
fastSetEtag(ctx, ret.ETag)
writeJsonQuiet(ctx, httpStatus, ret)
}
func (vs *VolumeServer) DeleteHandler(ctx *fasthttp.RequestCtx) {
stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc()
start := time.Now()
defer func() {
stats.VolumeServerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
}()
requestPath := string(ctx.Path())
n := new(needle.Needle)
vid, fid, _, _, _ := parseURLPath(requestPath)
volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(fid)
if !vs.maybeCheckJwtAuthorization(ctx, vid, fid, true) {
writeJsonError(ctx, http.StatusUnauthorized, errors.New("wrong jwt"))
return
}
// glog.V(2).Infof("volume %s deleting %s", vid, n)
cookie := n.Cookie
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
if hasEcVolume {
count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
writeDeleteResult(err, count, ctx)
return
}
_, ok := vs.store.ReadVolumeNeedle(volumeId, n)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
writeJsonQuiet(ctx, http.StatusNotFound, m)
return
}
if n.Cookie != cookie {
glog.V(0).Infof("delete %s with unmaching cookie from %s agent %s", requestPath, ctx.RemoteAddr(), ctx.UserAgent())
writeJsonError(ctx, http.StatusBadRequest, errors.New("File Random Cookie does not match."))
return
}
count := int64(n.Size)
if n.IsChunkedManifest() {
chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
if e != nil {
writeJsonError(ctx, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
return
}
// make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
writeJsonError(ctx, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
count = chunkManifest.Size
}
n.LastModified = uint64(time.Now().Unix())
tsValue := ctx.FormValue("ts")
if tsValue != nil {
modifiedTime, err := strconv.ParseInt(string(tsValue), 10, 64)
if err == nil {
n.LastModified = uint64(modifiedTime)
}
}
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, ctx)
writeDeleteResult(err, count, ctx)
}
func writeDeleteResult(err error, count int64, ctx *fasthttp.RequestCtx) {
if err == nil {
m := make(map[string]int64)
m["size"] = count
writeJsonQuiet(ctx, http.StatusAccepted, m)
} else {
writeJsonQuiet(ctx, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err))
}
}

View file

@ -24,32 +24,32 @@ security settings:
*/ */
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) oldPrivateStoreHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
case "GET", "HEAD": case "GET", "HEAD":
stats.ReadRequest() stats.ReadRequest()
vs.GetOrHeadHandler(w, r) vs.OldGetOrHeadHandler(w, r)
case "DELETE": case "DELETE":
stats.DeleteRequest() stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r) vs.guard.OldWhiteList(vs.OldDeleteHandler)(w, r)
case "PUT", "POST": case "PUT", "POST":
stats.WriteRequest() stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r) vs.guard.OldWhiteList(vs.OldPostHandler)(w, r)
} }
} }
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) oldPublicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
case "GET": case "GET":
stats.ReadRequest() stats.ReadRequest()
vs.GetOrHeadHandler(w, r) vs.OldGetOrHeadHandler(w, r)
case "HEAD": case "HEAD":
stats.ReadRequest() stats.ReadRequest()
vs.GetOrHeadHandler(w, r) vs.OldGetOrHeadHandler(w, r)
} }
} }
func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool { func (vs *VolumeServer) oldMaybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool {
var signingKey security.SigningKey var signingKey security.SigningKey
@ -67,7 +67,7 @@ func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid str
} }
} }
tokenStr := security.GetJwt(r) tokenStr := security.OldGetJwt(r)
if tokenStr == "" { if tokenStr == "" {
glog.V(1).Infof("missing jwt from %s", r.RemoteAddr) glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
return false return false

View file

@ -13,7 +13,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Volumes"] = vs.store.VolumeInfos() m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
@ -26,5 +26,5 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
} }
} }
m["DiskStatuses"] = ds m["DiskStatuses"] = ds
writeJsonQuiet(w, r, http.StatusOK, m) oldWriteJsonQuiet(w, r, http.StatusOK, m)
} }

View file

@ -27,7 +27,7 @@ import (
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) OldGetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now() start := time.Now()
@ -36,8 +36,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
n := new(needle.Needle) n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
if !vs.maybeCheckJwtAuthorization(r, vid, fid, false) { if !vs.oldMaybeCheckJwtAuthorization(r, vid, fid, false) {
writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) oldWriteJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
return return
} }
@ -115,9 +115,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return return
} }
if r.Header.Get("ETag-MD5") == "True" { if r.Header.Get("ETag-MD5") == "True" {
setEtag(w, n.MD5()) oldSetEtag(w, n.MD5())
} else { } else {
setEtag(w, n.Etag()) oldSetEtag(w, n.Etag())
} }
if n.HasPairs() { if n.HasPairs() {

View file

@ -16,7 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
) )
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) OldPostHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("post").Inc() stats.VolumeServerRequestCounter.WithLabelValues("post").Inc()
start := time.Now() start := time.Now()
@ -26,7 +26,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if e := r.ParseForm(); e != nil { if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e) glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e) oldWriteJsonError(w, r, http.StatusBadRequest, e)
return return
} }
@ -34,23 +34,23 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
volumeId, ve := needle.NewVolumeId(vid) volumeId, ve := needle.NewVolumeId(vid)
if ve != nil { if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve) glog.V(0).Infoln("NewVolumeId error:", ve)
writeJsonError(w, r, http.StatusBadRequest, ve) oldWriteJsonError(w, r, http.StatusBadRequest, ve)
return return
} }
if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) { if !vs.oldMaybeCheckJwtAuthorization(r, vid, fid, true) {
writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) oldWriteJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
return return
} }
needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) needle, originalSize, ne := needle.OldCreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil { if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne) oldWriteJsonError(w, r, http.StatusBadRequest, ne)
return return
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) _, isUnchanged, writeError := topology.OldReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
// http 304 status code does not allow body // http 304 status code does not allow body
if writeError == nil && isUnchanged { if writeError == nil && isUnchanged {
@ -68,11 +68,11 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret.Size = uint32(originalSize) ret.Size = uint32(originalSize)
ret.ETag = needle.Etag() ret.ETag = needle.Etag()
setEtag(w, ret.ETag) oldSetEtag(w, ret.ETag)
writeJsonQuiet(w, r, httpStatus, ret) oldWriteJsonQuiet(w, r, httpStatus, ret)
} }
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) OldDeleteHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc() stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc()
start := time.Now() start := time.Now()
@ -85,8 +85,8 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := needle.NewVolumeId(vid) volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(fid) n.ParsePath(fid)
if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) { if !vs.oldMaybeCheckJwtAuthorization(r, vid, fid, true) {
writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) oldWriteJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
return return
} }
@ -98,7 +98,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
if hasEcVolume { if hasEcVolume {
count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie) count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
writeDeleteResult(err, count, w, r) oldWriteDeleteResult(err, count, w, r)
return return
} }
@ -106,13 +106,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
if ok != nil { if ok != nil {
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = 0 m["size"] = 0
writeJsonQuiet(w, r, http.StatusNotFound, m) oldWriteJsonQuiet(w, r, http.StatusNotFound, m)
return return
} }
if n.Cookie != cookie { if n.Cookie != cookie {
glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
writeJsonError(w, r, http.StatusBadRequest, errors.New("File Random Cookie does not match.")) oldWriteJsonError(w, r, http.StatusBadRequest, errors.New("File Random Cookie does not match."))
return return
} }
@ -121,12 +121,12 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
if n.IsChunkedManifest() { if n.IsChunkedManifest() {
chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
if e != nil { if e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e)) oldWriteJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
return return
} }
// make sure all chunks had deleted before delete manifest // make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil { if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) oldWriteJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return return
} }
count = chunkManifest.Size count = chunkManifest.Size
@ -140,23 +140,23 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r) _, err := topology.OldReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
writeDeleteResult(err, count, w, r) oldWriteDeleteResult(err, count, w, r)
} }
func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) { func oldWriteDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) {
if err == nil { if err == nil {
m := make(map[string]int64) m := make(map[string]int64)
m["size"] = count m["size"] = count
writeJsonQuiet(w, r, http.StatusAccepted, m) oldWriteJsonQuiet(w, r, http.StatusAccepted, m)
} else { } else {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err)) oldWriteJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err))
} }
} }
func setEtag(w http.ResponseWriter, etag string) { func oldSetEtag(w http.ResponseWriter, etag string) {
if etag != "" { if etag != "" {
if strings.HasPrefix(etag, "\"") { if strings.HasPrefix(etag, "\"") {
w.Header().Set("ETag", etag) w.Header().Set("ETag", etag)

View file

@ -1,6 +1,7 @@
package needle package needle
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -10,13 +11,18 @@ import (
"strings" "strings"
"time" "time"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/images"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
const ( const (
NeedleChecksumSize = 4 NeedleChecksumSize = 4
PairNamePrefix = "Seaweed-" PairNamePrefix = "Seaweed-"
)
var (
PairNamePrefixBytes = []byte("Seaweed-")
) )
/* /*
@ -50,7 +56,7 @@ func (n *Needle) String() (str string) {
return return
} }
func ParseUpload(r *http.Request, sizeLimit int64) ( func OldParseUpload(r *http.Request, sizeLimit int64) (
fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int, fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int,
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
pairMap = make(map[string]string) pairMap = make(map[string]string)
@ -82,11 +88,45 @@ func ParseUpload(r *http.Request, sizeLimit int64) (
return return
} }
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
func ParseUpload(ctx *fasthttp.RequestCtx, sizeLimit int64) (
fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int,
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
pairMap = make(map[string]string)
ctx.Request.Header.VisitAll(func(k, v []byte) {
if len(v) > 0 && bytes.HasPrefix(k, PairNamePrefixBytes) {
pairMap[string(k)] = string(v)
}
})
if ctx.IsPost() {
fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r, sizeLimit)
} else {
isGzipped = false
mimeType = string(ctx.Request.Header.Peek("Content-Type"))
fileName = ""
data, e = ioutil.ReadAll(io.LimitReader(ctx.PostBody(), sizeLimit+1))
originalDataSize = len(data)
if e == io.EOF || int64(originalDataSize) == sizeLimit+1 {
io.Copy(ioutil.Discard, r.Body)
}
r.Body.Close()
}
if e != nil {
return
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func OldCreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
var pairMap map[string]string var pairMap map[string]string
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
n = new(Needle) n = new(Needle)
fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r, sizeLimit) fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = OldParseUpload(r, sizeLimit)
if e != nil { if e != nil {
return return
} }
@ -146,6 +186,72 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
return return
} }
func CreateNeedleFromRequest(ctx *fasthttp.RequestCtx, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
var pairMap map[string]string
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
n = new(Needle)
fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = OldParseUpload(r, sizeLimit)
if e != nil {
return
}
if len(fname) < 256 {
n.Name = []byte(fname)
n.SetHasName()
}
if len(mimeType) < 256 {
n.Mime = []byte(mimeType)
n.SetHasMime()
}
if len(pairMap) != 0 {
trimmedPairMap := make(map[string]string)
for k, v := range pairMap {
trimmedPairMap[k[len(PairNamePrefix):]] = v
}
pairs, _ := json.Marshal(trimmedPairMap)
if len(pairs) < 65536 {
n.Pairs = pairs
n.PairsSize = uint16(len(pairs))
n.SetHasPairs()
}
}
if isGzipped {
n.SetGzipped()
}
if n.LastModified == 0 {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
if n.Ttl != EMPTY_TTL {
n.SetHasTtl()
}
if isChunkedFile {
n.SetIsChunkManifest()
}
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
n.Data = images.FixJpgOrientation(n.Data)
}
}
n.Checksum = NewCRC(n.Data)
commaSep := strings.LastIndex(r.URL.Path, ",")
dotSep := strings.LastIndex(r.URL.Path, ".")
fid := r.URL.Path[commaSep+1:]
if dotSep > 0 {
fid = r.URL.Path[commaSep+1 : dotSep]
}
e = n.ParsePath(fid)
return
}
func (n *Needle) ParsePath(fid string) (err error) { func (n *Needle) ParsePath(fid string) (err error) {
length := len(fid) length := len(fid)
if length <= CookieSize*2 { if length <= CookieSize*2 {

View file

@ -10,6 +10,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/valyala/fasthttp"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
@ -18,12 +20,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func ReplicatedWrite(masterNode string, s *storage.Store, func OldReplicatedWrite(masterNode string, s *storage.Store,
volumeId needle.VolumeId, n *needle.Needle, volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, isUnchanged bool, err error) { r *http.Request) (size uint32, isUnchanged bool, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.OldGetJwt(r)
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
@ -85,12 +87,79 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
return return
} }
func ReplicatedDelete(masterNode string, store *storage.Store, func ReplicatedWrite(masterNode string, s *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
ctx *fasthttp.RequestCtx) (size uint32, isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(ctx)
var remoteLocations []operation.Location
if string(ctx.FormValue("type")) != "replicate" {
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
if err != nil {
glog.V(0).Infoln(err)
return
}
}
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
if err != nil {
err = fmt.Errorf("failed to write to local disk: %v", err)
glog.V(0).Infoln(err)
return
}
if len(remoteLocations) > 0 { //send to other replica locations
if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
u := url.URL{
Scheme: "http",
Host: location.Url,
Path: string(ctx.Path()),
}
q := url.Values{
"type": {"replicate"},
"ttl": {n.Ttl.String()},
}
if n.LastModified > 0 {
q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
if n.HasPairs() {
tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range tmpMap {
pairMap[needle.PairNamePrefix+k] = v
}
}
_, err := operation.Upload(u.String(),
string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
pairMap, jwt)
return err
}); err != nil {
size = 0
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err)
}
}
return
}
func OldReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle, volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, err error) { r *http.Request) (size uint32, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.OldGetJwt(r)
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
@ -117,6 +186,38 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
return return
} }
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
ctx *fasthttp.RequestCtx) (size uint32, err error) {
//check JWT
jwt := security.GetJwt(ctx)
var remoteLocations []operation.Location
if string(ctx.FormValue("type")) != "replicate" {
remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
if err != nil {
glog.V(0).Infoln(err)
return
}
}
size, err = store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
return
}
if len(remoteLocations) > 0 { //send to other replica locations
if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
return util.Delete("http://"+location.Url+string(ctx.Path())+"?type=replicate", string(jwt))
}); err != nil {
size = 0
}
}
return
}
type DistributedOperationResult map[string]error type DistributedOperationResult map[string]error
func (dr DistributedOperationResult) Error() error { func (dr DistributedOperationResult) Error() error {