mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add read ahead input stream
This commit is contained in:
parent
53190a9972
commit
b3089dcc8e
|
@ -0,0 +1,404 @@
|
|||
package seaweedfs.client;
|
||||
|
||||
/*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// package org.apache.spark.io;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* {@link InputStream} implementation which asynchronously reads ahead from the underlying input
|
||||
* stream when specified amount of data has been read from the current buffer. It does it by
|
||||
* maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data
|
||||
* which should be returned when a read() call is issued. The read ahead buffer is used to
|
||||
* asynchronously read from the underlying input stream and once the current active buffer is
|
||||
* exhausted, we flip the two buffers so that we can start reading from the read ahead buffer
|
||||
* without being blocked in disk I/O.
|
||||
*/
|
||||
public class ReadAheadInputStream extends InputStream {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
|
||||
|
||||
private ReentrantLock stateChangeLock = new ReentrantLock();
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
private ByteBuffer activeBuffer;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
private ByteBuffer readAheadBuffer;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
private boolean endOfStream;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
// true if async read is in progress
|
||||
private boolean readInProgress;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
// true if read is aborted due to an exception in reading from underlying input stream.
|
||||
private boolean readAborted;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
private Throwable readException;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
// whether the close method is called.
|
||||
private boolean isClosed;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
// true when the close method will close the underlying input stream. This is valid only if
|
||||
// `isClosed` is true.
|
||||
private boolean isUnderlyingInputStreamBeingClosed;
|
||||
|
||||
@GuardedBy("stateChangeLock")
|
||||
// whether there is a read ahead task running,
|
||||
private boolean isReading;
|
||||
|
||||
// whether there is a reader waiting for data.
|
||||
private AtomicBoolean isWaiting = new AtomicBoolean(false);
|
||||
|
||||
private final InputStream underlyingInputStream;
|
||||
|
||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build()
|
||||
);
|
||||
|
||||
private final Condition asyncReadComplete = stateChangeLock.newCondition();
|
||||
|
||||
private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
|
||||
|
||||
/**
|
||||
* Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
|
||||
* threshold
|
||||
*
|
||||
* @param inputStream The underlying input stream.
|
||||
* @param bufferSizeInBytes The buffer size.
|
||||
*/
|
||||
public ReadAheadInputStream(
|
||||
InputStream inputStream, int bufferSizeInBytes) {
|
||||
Preconditions.checkArgument(bufferSizeInBytes > 0,
|
||||
"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
|
||||
activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
|
||||
readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
|
||||
this.underlyingInputStream = inputStream;
|
||||
activeBuffer.flip();
|
||||
readAheadBuffer.flip();
|
||||
}
|
||||
|
||||
private boolean isEndOfStream() {
|
||||
return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
|
||||
}
|
||||
|
||||
private void checkReadException() throws IOException {
|
||||
if (readAborted) {
|
||||
Throwables.propagateIfPossible(readException, IOException.class);
|
||||
throw new IOException(readException);
|
||||
}
|
||||
}
|
||||
|
||||
/** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
|
||||
private void readAsync() throws IOException {
|
||||
stateChangeLock.lock();
|
||||
final byte[] arr = readAheadBuffer.array();
|
||||
try {
|
||||
if (endOfStream || readInProgress) {
|
||||
return;
|
||||
}
|
||||
checkReadException();
|
||||
readAheadBuffer.position(0);
|
||||
readAheadBuffer.flip();
|
||||
readInProgress = true;
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
executorService.execute(() -> {
|
||||
stateChangeLock.lock();
|
||||
try {
|
||||
if (isClosed) {
|
||||
readInProgress = false;
|
||||
return;
|
||||
}
|
||||
// Flip this so that the close method will not close the underlying input stream when we
|
||||
// are reading.
|
||||
isReading = true;
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
|
||||
// Please note that it is safe to release the lock and read into the read ahead buffer
|
||||
// because either of following two conditions will hold - 1. The active buffer has
|
||||
// data available to read so the reader will not read from the read ahead buffer.
|
||||
// 2. This is the first time read is called or the active buffer is exhausted,
|
||||
// in that case the reader waits for this async read to complete.
|
||||
// So there is no race condition in both the situations.
|
||||
int read = 0;
|
||||
int off = 0, len = arr.length;
|
||||
Throwable exception = null;
|
||||
try {
|
||||
// try to fill the read ahead buffer.
|
||||
// if a reader is waiting, possibly return early.
|
||||
do {
|
||||
read = underlyingInputStream.read(arr, off, len);
|
||||
if (read <= 0) break;
|
||||
off += read;
|
||||
len -= read;
|
||||
} while (len > 0 && !isWaiting.get());
|
||||
} catch (Throwable ex) {
|
||||
exception = ex;
|
||||
if (ex instanceof Error) {
|
||||
// `readException` may not be reported to the user. Rethrow Error to make sure at least
|
||||
// The user can see Error in UncaughtExceptionHandler.
|
||||
throw (Error) ex;
|
||||
}
|
||||
} finally {
|
||||
stateChangeLock.lock();
|
||||
readAheadBuffer.limit(off);
|
||||
if (read < 0 || (exception instanceof EOFException)) {
|
||||
endOfStream = true;
|
||||
} else if (exception != null) {
|
||||
readAborted = true;
|
||||
readException = exception;
|
||||
}
|
||||
readInProgress = false;
|
||||
signalAsyncReadComplete();
|
||||
stateChangeLock.unlock();
|
||||
closeUnderlyingInputStreamIfNecessary();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void closeUnderlyingInputStreamIfNecessary() {
|
||||
boolean needToCloseUnderlyingInputStream = false;
|
||||
stateChangeLock.lock();
|
||||
try {
|
||||
isReading = false;
|
||||
if (isClosed && !isUnderlyingInputStreamBeingClosed) {
|
||||
// close method cannot close underlyingInputStream because we were reading.
|
||||
needToCloseUnderlyingInputStream = true;
|
||||
}
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
if (needToCloseUnderlyingInputStream) {
|
||||
try {
|
||||
underlyingInputStream.close();
|
||||
} catch (IOException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void signalAsyncReadComplete() {
|
||||
stateChangeLock.lock();
|
||||
try {
|
||||
asyncReadComplete.signalAll();
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForAsyncReadComplete() throws IOException {
|
||||
stateChangeLock.lock();
|
||||
isWaiting.set(true);
|
||||
try {
|
||||
// There is only one reader, and one writer, so the writer should signal only once,
|
||||
// but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
|
||||
while (readInProgress) {
|
||||
asyncReadComplete.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
InterruptedIOException iio = new InterruptedIOException(e.getMessage());
|
||||
iio.initCause(e);
|
||||
throw iio;
|
||||
} finally {
|
||||
isWaiting.set(false);
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
checkReadException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (activeBuffer.hasRemaining()) {
|
||||
// short path - just get one byte.
|
||||
return activeBuffer.get() & 0xFF;
|
||||
} else {
|
||||
byte[] oneByteArray = oneByte.get();
|
||||
return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int offset, int len) throws IOException {
|
||||
if (offset < 0 || len < 0 || len > b.length - offset) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!activeBuffer.hasRemaining()) {
|
||||
// No remaining in active buffer - lock and switch to write ahead buffer.
|
||||
stateChangeLock.lock();
|
||||
try {
|
||||
waitForAsyncReadComplete();
|
||||
if (!readAheadBuffer.hasRemaining()) {
|
||||
// The first read.
|
||||
readAsync();
|
||||
waitForAsyncReadComplete();
|
||||
if (isEndOfStream()) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// Swap the newly read read ahead buffer in place of empty active buffer.
|
||||
swapBuffers();
|
||||
// After swapping buffers, trigger another async read for read ahead buffer.
|
||||
readAsync();
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
}
|
||||
len = Math.min(len, activeBuffer.remaining());
|
||||
activeBuffer.get(b, offset, len);
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
/**
|
||||
* flip the active and read ahead buffer
|
||||
*/
|
||||
private void swapBuffers() {
|
||||
ByteBuffer temp = activeBuffer;
|
||||
activeBuffer = readAheadBuffer;
|
||||
readAheadBuffer = temp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
stateChangeLock.lock();
|
||||
// Make sure we have no integer overflow.
|
||||
try {
|
||||
return (int) Math.min((long) Integer.MAX_VALUE,
|
||||
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(long n) throws IOException {
|
||||
if (n <= 0L) {
|
||||
return 0L;
|
||||
}
|
||||
if (n <= activeBuffer.remaining()) {
|
||||
// Only skipping from active buffer is sufficient
|
||||
activeBuffer.position((int) n + activeBuffer.position());
|
||||
return n;
|
||||
}
|
||||
stateChangeLock.lock();
|
||||
long skipped;
|
||||
try {
|
||||
skipped = skipInternal(n);
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
return skipped;
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal skip function which should be called only from skip() api. The assumption is that
|
||||
* the stateChangeLock is already acquired in the caller before calling this function.
|
||||
*/
|
||||
private long skipInternal(long n) throws IOException {
|
||||
assert (stateChangeLock.isLocked());
|
||||
waitForAsyncReadComplete();
|
||||
if (isEndOfStream()) {
|
||||
return 0;
|
||||
}
|
||||
if (available() >= n) {
|
||||
// we can skip from the internal buffers
|
||||
int toSkip = (int) n;
|
||||
// We need to skip from both active buffer and read ahead buffer
|
||||
toSkip -= activeBuffer.remaining();
|
||||
assert(toSkip > 0); // skipping from activeBuffer already handled.
|
||||
activeBuffer.position(0);
|
||||
activeBuffer.flip();
|
||||
readAheadBuffer.position(toSkip + readAheadBuffer.position());
|
||||
swapBuffers();
|
||||
// Trigger async read to emptied read ahead buffer.
|
||||
readAsync();
|
||||
return n;
|
||||
} else {
|
||||
int skippedBytes = available();
|
||||
long toSkip = n - skippedBytes;
|
||||
activeBuffer.position(0);
|
||||
activeBuffer.flip();
|
||||
readAheadBuffer.position(0);
|
||||
readAheadBuffer.flip();
|
||||
long skippedFromInputStream = underlyingInputStream.skip(toSkip);
|
||||
readAsync();
|
||||
return skippedBytes + skippedFromInputStream;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
boolean isSafeToCloseUnderlyingInputStream = false;
|
||||
stateChangeLock.lock();
|
||||
try {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
isClosed = true;
|
||||
if (!isReading) {
|
||||
// Nobody is reading, so we can close the underlying input stream in this method.
|
||||
isSafeToCloseUnderlyingInputStream = true;
|
||||
// Flip this to make sure the read ahead task will not close the underlying input stream.
|
||||
isUnderlyingInputStreamBeingClosed = true;
|
||||
}
|
||||
} finally {
|
||||
stateChangeLock.unlock();
|
||||
}
|
||||
|
||||
try {
|
||||
executorService.shutdownNow();
|
||||
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
InterruptedIOException iio = new InterruptedIOException(e.getMessage());
|
||||
iio.initCause(e);
|
||||
throw iio;
|
||||
} finally {
|
||||
if (isSafeToCloseUnderlyingInputStream) {
|
||||
underlyingInputStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import seaweedfs.client.FilerProto;
|
||||
import seaweedfs.client.ReadAheadInputStream;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem {
|
|||
|
||||
try {
|
||||
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
||||
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
|
||||
return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
||||
return null;
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import seaweedfs.client.FilerProto;
|
||||
import seaweedfs.client.ReadAheadInputStream;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem {
|
|||
|
||||
try {
|
||||
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
||||
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
|
||||
return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
||||
return null;
|
||||
|
|
Loading…
Reference in a new issue