Hadoop: add BufferedByteBufferReadableInputStream

fix https://github.com/chrislusf/seaweedfs/issues/1645
This commit is contained in:
Chris Lu 2020-12-03 00:08:05 -08:00
parent 3857f9c840
commit 4d2855476c
5 changed files with 53 additions and 9 deletions

View file

@ -0,0 +1,25 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.*;
import java.io.IOException;
import java.nio.ByteBuffer;
public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
super(in, size);
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
}
}
@Override
public int read(ByteBuffer buf) throws IOException {
if (this.in instanceof ByteBufferReadable) {
return ((ByteBufferReadable)this.in).read(buf);
} else {
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
}
}
}

View file

@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;

View file

@ -0,0 +1,25 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.*;
import java.io.IOException;
import java.nio.ByteBuffer;
public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
super(in, size);
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
}
}
@Override
public int read(ByteBuffer buf) throws IOException {
if (this.in instanceof ByteBufferReadable) {
return ((ByteBufferReadable)this.in).read(buf);
} else {
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
}
}
}

View file

@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;

View file

@ -65,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
}
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
public int read(final byte[] b, final int off, final int len) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
return -1; // Hadoop prefers -1 to EOFException
}
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}