mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #432 from sparklxb/master
support additional header name-value pairs
This commit is contained in:
commit
b14332df96
|
@ -155,7 +155,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
|
|||
cm.DeleteChunks(master)
|
||||
}
|
||||
} else {
|
||||
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
|
||||
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
|
||||
if e != nil {
|
||||
return 0, e
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
|
|||
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
|
||||
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
||||
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
|
||||
"application/octet-stream", jwt)
|
||||
"application/octet-stream", nil, jwt)
|
||||
if uploadError != nil {
|
||||
return fid, 0, uploadError
|
||||
}
|
||||
|
@ -198,6 +198,6 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
|
|||
q := u.Query()
|
||||
q.Set("cm", "true")
|
||||
u.RawQuery = q.Encode()
|
||||
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
|
||||
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
|
||||
return e
|
||||
}
|
||||
|
|
|
@ -36,13 +36,13 @@ func init() {
|
|||
|
||||
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
||||
|
||||
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
return upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
_, err = io.Copy(w, reader)
|
||||
return
|
||||
}, filename, isGzipped, mtype, jwt)
|
||||
}, filename, isGzipped, mtype, pairMap, jwt)
|
||||
}
|
||||
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
body_buf := bytes.NewBufferString("")
|
||||
body_writer := multipart.NewWriter(body_buf)
|
||||
h := make(textproto.MIMEHeader)
|
||||
|
@ -59,6 +59,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
|
|||
if jwt != "" {
|
||||
h.Set("Authorization", "BEARER "+string(jwt))
|
||||
}
|
||||
|
||||
file_writer, cp_err := body_writer.CreatePart(h)
|
||||
if cp_err != nil {
|
||||
glog.V(0).Infoln("error creating form file", cp_err.Error())
|
||||
|
@ -73,7 +74,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
|
|||
glog.V(0).Infoln("error closing body", err)
|
||||
return nil, err
|
||||
}
|
||||
resp, post_err := client.Post(uploadUrl, content_type, body_buf)
|
||||
|
||||
req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
|
||||
if postErr != nil {
|
||||
glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
|
||||
return nil, postErr
|
||||
}
|
||||
req.Header.Set("Content-Type", content_type)
|
||||
for k, v := range pairMap {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
resp, post_err := client.Do(req)
|
||||
if post_err != nil {
|
||||
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
|
||||
return nil, post_err
|
||||
|
@ -86,7 +97,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
|
|||
var ret UploadResult
|
||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||
if unmarshal_err != nil {
|
||||
glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
|
||||
glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
|
||||
return nil, unmarshal_err
|
||||
}
|
||||
if ret.Error != "" {
|
||||
|
|
|
@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
|
|||
}
|
||||
|
||||
debug("parsing upload file...")
|
||||
fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
|
||||
fname, data, mimeType, pairMap, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
|
||||
if pe != nil {
|
||||
writeJsonError(w, r, http.StatusBadRequest, pe)
|
||||
return
|
||||
|
@ -112,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
|
|||
}
|
||||
|
||||
debug("upload file to store", url)
|
||||
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
|
||||
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt)
|
||||
if err != nil {
|
||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||
return
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
"net/http"
|
||||
"net/textproto"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
|
@ -20,8 +22,6 @@ import (
|
|||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"path"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type FilerPostResult struct {
|
||||
|
@ -112,7 +112,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re
|
|||
if r.Method == "PUT" {
|
||||
buf, _ := ioutil.ReadAll(r.Body)
|
||||
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
|
||||
fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
|
||||
if pe != nil {
|
||||
glog.V(0).Infoln("failing to parse post body", pe.Error())
|
||||
writeJsonError(w, r, http.StatusInternalServerError, pe)
|
||||
|
@ -521,7 +521,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
|
|||
err = nil
|
||||
|
||||
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
|
||||
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
|
||||
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
|
||||
if uploadResult != nil {
|
||||
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/images"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
|
@ -94,6 +96,17 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
w.Header().Set("Etag", etag)
|
||||
|
||||
if n.HasPairs() {
|
||||
pairMap := make(map[string]string)
|
||||
err = json.Unmarshal(n.Pairs, &pairMap)
|
||||
if err != nil {
|
||||
glog.V(0).Infoln("Unmarshal pairs error:", err)
|
||||
}
|
||||
for k, v := range pairMap {
|
||||
w.Header().Set(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
if vs.tryHandleChunkedFile(n, filename, w, r) {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
@ -22,6 +23,7 @@ const (
|
|||
NeedleChecksumSize = 4
|
||||
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
|
||||
TombstoneFileSize = math.MaxUint32
|
||||
PairNamePrefix = "Seaweed-"
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -40,6 +42,8 @@ type Needle struct {
|
|||
Name []byte `comment:"maximum 256 characters"` //version2
|
||||
MimeSize uint8 //version2
|
||||
Mime []byte `comment:"maximum 256 characters"` //version2
|
||||
PairsSize uint16 //version2
|
||||
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
|
||||
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
|
||||
Ttl *TTL
|
||||
|
||||
|
@ -55,8 +59,15 @@ func (n *Needle) String() (str string) {
|
|||
}
|
||||
|
||||
func ParseUpload(r *http.Request) (
|
||||
fileName string, data []byte, mimeType string, isGzipped bool,
|
||||
fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool,
|
||||
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
|
||||
pairMap = make(map[string]string)
|
||||
for k, v := range r.Header {
|
||||
if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
|
||||
pairMap[k] = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
form, fe := r.MultipartReader()
|
||||
if fe != nil {
|
||||
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
|
||||
|
@ -109,8 +120,6 @@ func ParseUpload(r *http.Request) (
|
|||
}
|
||||
|
||||
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
|
||||
isGzipped = false
|
||||
if !isChunkedFile {
|
||||
dotIndex := strings.LastIndex(fileName, ".")
|
||||
ext, mtype := "", ""
|
||||
if dotIndex > 0 {
|
||||
|
@ -122,6 +131,8 @@ func ParseUpload(r *http.Request) (
|
|||
mimeType = contentType //only return mime type if not deductable
|
||||
mtype = contentType
|
||||
}
|
||||
|
||||
if !isChunkedFile {
|
||||
if part.Header.Get("Content-Encoding") == "gzip" {
|
||||
isGzipped = true
|
||||
} else if operation.IsGzippable(ext, mtype) {
|
||||
|
@ -144,9 +155,10 @@ func ParseUpload(r *http.Request) (
|
|||
return
|
||||
}
|
||||
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
|
||||
var pairMap map[string]string
|
||||
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
|
||||
n = new(Needle)
|
||||
fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
|
||||
fname, n.Data, mimeType, pairMap, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
|
||||
if e != nil {
|
||||
return
|
||||
}
|
||||
|
@ -158,6 +170,19 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
|
|||
n.Mime = []byte(mimeType)
|
||||
n.SetHasMime()
|
||||
}
|
||||
if len(pairMap) != 0 {
|
||||
trimmedPairMap := make(map[string]string)
|
||||
for k, v := range pairMap {
|
||||
trimmedPairMap[k[len(PairNamePrefix):]] = v
|
||||
}
|
||||
|
||||
pairs, _ := json.Marshal(trimmedPairMap)
|
||||
if len(pairs) < 65536 {
|
||||
n.Pairs = pairs
|
||||
n.PairsSize = uint16(len(pairs))
|
||||
n.SetHasPairs()
|
||||
}
|
||||
}
|
||||
if isGzipped {
|
||||
n.SetGzipped()
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ const (
|
|||
FlagHasMime = 0x04
|
||||
FlagHasLastModifiedDate = 0x08
|
||||
FlagHasTtl = 0x10
|
||||
FlagHasPairs = 0x20
|
||||
FlagIsChunkManifest = 0x80
|
||||
LastModifiedBytesLength = 5
|
||||
TtlBytesLength = 2
|
||||
|
@ -78,6 +79,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
|
|||
if n.HasTtl() {
|
||||
n.Size = n.Size + TtlBytesLength
|
||||
}
|
||||
if n.HasPairs() {
|
||||
n.Size += 2 + uint32(n.PairsSize)
|
||||
}
|
||||
} else {
|
||||
n.Size = 0
|
||||
}
|
||||
|
@ -128,6 +132,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
|
|||
return
|
||||
}
|
||||
}
|
||||
if n.HasPairs() {
|
||||
util.Uint16toBytes(header[0:2], n.PairsSize)
|
||||
if _, err = w.Write(header[0:2]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Pairs); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
|
||||
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
|
||||
|
@ -141,8 +154,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
|
|||
}
|
||||
|
||||
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
|
||||
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
|
||||
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
|
||||
NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize
|
||||
padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize)
|
||||
readSize := NeedleWithoutPaddingSize + padding
|
||||
return getBytesForFileBlock(r, offset, int(readSize))
|
||||
}
|
||||
|
||||
|
@ -213,6 +227,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
|
|||
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
|
||||
index = index + TtlBytesLength
|
||||
}
|
||||
if index < lenBytes && n.HasPairs() {
|
||||
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
|
||||
index += 2
|
||||
end := index + int(n.PairsSize)
|
||||
n.Pairs = bytes[index:end]
|
||||
index = end
|
||||
}
|
||||
}
|
||||
|
||||
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
|
||||
|
@ -296,3 +317,11 @@ func (n *Needle) IsChunkedManifest() bool {
|
|||
func (n *Needle) SetIsChunkManifest() {
|
||||
n.Flags = n.Flags | FlagIsChunkManifest
|
||||
}
|
||||
|
||||
func (n *Needle) HasPairs() bool {
|
||||
return n.Flags&FlagHasPairs != 0
|
||||
}
|
||||
|
||||
func (n *Needle) SetHasPairs() {
|
||||
n.Flags = n.Flags | FlagHasPairs
|
||||
}
|
||||
|
|
|
@ -303,6 +303,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
|||
err = fmt.Errorf("Volume %d is read only", i)
|
||||
return
|
||||
}
|
||||
// TODO: count needle size ahead
|
||||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
|
||||
size, err = v.writeNeedle(n)
|
||||
} else {
|
||||
|
|
|
@ -2,14 +2,14 @@ package topology
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"net/url"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
|
@ -55,9 +55,18 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
|
|||
q.Set("cm", "true")
|
||||
}
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
pairMap := make(map[string]string)
|
||||
if needle.HasPairs() {
|
||||
err := json.Unmarshal(needle.Pairs, &pairMap)
|
||||
if err != nil {
|
||||
glog.V(0).Infoln("Unmarshal pairs error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := operation.Upload(u.String(),
|
||||
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
|
||||
jwt)
|
||||
pairMap, jwt)
|
||||
return err
|
||||
}); err != nil {
|
||||
ret = 0
|
||||
|
|
Loading…
Reference in a new issue