filer: stream read from volume server, reduce memory usage

This commit is contained in:
Chris Lu 2021-08-13 11:00:11 -07:00
parent f4decf02df
commit b961fcd338
2 changed files with 37 additions and 8 deletions

View file

@ -132,6 +132,41 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
} }
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
var shouldRetry bool
var written int
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
writer.Write(data)
written += len(data)
})
if !shouldRetry {
break
}
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
if written > 0 {
break
}
} else {
break
}
}
if err != nil && shouldRetry && written > 0 {
glog.V(0).Infof("retry reading in %v", waitTime)
time.Sleep(waitTime)
} else {
break
}
}
return err
}
func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest) return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
} }

View file

@ -16,7 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
) )
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
@ -40,18 +40,12 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
urlStrings := fileId2Url[chunkView.FileId] urlStrings := fileId2Url[chunkView.FileId]
start := time.Now() start := time.Now()
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
return fmt.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err)
} }
_, err = w.Write(data)
if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc()
return fmt.Errorf("write chunk: %v", err)
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
} }