mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add s3 upload, and removing mono and multi part upload analyzer
removing mono and multi part upload analyzer, which were used just to determine the file name
This commit is contained in:
parent
80d80daf64
commit
8480008a9a
|
@ -59,8 +59,9 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
|
|||
values.Add("dataNode", request.DataNode)
|
||||
}
|
||||
|
||||
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
|
||||
glog.V(2).Infof("assign result from %s : %s", server, string(jsonBlob))
|
||||
postUrl := fmt.Sprintf("http://%s/dir/assign", server)
|
||||
jsonBlob, err := util.Post(postUrl, values)
|
||||
glog.V(2).Infof("assign %d result from %s %+v : %s", i, postUrl, values, string(jsonBlob))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -31,8 +31,9 @@ const (
|
|||
ErrBucketNotEmpty
|
||||
ErrBucketAlreadyExists
|
||||
ErrBucketAlreadyOwnedByYou
|
||||
ErrInvalidBucketName
|
||||
ErrNoSuchBucket
|
||||
ErrInvalidBucketName
|
||||
ErrInvalidDigest
|
||||
ErrInternalError
|
||||
)
|
||||
|
||||
|
@ -64,6 +65,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
|
|||
Description: "The specified bucket is not valid.",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInvalidDigest: {
|
||||
Code: "InvalidDigest",
|
||||
Description: "The Content-Md5 you specified is not valid.",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrNoSuchBucket: {
|
||||
Code: "NoSuchBucket",
|
||||
Description: "The specified bucket does not exist",
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"encoding/base64"
|
||||
)
|
||||
|
||||
type mimeType string
|
||||
|
@ -93,3 +94,14 @@ func writeSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
|||
func writeSuccessResponseEmpty(w http.ResponseWriter) {
|
||||
writeResponse(w, http.StatusOK, nil, mimeNone)
|
||||
}
|
||||
|
||||
func validateContentMd5(h http.Header) ([]byte, error) {
|
||||
md5B64, ok := h["Content-Md5"]
|
||||
if ok {
|
||||
if md5B64[0] == "" {
|
||||
return nil, fmt.Errorf("Content-Md5 header set to empty value")
|
||||
}
|
||||
return base64.StdEncoding.DecodeString(md5B64[0])
|
||||
}
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
|
90
weed/s3api/s3api_object_handlers.go
Normal file
90
weed/s3api/s3api_object_handlers.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package s3api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"io/ioutil"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
var (
|
||||
client *http.Client
|
||||
)
|
||||
|
||||
func init() {
|
||||
client = &http.Client{Transport: &http.Transport{
|
||||
MaxIdleConnsPerHost: 1024,
|
||||
}}
|
||||
}
|
||||
|
||||
type UploadResult struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Size uint32 `json:"size,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
object := vars["object"]
|
||||
|
||||
_, err := validateContentMd5(r.Header)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, ErrInvalidDigest, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
|
||||
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
|
||||
proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
proxyReq.Header.Set("Host", s3a.option.Filer)
|
||||
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
|
||||
|
||||
for header, values := range r.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
resp, postErr := client.Do(proxyReq)
|
||||
|
||||
if postErr != nil {
|
||||
glog.Errorf("post to filer: %v", postErr)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
||||
if ra_err != nil {
|
||||
glog.Errorf("upload to filer response read: %v", ra_err)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
var ret UploadResult
|
||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||
if unmarshal_err != nil {
|
||||
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
if ret.Error != "" {
|
||||
glog.Errorf("upload to filer error: %v", ret.Error)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseEmpty(w)
|
||||
}
|
|
@ -42,6 +42,10 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
|||
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
|
||||
|
||||
for _, bucket := range routers {
|
||||
|
||||
// PutObject
|
||||
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
|
||||
|
||||
// PutBucket
|
||||
bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
|
||||
// DeleteBucket
|
||||
|
|
|
@ -70,9 +70,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
|
|||
DataCenter: "",
|
||||
}
|
||||
}
|
||||
|
||||
assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
|
||||
if ae != nil {
|
||||
glog.V(0).Infoln("failing to assign a file id", ae.Error())
|
||||
glog.Errorf("failing to assign a file id: %v", ae)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, ae)
|
||||
err = ae
|
||||
return
|
||||
|
@ -102,20 +103,24 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
var fileId, urlLocation string
|
||||
var err error
|
||||
|
||||
/*
|
||||
var path string
|
||||
if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
|
||||
fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
path, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
|
||||
} else {
|
||||
fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
|
||||
if err != nil || fileId == "" {
|
||||
return
|
||||
}
|
||||
path, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
|
||||
}
|
||||
*/
|
||||
|
||||
fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path)
|
||||
if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, r.URL.Path); err == nil && fileId == "" {
|
||||
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
|
||||
}
|
||||
if err != nil || fileId == "" || urlLocation == "" {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation)
|
||||
|
||||
u, _ := url.Parse(urlLocation)
|
||||
|
||||
|
@ -142,7 +147,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
resp, do_err := util.Do(request)
|
||||
if do_err != nil {
|
||||
glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
|
||||
glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, do_err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -73,15 +73,8 @@ func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
|
||||
/*
|
||||
Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
|
||||
There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
|
||||
a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
|
||||
1. The request url format should be http://$host:$port/$bucketName/$objectName
|
||||
2. bucketName will be mapped to seaweedfs's collection name
|
||||
3. You could customize and make your enhancement.
|
||||
*/
|
||||
func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) {
|
||||
|
||||
lastPos := strings.LastIndex(r.URL.Path, "/")
|
||||
if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
|
||||
glog.V(0).Infof("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
|
||||
|
@ -99,13 +92,8 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
|
|||
return
|
||||
}
|
||||
|
||||
secondPos := strings.Index(r.URL.Path[1:], "/") + 1
|
||||
collection = r.URL.Path[1:secondPos]
|
||||
path := r.URL.Path
|
||||
path = r.URL.Path
|
||||
|
||||
if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
|
||||
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -1,39 +1,13 @@
|
|||
package weed_server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
)
|
||||
|
||||
func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
|
||||
func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) {
|
||||
//Default handle way for http multipart
|
||||
if r.Method == "PUT" {
|
||||
buf, _ := ioutil.ReadAll(r.Body)
|
||||
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
|
||||
if pe != nil {
|
||||
glog.V(0).Infoln("failing to parse post body", pe.Error())
|
||||
writeJsonError(w, r, http.StatusInternalServerError, pe)
|
||||
err = pe
|
||||
return
|
||||
}
|
||||
//reconstruct http request body for following new request to volume server
|
||||
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
|
||||
path := r.URL.Path
|
||||
if strings.HasSuffix(path, "/") {
|
||||
if fileName != "" {
|
||||
path += fileName
|
||||
}
|
||||
}
|
||||
fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
|
||||
} else {
|
||||
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
|
||||
path = r.URL.Path
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue