From ecb234f75aadfcaf64638fdd9532e3bc95d0e4ce Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Aug 2021 14:46:23 -0700 Subject: [PATCH 1/4] refactor --- weed/filer/stream.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 70a278ca5..197b87ab8 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -89,6 +89,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { chunkViews []*ChunkView + totalSize int64 logicOffset int64 buffer []byte bufferOffset int64 @@ -107,9 +108,15 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + var totalSize int64 + for _, chunk := range chunkViews { + totalSize += int64(chunk.Size) + } + return &ChunkStreamReader{ chunkViews: chunkViews, lookupFileId: lookupFileIdFn, + totalSize: totalSize, } } @@ -119,9 +126,15 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + var totalSize int64 + for _, chunk := range chunkViews { + totalSize += int64(chunk.Size) + } + return &ChunkStreamReader{ chunkViews: chunkViews, lookupFileId: lookupFileIdFn, + totalSize: totalSize, } } @@ -148,20 +161,15 @@ func (c *ChunkStreamReader) isBufferEmpty() bool { func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { - var totalSize int64 - for _, chunk := range c.chunkViews { - totalSize += int64(chunk.Size) - } - var err error switch whence { case io.SeekStart: case io.SeekCurrent: offset += c.bufferOffset + int64(c.bufferPos) case io.SeekEnd: - offset = totalSize + offset + offset = c.totalSize + offset } - if offset > totalSize { + if offset > c.totalSize { err = io.ErrUnexpectedEOF } From d2b3416d1c7c1a75d1f535658a65337109aa24f8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Aug 2021 14:54:00 -0700 Subject: [PATCH 2/4] java: use empty value as replication default --- .../src/main/java/seaweedfs/client/SeaweedOutputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 3bec05796..7a94acbd0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -32,10 +32,10 @@ public class SeaweedOutputStream extends OutputStream { private long lastTotalAppendOffset = 0; private ByteBuffer buffer; private long outputIndex; - private String replication = "000"; + private String replication = ""; public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { - this(filerClient, fullpath, "000"); + this(filerClient, fullpath, ""); } public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { From 59732a052996d57692d2179525d7b89387130ba7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Aug 2021 15:35:27 -0700 Subject: [PATCH 3/4] refactoring --- weed/filer/stream.go | 100 ++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 197b87ab8..2ea8ce493 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "sort" "strings" "time" @@ -88,65 +89,61 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - chunkIndex int - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + buffer []byte + bufferOffset int64 + bufferPos int + nextChunkViewIndex int + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) +func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + sort.Slice(chunkViews, func(i, j int) bool { + return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + }) + + var totalSize int64 + for _, chunk := range chunkViews { + totalSize += int64(chunk.Size) + } + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: lookupFileIdFn, + totalSize: totalSize, + } +} + func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) - } - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - totalSize: totalSize, - } + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := LookupFn(filerClient) - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) - } - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - totalSize: totalSize, - } + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { - if c.chunkIndex >= len(c.chunkViews) { + if c.nextChunkViewIndex >= len(c.chunkViews) { return n, io.EOF } - chunkView := c.chunkViews[c.chunkIndex] + chunkView := c.chunkViews[c.nextChunkViewIndex] c.fetchChunkToBuffer(chunkView) - c.chunkIndex++ + c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t @@ -173,16 +170,33 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { err = io.ErrUnexpectedEOF } - for i, chunk := range c.chunkViews { - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) - c.chunkIndex = i + 1 - break - } + // stay in the same chunk + if !c.isBufferEmpty() { + if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { + c.bufferPos = int(offset - c.bufferOffset) + return offset, nil } } - c.bufferPos = int(offset - c.bufferOffset) + + // need to seek to a different chunk + currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { + return c.chunkViews[i].LogicOffset <= offset + }) + if currentChunkIndex == len(c.chunkViews) { + return 0, io.EOF + } + + // positioning within the new chunk + chunk := c.chunkViews[currentChunkIndex] + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.nextChunkViewIndex = currentChunkIndex + 1 + } + c.bufferPos = int(offset - c.bufferOffset) + } else { + return 0, io.ErrUnexpectedEOF + } return offset, err From de730b079da4d9e046c4816a4f74a19c4ddf6712 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Aug 2021 15:41:07 -0700 Subject: [PATCH 4/4] ChunkStreamReader implenents io.ReaderAt --- weed/filer/stream.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 2ea8ce493..3859f9a67 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -99,6 +99,7 @@ type ChunkStreamReader struct { } var _ = io.ReadSeeker(&ChunkStreamReader{}) +var _ = io.ReaderAt(&ChunkStreamReader{}) func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { @@ -135,6 +136,14 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F return doNewChunkStreamReader(lookupFileIdFn, chunks) } +func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { + _, err = c.Seek(off, io.SeekStart) + if err != nil { + return + } + return c.Read(p) +} + func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() {