diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 3bec05796..7a94acbd0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -32,10 +32,10 @@ public class SeaweedOutputStream extends OutputStream { private long lastTotalAppendOffset = 0; private ByteBuffer buffer; private long outputIndex; - private String replication = "000"; + private String replication = ""; public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { - this(filerClient, fullpath, "000"); + this(filerClient, fullpath, ""); } public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 70a278ca5..3859f9a67 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "sort" "strings" "time" @@ -88,16 +89,36 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - chunkIndex int - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + buffer []byte + bufferOffset int64 + bufferPos int + nextChunkViewIndex int + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) +var _ = io.ReaderAt(&ChunkStreamReader{}) + +func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + sort.Slice(chunkViews, func(i, j int) bool { + return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + }) + + var totalSize int64 + for _, chunk := range chunkViews { + totalSize += int64(chunk.Size) + } + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: lookupFileIdFn, + totalSize: totalSize, + } +} func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { @@ -105,35 +126,33 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - } + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := LookupFn(filerClient) - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + return doNewChunkStreamReader(lookupFileIdFn, chunks) +} - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, +func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { + _, err = c.Seek(off, io.SeekStart) + if err != nil { + return } + return c.Read(p) } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { - if c.chunkIndex >= len(c.chunkViews) { + if c.nextChunkViewIndex >= len(c.chunkViews) { return n, io.EOF } - chunkView := c.chunkViews[c.chunkIndex] + chunkView := c.chunkViews[c.nextChunkViewIndex] c.fetchChunkToBuffer(chunkView) - c.chunkIndex++ + c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t @@ -148,33 +167,45 @@ func (c *ChunkStreamReader) isBufferEmpty() bool { func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { - var totalSize int64 - for _, chunk := range c.chunkViews { - totalSize += int64(chunk.Size) - } - var err error switch whence { case io.SeekStart: case io.SeekCurrent: offset += c.bufferOffset + int64(c.bufferPos) case io.SeekEnd: - offset = totalSize + offset + offset = c.totalSize + offset } - if offset > totalSize { + if offset > c.totalSize { err = io.ErrUnexpectedEOF } - for i, chunk := range c.chunkViews { - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) - c.chunkIndex = i + 1 - break - } + // stay in the same chunk + if !c.isBufferEmpty() { + if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { + c.bufferPos = int(offset - c.bufferOffset) + return offset, nil } } - c.bufferPos = int(offset - c.bufferOffset) + + // need to seek to a different chunk + currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { + return c.chunkViews[i].LogicOffset <= offset + }) + if currentChunkIndex == len(c.chunkViews) { + return 0, io.EOF + } + + // positioning within the new chunk + chunk := c.chunkViews[currentChunkIndex] + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.nextChunkViewIndex = currentChunkIndex + 1 + } + c.bufferPos = int(offset - c.bufferOffset) + } else { + return 0, io.ErrUnexpectedEOF + } return offset, err