refactoring SeaweedInputStream

Chris Lu 2021-01-30 06:16:02 -08:00
parent 043c2d7960
commit 6f4aab51f9
7 changed files with 317 additions and 343 deletions
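
The refactor splits the old Hadoop-coupled stream in two: seaweedfs.client.SeaweedInputStream now extends plain java.io.InputStream with no Hadoop dependencies, and a new seaweed.hdfs.SeaweedHadoopInputStream wraps it to keep the FSInputStream/ByteBufferReadable contract and the Statistics accounting. A minimal sketch of the new Hadoop-free read path, assuming you already hold a connected FilerGrpcClient and the file's FilerProto.Entry (how you obtain them is outside this commit):

import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedInputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class PlainClientRead {
    // Reads a whole file through the refactored client stream. Note the
    // constructor no longer takes a Hadoop Statistics argument.
    public static byte[] readAll(FilerGrpcClient filerGrpcClient,
                                 String path,
                                 FilerProto.Entry entry) throws IOException {
        try (SeaweedInputStream in = new SeaweedInputStream(filerGrpcClient, path, entry)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer, 0, buffer.length)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }
}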

View file

@@ -1,28 +1,22 @@
package seaweed.hdfs;
package seaweedfs.client;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
public class SeaweedInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
private final FilerGrpcClient filerGrpcClient;
private final Statistics statistics;
private final String path;
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
@@ -34,11 +28,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
@@ -86,7 +78,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable
}
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
if (position < 0) {
@@ -111,45 +102,32 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int) bytesRead;
}
/**
* Seek to given position in stream.
*
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
throw EXCEPTION_STREAM_IS_CLOSED;
}
if (n < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
throw new EOFException("Cannot seek to a negative offset");
}
if (n > contentLength) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
throw new EOFException("Attempted to seek or read past the end of the file");
}
this.position = n;
}
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
throw EXCEPTION_STREAM_IS_CLOSED;
}
if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
throw new EOFException("Attempted to seek or read past the end of the file");
}
}
long newPos = this.position + n;
@@ -177,10 +155,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED);
throw EXCEPTION_STREAM_IS_CLOSED;
}
final long remaining = this.contentLength - this.getPos();
final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
@@ -195,65 +172,21 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable
*/
public long length() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
throw EXCEPTION_STREAM_IS_CLOSED;
}
return contentLength;
}
/**
* Return the current offset from the start of the file
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
throw EXCEPTION_STREAM_IS_CLOSED;
}
return position;
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void close() throws IOException {
closed = true;
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by this stream. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}
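
The read(ByteBuffer) path survives the refactor even though the client class no longer implements Hadoop's ByteBufferReadable. A short sketch of using it directly; the flip() assumes the stream advances the buffer's position, as the ByteBufferReadable convention it was modeled on does:

import seaweedfs.client.SeaweedInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

class ByteBufferReadSketch {
    static void readHeader(SeaweedInputStream in) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(512);
        int n = in.read(header);   // -1 at EOF, otherwise bytes copied into header
        if (n > 0) {
            header.flip();         // switch the buffer over to draining mode
            // ... parse the first n bytes ...
        }
    }
}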

View file

@@ -1,6 +1,6 @@
package com.seaweedfs.examples;
import seaweed.hdfs.SeaweedInputStream;
import seaweed.hdfs.SeaweedHadoopInputStream;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
@@ -24,7 +24,7 @@ public class UnzipFile {
long localProcessTime = startTime2 - startTime;
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
SeaweedHadoopInputStream seaweedInputStream = new SeaweedHadoopInputStream(
filerGrpcClient,
new org.apache.hadoop.fs.FileSystem.Statistics(""),
"/",

View file

@@ -230,7 +230,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
return new SeaweedInputStream(filerGrpcClient,
return new SeaweedHadoopInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
entry);
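
The store now hands back the Hadoop wrapper instead of the old coupled stream. Below is a hypothetical call-site sketch, not code from this commit: Hadoop's FileSystem.open() contract wraps such a seekable stream in FSDataInputStream. The openFileForRead signature and the seaweedFileSystemStore/statistics fields are assumed here.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStream;

// Hypothetical call site inside a Hadoop FileSystem implementation.
class OpenSketch {
    private SeaweedFileSystemStore seaweedFileSystemStore;          // assumed field
    private org.apache.hadoop.fs.FileSystem.Statistics statistics;  // assumed field

    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
        // openFileForRead() returns the SeaweedHadoopInputStream built above;
        // its exact signature is assumed for this sketch.
        InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
        // FSDataInputStream requires a stream that is both Seekable and
        // PositionedReadable; FSInputStream satisfies both.
        return new FSDataInputStream(inputStream);
    }
}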

View file

@@ -0,0 +1,150 @@
package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
private final SeaweedInputStream seaweedInputStream;
private final Statistics statistics;
public SeaweedHadoopInputStream(
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry);
this.statistics = statistics;
}
@Override
public int read() throws IOException {
return seaweedInputStream.read();
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
return seaweedInputStream.read(b, off, len);
}
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
int bytesRead = seaweedInputStream.read(buf);
if (bytesRead > 0) {
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return bytesRead;
}
/**
* Seek to given position in stream.
*
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
seaweedInputStream.seek(n);
}
@Override
public synchronized long skip(long n) throws IOException {
return seaweedInputStream.skip(n);
}
/**
* Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
* <p>
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
return seaweedInputStream.available();
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
* they won't be reflected in the returned length.
*
* @return length of the file.
* @throws IOException if the stream is closed
*/
public long length() throws IOException {
return seaweedInputStream.length();
}
/**
* Return the current offset from the start of the file
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
return seaweedInputStream.getPos();
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void close() throws IOException {
seaweedInputStream.close();
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by this stream. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}
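
The wrapper keeps all Hadoop-specific concerns (the seek contract, Statistics accounting, mark/reset refusal) in one delegating class. A sketch of random access through it; the bounds checks noted in the comments are the ones the wrapped client stream enforces:

import java.io.IOException;

class RandomAccessSketch {
    // Positioned read that restores the cursor afterwards, so sequential
    // readers of the same stream are unaffected.
    static int readAt(SeaweedHadoopInputStream in, long offset, byte[] dst) throws IOException {
        long saved = in.getPos();    // remember the current cursor
        try {
            in.seek(offset);         // EOFException if offset < 0 or > file length
            return in.read(dst, 0, dst.length);
        } finally {
            in.seek(saved);          // put the cursor back
        }
    }
}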

View file

@@ -230,7 +230,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
return new SeaweedInputStream(filerGrpcClient,
return new SeaweedHadoopInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
entry);

View file

@@ -0,0 +1,150 @@
package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
private final SeaweedInputStream seaweedInputStream;
private final Statistics statistics;
public SeaweedHadoopInputStream(
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry);
this.statistics = statistics;
}
@Override
public int read() throws IOException {
return seaweedInputStream.read();
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
return seaweedInputStream.read(b, off, len);
}
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
int bytesRead = seaweedInputStream.read(buf);
if (bytesRead > 0) {
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return bytesRead;
}
/**
* Seek to given position in stream.
*
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
seaweedInputStream.seek(n);
}
@Override
public synchronized long skip(long n) throws IOException {
return seaweedInputStream.skip(n);
}
/**
* Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
* <p>
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
return seaweedInputStream.available();
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
* they won't be reflected in the returned length.
*
* @return length of the file.
* @throws IOException if the stream is closed
*/
public long length() throws IOException {
return seaweedInputStream.length();
}
/**
* Return the current offset from the start of the file
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
return seaweedInputStream.getPos();
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void close() throws IOException {
seaweedInputStream.close();
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by this stream. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}

View file

@@ -1,259 +0,0 @@
package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
private final FilerGrpcClient filerGrpcClient;
private final Statistics statistics;
private final String path;
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private long position = 0; // cursor of the file
private boolean closed = false;
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
}
public String getPath() {
return path;
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int numberOfBytesRead = read(b, 0, 1);
if (numberOfBytesRead < 0) {
return -1;
} else {
return (b[0] & 0xFF);
}
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
return read(buf);
}
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
return -1; // Hadoop prefers -1 to EOFException
}
long bytesRead = 0;
int len = buf.remaining();
int start = (int) this.position;
if (start+len <= entry.getContent().size()) {
entry.getContent().substring(start, start+len).copyTo(buf);
} else {
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int) bytesRead;
}
/**
* Seek to given position in stream.
*
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
if (n < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (n > contentLength) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
this.position = n;
}
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
long newPos = this.position + n;
if (newPos < 0) {
newPos = 0;
n = newPos - this.position;
}
if (newPos > contentLength) {
newPos = contentLength;
n = newPos - this.position;
}
seek(newPos);
return n;
}
/**
* Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
* <p>
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED);
}
final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
* they won't be reflected in the returned length.
*
* @return length of the file.
* @throws IOException if the stream is closed
*/
public long length() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return contentLength;
}
/**
* Return the current offset from the start of the file
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return position;
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void close() throws IOException {
closed = true;
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by this stream. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}