volume: add option to limit file size

This commit is contained in:
Chris Lu 2020-01-03 00:37:24 -08:00
parent a3a2e69900
commit 3eafec4b29
7 changed files with 46 additions and 20 deletions

View file

@@ -89,6 +89,7 @@ func init() {
 	serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
 	serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
 	serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
+	serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
 	serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
 	s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")

View file

@@ -10,17 +10,19 @@ import (
 	"strings"
 	"time"

-	"github.com/chrislusf/seaweedfs/weed/security"
-	"github.com/chrislusf/seaweedfs/weed/util/httpdown"
 	"github.com/spf13/viper"
 	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/util/httpdown"
+	"google.golang.org/grpc/reflection"

 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/server"
 	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"google.golang.org/grpc/reflection"
 )

var (
@@ -47,6 +49,7 @@ type VolumeServerOptions struct {
 	cpuProfile *string
 	memProfile *string
 	compactionMBPerSecond *int
+	fileSizeLimitMB *int
 }

func init() {
@@ -67,6 +70,7 @@ func init() {
 	v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
 	v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
 	v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
+	v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
 }

var cmdVolume = &Command{
@@ -158,6 +162,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
 		v.whiteList,
 		*v.fixJpgOrientation, *v.readRedirect,
 		*v.compactionMBPerSecond,
+		*v.fileSizeLimitMB,
 	)

 	// starting grpc server

View file

@@ -98,7 +98,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
 	}
 	debug("parsing upload file...")
-	fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r)
+	fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r, 256*1024*1024)
 	if pe != nil {
 		writeJsonError(w, r, http.StatusBadRequest, pe)
 		return

View file

@@ -4,13 +4,15 @@ import (
 	"fmt"
 	"net/http"

-	"github.com/chrislusf/seaweedfs/weed/stats"
 	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/stats"
+	"github.com/spf13/viper"

 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/chrislusf/seaweedfs/weed/storage"
-	"github.com/spf13/viper"
 )

type VolumeServer struct {
@@ -29,6 +31,7 @@ type VolumeServer struct {
 	compactionBytePerSecond int64
 	MetricsAddress string
 	MetricsIntervalSec int
+	fileSizeLimitBytes int64
 }

func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -41,6 +44,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	fixJpgOrientation bool,
 	readRedirect bool,
 	compactionMBPerSecond int,
+	fileSizeLimitMB int,
) *VolumeServer {

 	v := viper.GetViper()
@@ -62,6 +66,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		ReadRedirect: readRedirect,
 		grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
 		compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+		fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
 	}
 	vs.SeedMasterNodes = masterNodes
 	vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)

View file

@@ -43,7 +43,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
-	needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+	needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
 	if ne != nil {
 		writeJsonError(w, r, http.StatusBadRequest, ne)
 		return

View file

@@ -3,13 +3,13 @@ package needle
import (
 	"encoding/json"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"net/http"
 	"strconv"
 	"strings"
 	"time"
-	"io/ioutil"

 	"github.com/chrislusf/seaweedfs/weed/images"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -50,7 +50,7 @@ func (n *Needle) String() (str string) {
 	return
 }

-func ParseUpload(r *http.Request) (
+func ParseUpload(r *http.Request, sizeLimit int64) (
 	fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int,
 	modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
 	pairMap = make(map[string]string)
@@ -61,13 +61,17 @@ func ParseUpload(r *http.Request) (
 	}

 	if r.Method == "POST" {
-		fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r)
+		fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r, sizeLimit)
 	} else {
 		isGzipped = false
 		mimeType = r.Header.Get("Content-Type")
 		fileName = ""
-		data, e = ioutil.ReadAll(r.Body)
+		data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
 		originalDataSize = len(data)
+		if e == io.EOF || int64(originalDataSize) == sizeLimit+1 {
+			io.Copy(ioutil.Discard, r.Body)
+		}
+		r.Body.Close()
 	}
 	if e != nil {
 		return
@@ -78,11 +82,11 @@ func ParseUpload(r *http.Request) (
 	return
 }

-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
 	var pairMap map[string]string
 	fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
 	n = new(Needle)
-	fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
+	fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r, sizeLimit)
 	if e != nil {
 		return
 	}

View file

@@ -1,9 +1,7 @@
package needle

import (
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/util"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"mime"
@@ -11,9 +9,12 @@ import (
 	"path"
 	"strconv"
 	"strings"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
)
-func parseMultipart(r *http.Request) (
+func parseMultipart(r *http.Request, sizeLimit int64) (
 	fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) {
 	defer func() {
 		if e != nil && r.Body != nil {
@@ -41,11 +42,17 @@ func parseMultipart(r *http.Request) (
 		fileName = path.Base(fileName)
 	}

-	data, e = ioutil.ReadAll(part)
+	println("reading part", sizeLimit)
+	data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
 	if e != nil {
 		glog.V(0).Infoln("Reading Content [ERROR]", e)
 		return
 	}
+	if len(data) == int(sizeLimit)+1 {
+		e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
+		return
+	}

 	//if the filename is empty string, do a search on the other multi-part items
 	for fileName == "" {
@@ -58,12 +65,16 @@ func parseMultipart(r *http.Request) (
 			//found the first <file type> multi-part has filename
 			if fName != "" {
-				data2, fe2 := ioutil.ReadAll(part2)
+				data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
 				if fe2 != nil {
 					glog.V(0).Infoln("Reading Content [ERROR]", fe2)
 					e = fe2
 					return
 				}
+				if len(data) == int(sizeLimit)+1 {
+					e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
+					return
+				}

 				//update
 				data = data2