diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 202374e1b..d7e8a4162 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -18,7 +18,7 @@ type MockClient struct { } func (m *MockClient) Do(req *http.Request) (*http.Response, error) { - n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) + n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024, &bytes.Buffer{}) if m.needleHandling != nil { m.needleHandling(n, originalSize, err) } diff --git a/weed/server/common.go b/weed/server/common.go index 571944c10..2e0ae4058 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "encoding/json" "errors" "fmt" @@ -104,7 +105,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope } debug("parsing upload file...") - pu, pe := needle.ParseUpload(r, 256*1024*1024) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + pu, pe := needle.ParseUpload(r, 256*1024*1024, bytesBuffer) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 8334d1618..acaa8f5ab 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "context" "fmt" "net/http" @@ -30,7 +31,10 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024 - pu, err := needle.ParseUpload(r, sizeLimit) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + + pu, err := needle.ParseUpload(r, sizeLimit, bytesBuffer) uncompressedData := pu.Data if pu.IsGzipped { uncompressedData = pu.UncompressedData diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 540def563..7082ab0f8 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -19,6 +20,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +var bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { var fileChunks []*filer_pb.FileChunk @@ -28,21 +35,28 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset := int64(0) var smallContent []byte + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) for { limitedReader := io.LimitReader(partReader, int64(chunkSize)) - data, err := ioutil.ReadAll(limitedReader) + bytesBuffer.Reset() + + dataSize, err := bytesBuffer.ReadFrom(limitedReader) + + // data, err := ioutil.ReadAll(limitedReader) if err != nil { return nil, nil, 0, err, nil } if chunkOffset == 0 && !isAppend(r) { - if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { - smallContent = data - chunkOffset += int64(len(data)) + if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 { + chunkOffset += dataSize + smallContent = make([]byte, dataSize) + bytesBuffer.Write(smallContent) break } } - dataReader := util.NewBytesReader(data) + dataReader := util.NewBytesReader(bytesBuffer.Bytes()) // retry to assign a different file id var fileId, urlLocation string diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 3d752eda6..58212e8ff 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "errors" "fmt" "net/http" @@ -42,7 +43,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + + reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 34d29ab6e..845ffdb24 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -1,6 +1,7 @@ package needle import ( + "bytes" "encoding/json" "fmt" "net/http" @@ -48,9 +49,9 @@ func (n *Needle) String() (str string) { return } -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) { +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64, bytesBuffer *bytes.Buffer) (n *Needle, originalSize int, contentMd5 string, e error) { n = new(Needle) - pu, e := ParseUpload(r, sizeLimit) + pu, e := ParseUpload(r, sizeLimit, bytesBuffer) if e != nil { return } diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index 7201503f1..0888c6b7a 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -1,6 +1,7 @@ package needle import ( + "bytes" "crypto/md5" "encoding/base64" "fmt" @@ -18,11 +19,12 @@ import ( ) type ParsedUpload struct { - FileName string - Data []byte - MimeType string - PairMap map[string]string - IsGzipped bool + FileName string + Data []byte + bytesBuffer *bytes.Buffer + MimeType string + PairMap map[string]string + IsGzipped bool // IsZstd bool OriginalDataSize int ModifiedTime uint64 @@ -32,8 +34,9 @@ type ParsedUpload struct { ContentMd5 string } -func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { - pu = &ParsedUpload{} +func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (pu *ParsedUpload, e error) { + bytesBuffer.Reset() + pu = &ParsedUpload{bytesBuffer: bytesBuffer} pu.PairMap = make(map[string]string) for k, v := range r.Header { if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { @@ -72,14 +75,16 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { if mimeType == "application/octet-stream" { mimeType = "" } - if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure { - // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType) - if compressedData, err := util.GzipData(pu.Data); err == nil { - if len(compressedData)*10 < len(pu.Data)*9 { - pu.Data = compressedData - pu.IsGzipped = true + if false { + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure { + // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType) + if compressedData, err := util.GzipData(pu.Data); err == nil { + if len(compressedData)*10 < len(pu.Data)*9 { + pu.Data = compressedData + pu.IsGzipped = true + } + // println("gzipped data size", len(compressedData)) } - // println("gzipped data size", len(compressedData)) } } } @@ -98,15 +103,16 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { return } -func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { +func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) error { pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd" pu.MimeType = r.Header.Get("Content-Type") pu.FileName = "" - pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1)) - if e == io.EOF || int64(pu.OriginalDataSize) == sizeLimit+1 { + dataSize, err := pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1)) + if err == io.EOF || dataSize == sizeLimit+1 { io.Copy(ioutil.Discard, r.Body) } + pu.Data = pu.bytesBuffer.Bytes() r.Body.Close() return nil } @@ -138,15 +144,17 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error pu.FileName = path.Base(pu.FileName) } - pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1)) + var dataSize int64 + dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(part, sizeLimit+1)) if e != nil { glog.V(0).Infoln("Reading Content [ERROR]", e) return } - if len(pu.Data) == int(sizeLimit)+1 { + if dataSize == sizeLimit+1 { e = fmt.Errorf("file over the limited %d bytes", sizeLimit) return } + pu.Data = pu.bytesBuffer.Bytes() // if the filename is empty string, do a search on the other multi-part items for pu.FileName == "" { @@ -159,19 +167,20 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error // found the first multi-part has filename if fName != "" { - data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1)) + pu.bytesBuffer.Reset() + dataSize2, fe2 := pu.bytesBuffer.ReadFrom(io.LimitReader(part2, sizeLimit+1)) if fe2 != nil { glog.V(0).Infoln("Reading Content [ERROR]", fe2) e = fe2 return } - if len(data2) == int(sizeLimit)+1 { + if dataSize2 == sizeLimit+1 { e = fmt.Errorf("file over the limited %d bytes", sizeLimit) return } // update - pu.Data = data2 + pu.Data = pu.bytesBuffer.Bytes() pu.FileName = path.Base(fName) break } diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 16c2fd06b..d208404a8 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -1,6 +1,7 @@ package needle import ( + "bytes" "errors" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" @@ -9,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "io" "math" + "sync" ) const ( @@ -29,10 +31,14 @@ func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } -func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) { - - writeBytes := make([]byte, 0) +var bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} +func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) { + writeBytes.Reset() switch version { case Version1: header := make([]byte, NeedleHeaderSize) @@ -42,12 +48,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) size := n.Size actualSize := NeedleHeaderSize + int64(n.Size) - writeBytes = append(writeBytes, header...) - writeBytes = append(writeBytes, n.Data...) + writeBytes.Write(header) + writeBytes.Write(n.Data) padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) - return writeBytes, size, actualSize, nil + writeBytes.Write(header[0:NeedleChecksumSize+padding]) + return size, actualSize, nil case Version2, Version3: header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation CookieToBytes(header[0:CookieSize], n.Cookie) @@ -79,51 +85,51 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error n.Size = 0 } SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...) + writeBytes.Write(header[0:NeedleHeaderSize]) if n.DataSize > 0 { util.Uint32toBytes(header[0:4], n.DataSize) - writeBytes = append(writeBytes, header[0:4]...) - writeBytes = append(writeBytes, n.Data...) + writeBytes.Write(header[0:4]) + writeBytes.Write(n.Data) util.Uint8toBytes(header[0:1], n.Flags) - writeBytes = append(writeBytes, header[0:1]...) + writeBytes.Write(header[0:1]) if n.HasName() { util.Uint8toBytes(header[0:1], n.NameSize) - writeBytes = append(writeBytes, header[0:1]...) - writeBytes = append(writeBytes, n.Name[:n.NameSize]...) + writeBytes.Write(header[0:1]) + writeBytes.Write(n.Name[:n.NameSize]) } if n.HasMime() { util.Uint8toBytes(header[0:1], n.MimeSize) - writeBytes = append(writeBytes, header[0:1]...) - writeBytes = append(writeBytes, n.Mime...) + writeBytes.Write(header[0:1]) + writeBytes.Write(n.Mime) } if n.HasLastModifiedDate() { util.Uint64toBytes(header[0:8], n.LastModified) - writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...) + writeBytes.Write(header[8-LastModifiedBytesLength:8]) } if n.HasTtl() && n.Ttl != nil { n.Ttl.ToBytes(header[0:TtlBytesLength]) - writeBytes = append(writeBytes, header[0:TtlBytesLength]...) + writeBytes.Write(header[0:TtlBytesLength]) } if n.HasPairs() { util.Uint16toBytes(header[0:2], n.PairsSize) - writeBytes = append(writeBytes, header[0:2]...) - writeBytes = append(writeBytes, n.Pairs...) + writeBytes.Write(header[0:2]) + writeBytes.Write(n.Pairs) } } padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) if version == Version2 { - writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) + writeBytes.Write(header[0:NeedleChecksumSize+padding]) } else { // version3 util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs) - writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...) + writeBytes.Write(header[0:NeedleChecksumSize+TimestampSize+padding]) } - return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil + return Size(n.DataSize), GetActualSize(n.Size, version), nil } - return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) + return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) { @@ -146,10 +152,13 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return } - bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + + size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer) if err == nil { - _, err = w.WriteAt(bytesToWrite, int64(offset)) + _, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset)) } return offset, size, actualSize, err