mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Add download speed limit support (#3408)
This commit is contained in:
parent
b278bb24d3
commit
84ec68e11a
|
@ -59,6 +59,7 @@ type FilerOptions struct {
|
||||||
debugPort *int
|
debugPort *int
|
||||||
localSocket *string
|
localSocket *string
|
||||||
showUIDirectoryDelete *bool
|
showUIDirectoryDelete *bool
|
||||||
|
downloadMaxMBps *int
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -87,6 +88,7 @@ func init() {
|
||||||
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
|
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
|
||||||
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
|
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
|
||||||
f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
|
f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
|
||||||
|
f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
|
||||||
|
|
||||||
// start s3 on filer
|
// start s3 on filer
|
||||||
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
|
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
|
||||||
|
@ -239,6 +241,7 @@ func (fo *FilerOptions) startFiler() {
|
||||||
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
|
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
|
||||||
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
|
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
|
||||||
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
|
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
|
||||||
|
DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
|
||||||
})
|
})
|
||||||
if nfs_err != nil {
|
if nfs_err != nil {
|
||||||
glog.Fatalf("Filer startup error: %v", nfs_err)
|
glog.Fatalf("Filer startup error: %v", nfs_err)
|
||||||
|
|
|
@ -115,6 +115,7 @@ func init() {
|
||||||
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
|
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
|
||||||
filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
|
filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
|
||||||
filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button")
|
filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button")
|
||||||
|
filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
|
||||||
|
|
||||||
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
||||||
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
|
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
|
||||||
|
|
|
@ -68,6 +68,10 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
|
||||||
}
|
}
|
||||||
|
|
||||||
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
||||||
|
return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error {
|
||||||
|
|
||||||
glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
|
glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
|
||||||
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
|
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
|
||||||
|
@ -95,6 +99,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
|
||||||
fileId2Url[chunkView.FileId] = urlStrings
|
fileId2Url[chunkView.FileId] = urlStrings
|
||||||
}
|
}
|
||||||
|
|
||||||
|
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
|
||||||
remaining := size
|
remaining := size
|
||||||
for _, chunkView := range chunkViews {
|
for _, chunkView := range chunkViews {
|
||||||
if offset < chunkView.LogicOffset {
|
if offset < chunkView.LogicOffset {
|
||||||
|
@ -118,6 +123,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
|
||||||
return fmt.Errorf("read chunk: %v", err)
|
return fmt.Errorf("read chunk: %v", err)
|
||||||
}
|
}
|
||||||
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
|
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
|
||||||
|
downloadThrottler.MaybeSlowdown(int64(chunkView.Size))
|
||||||
}
|
}
|
||||||
if remaining > 0 {
|
if remaining > 0 {
|
||||||
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
|
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
|
||||||
|
|
|
@ -68,6 +68,7 @@ type FilerOption struct {
|
||||||
SaveToFilerLimit int64
|
SaveToFilerLimit int64
|
||||||
ConcurrentUploadLimit int64
|
ConcurrentUploadLimit int64
|
||||||
ShowUIDirectoryDelete bool
|
ShowUIDirectoryDelete bool
|
||||||
|
DownloadMaxBytesPs int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type FilerServer struct {
|
type FilerServer struct {
|
||||||
|
|
|
@ -238,7 +238,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size)
|
err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
|
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
|
||||||
glog.Errorf("failed to stream content %s: %v", r.URL, err)
|
glog.Errorf("failed to stream content %s: %v", r.URL, err)
|
||||||
|
|
Loading…
Reference in a new issue