From 59732a052996d57692d2179525d7b89387130ba7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Aug 2021 15:35:27 -0700 Subject: [PATCH] refactoring --- weed/filer/stream.go | 100 ++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 197b87ab8..2ea8ce493 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "sort" "strings" "time" @@ -88,65 +89,61 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - chunkIndex int - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + buffer []byte + bufferOffset int64 + bufferPos int + nextChunkViewIndex int + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) +func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + sort.Slice(chunkViews, func(i, j int) bool { + return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + }) + + var totalSize int64 + for _, chunk := range chunkViews { + totalSize += int64(chunk.Size) + } + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: lookupFileIdFn, + totalSize: totalSize, + } +} + func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) - } - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - totalSize: totalSize, - } + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := LookupFn(filerClient) - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) - } - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - totalSize: totalSize, - } + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { - if c.chunkIndex >= len(c.chunkViews) { + if c.nextChunkViewIndex >= len(c.chunkViews) { return n, io.EOF } - chunkView := c.chunkViews[c.chunkIndex] + chunkView := c.chunkViews[c.nextChunkViewIndex] c.fetchChunkToBuffer(chunkView) - c.chunkIndex++ + c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t @@ -173,16 +170,33 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { err = io.ErrUnexpectedEOF } - for i, chunk := range c.chunkViews { - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) - c.chunkIndex = i + 1 - break - } + // stay in the same chunk + if !c.isBufferEmpty() { + if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { + c.bufferPos = int(offset - c.bufferOffset) + return offset, nil } } - c.bufferPos = int(offset - c.bufferOffset) + + // need to seek to a different chunk + currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { + return c.chunkViews[i].LogicOffset <= offset + }) + if currentChunkIndex == len(c.chunkViews) { + return 0, io.EOF + } + + // positioning within the new chunk + chunk := c.chunkViews[currentChunkIndex] + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.nextChunkViewIndex = currentChunkIndex + 1 + } + c.bufferPos = int(offset - c.bufferOffset) + } else { + return 0, io.ErrUnexpectedEOF + } return offset, err