simplify inputstream

This commit is contained in:
Chris Lu 2020-07-16 10:54:29 -07:00
parent c9b50e8a22
commit 2629da2cb9
12 changed files with 58 additions and 1576 deletions

View file

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBuffer {
private SeaweedInputStream stream;
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filles
private int requestedLength; // requested length of the read
private byte[] buffer; // the buffer itself
private int bufferindex = -1; // index in the buffers array in Buffer manager
private ReadBufferStatus status; // status of the buffer
private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
// waiting on this buffer gets unblocked
// fields to help with eviction logic
private long timeStamp = 0; // tick at which buffer became available to read
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
public SeaweedInputStream getStream() {
return stream;
}
public void setStream(SeaweedInputStream stream) {
this.stream = stream;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getRequestedLength() {
return requestedLength;
}
public void setRequestedLength(int requestedLength) {
this.requestedLength = requestedLength;
}
public byte[] getBuffer() {
return buffer;
}
public void setBuffer(byte[] buffer) {
this.buffer = buffer;
}
public int getBufferindex() {
return bufferindex;
}
public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public boolean isFirstByteConsumed() {
return isFirstByteConsumed;
}
public void setFirstByteConsumed(boolean isFirstByteConsumed) {
this.isFirstByteConsumed = isFirstByteConsumed;
}
public boolean isLastByteConsumed() {
return isLastByteConsumed;
}
public void setLastByteConsumed(boolean isLastByteConsumed) {
this.isLastByteConsumed = isLastByteConsumed;
}
public boolean isAnyByteConsumed() {
return isAnyByteConsumed;
}
public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
}

View file

@ -1,394 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
* The Read Buffer Manager for Rest AbfsClient.
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = new Thread(new ReadBufferWorker(i));
t.setDaemon(true);
threads[i] = t;
t.setName("SeaweedFS-prefetch-" + i);
t.start();
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
// hide instance constructor
private ReadBufferManager() {
}
/*
*
* SeaweedInputStream-facing methods
*
*/
/**
* {@link SeaweedInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link SeaweedInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which shoukd be read
* @param requestedLength The length to read
*/
void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
buffer = new ReadBuffer();
buffer.setStream(stream);
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
/**
* {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
* depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own
* read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time).
*
* @param stream the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done read from Cache for {} position {} length {}",
stream.getPath(), position, bytesRead);
}
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/*
*
* Internal methods
*
*/
private void waitForProcess(final SeaweedInputStream stream, final long position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
readBuf = getFromList(inProgressList, stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// inProgressList. So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
// then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("latch done for file {} buffer idx {} length {}",
stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
/**
* If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
*
* @return whether the eviction succeeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (completedReadList.size() <= 0) {
return false; // there are no evict-able buffers
}
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
for (ReadBuffer buf : completedReadList) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try any old nodes that have not been consumed
long earliestBirthday = Long.MAX_VALUE;
for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
// nothing can be evicted
return false;
}
private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(readAheadQueue, stream, requestedOffset)
|| isInList(inProgressList, stream, requestedOffset)
|| isInList(completedReadList, stream, requestedOffset));
}
private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
readAheadQueue.remove(buffer);
notifyAll(); // lock is held in calling method
freeList.push(buffer.getBufferindex());
}
}
private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
return lengthToCopy;
}
/*
*
* ReadBufferWorker-thread-facing methods
*
*/
/**
* ReadBufferWorker thread calls this to get the next buffer that it should work on.
*
* @return {@link ReadBuffer}
* @throws InterruptedException if thread is interrupted
*/
ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
//buffer = readAheadQueue.take(); // blocking method
while (readAheadQueue.size() == 0) {
wait();
}
buffer = readAheadQueue.remove();
notifyAll();
if (buffer == null) {
return null; // should never happen
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
* ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
}
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across Sockets, and even within a CPU, its only the
* more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
}

View file

@ -1,29 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
/**
* The ReadBufferStatus for Rest AbfsClient
*/
public enum ReadBufferStatus {
NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats
READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
AVAILABLE, // data is available in buffer. It should be in completedList
READ_FAILED // read completed, but failed.
}

View file

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
ReadBufferWorker(final int id) {
this.id = id;
}
/**
* return the ID of ReadBufferWorker.
*/
public int getId() {
return this.id;
}
/**
* Waits until a buffer becomes available in ReadAheadQueue.
* Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
* Rinse and repeat. Forever.
*/
public void run() {
try {
UNLEASH_WORKERS.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
}
}
}

View file

@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path); FilerProto.Entry entry = lookupEntry(path);
if (entry == null) { if (entry == null) {
@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry, entry,
bufferSize, bufferSize);
readAheadQueueDepth);
} }
public void setOwner(Path path, String owner, String group) { public void setOwner(Path path, String owner, String group) {

View file

@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength; private final long contentLength;
private final int bufferSize; // default buffer size private final int bufferSize; // default buffer size
private final int readAheadQueueDepth; // initialized in constructor
private final boolean readAheadEnabled; // whether enable readAhead;
private byte[] buffer = null; // will be initialized on first use private long position = 0; // cursor of the file
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false; private boolean closed = false;
public SeaweedInputStream( public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient, final FilerGrpcClient filerGrpcClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry, final FilerProto.Entry entry,
final int bufferSize, final int bufferSize) {
final int readAheadQueueDepth) {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override @Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException { public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) { if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset"); throw new IllegalArgumentException("attempting to read from negative offset");
} }
@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) { if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method"); throw new IllegalArgumentException("null byte array passed in to read() method");
} }
if (offset >= b.length) { if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array"); throw new IllegalArgumentException("offset greater than length of array");
} }
if (length < 0) { if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero"); throw new IllegalArgumentException("requested read length is less than zero");
} }
if (length > (b.length - offset)) { if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
} }
long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) { if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length"); throw new IOException("Unexpected Content-Length");
} }
return (int) bytesRead;
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int)bytesRead;
} }
/** /**
@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} }
if (n >= fCursor - limit && n <= fCursor) { // within buffer this.position = n;
bCursor = (int) (n - (fCursor - limit));
return;
}
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
} }
@Override @Override
@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) { if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
} }
long currentPos = getPos(); if (this.position == contentLength) {
if (currentPos == contentLength) {
if (n > 0) { if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} }
} }
long newPos = currentPos + n; long newPos = this.position + n;
if (newPos < 0) { if (newPos < 0) {
newPos = 0; newPos = 0;
n = newPos - currentPos; n = newPos - this.position;
} }
if (newPos > contentLength) { if (newPos > contentLength) {
newPos = contentLength; newPos = contentLength;
n = newPos - currentPos; n = newPos - this.position;
} }
seek(newPos); seek(newPos);
return n; return n;
@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream {
public synchronized int available() throws IOException { public synchronized int available() throws IOException {
if (closed) { if (closed) {
throw new IOException( throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED); FSExceptionMessages.STREAM_IS_CLOSED);
} }
final long remaining = this.contentLength - this.getPos(); final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE; ? (int) remaining : Integer.MAX_VALUE;
} }
/** /**
@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) { if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
} }
return fCursor - limit + bCursor; return position;
} }
/** /**
@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
closed = true; closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
} }
/** /**

View file

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBuffer {
private SeaweedInputStream stream;
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filles
private int requestedLength; // requested length of the read
private byte[] buffer; // the buffer itself
private int bufferindex = -1; // index in the buffers array in Buffer manager
private ReadBufferStatus status; // status of the buffer
private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
// waiting on this buffer gets unblocked
// fields to help with eviction logic
private long timeStamp = 0; // tick at which buffer became available to read
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
public SeaweedInputStream getStream() {
return stream;
}
public void setStream(SeaweedInputStream stream) {
this.stream = stream;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getRequestedLength() {
return requestedLength;
}
public void setRequestedLength(int requestedLength) {
this.requestedLength = requestedLength;
}
public byte[] getBuffer() {
return buffer;
}
public void setBuffer(byte[] buffer) {
this.buffer = buffer;
}
public int getBufferindex() {
return bufferindex;
}
public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public boolean isFirstByteConsumed() {
return isFirstByteConsumed;
}
public void setFirstByteConsumed(boolean isFirstByteConsumed) {
this.isFirstByteConsumed = isFirstByteConsumed;
}
public boolean isLastByteConsumed() {
return isLastByteConsumed;
}
public void setLastByteConsumed(boolean isLastByteConsumed) {
this.isLastByteConsumed = isLastByteConsumed;
}
public boolean isAnyByteConsumed() {
return isAnyByteConsumed;
}
public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
}

View file

@ -1,394 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
* The Read Buffer Manager for Rest AbfsClient.
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = new Thread(new ReadBufferWorker(i));
t.setDaemon(true);
threads[i] = t;
t.setName("SeaweedFS-prefetch-" + i);
t.start();
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
// hide instance constructor
private ReadBufferManager() {
}
/*
*
* SeaweedInputStream-facing methods
*
*/
/**
* {@link SeaweedInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link SeaweedInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which shoukd be read
* @param requestedLength The length to read
*/
void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
buffer = new ReadBuffer();
buffer.setStream(stream);
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
/**
* {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
* depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own
* read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time).
*
* @param stream the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done read from Cache for {} position {} length {}",
stream.getPath(), position, bytesRead);
}
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/*
*
* Internal methods
*
*/
private void waitForProcess(final SeaweedInputStream stream, final long position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
readBuf = getFromList(inProgressList, stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// inProgressList. So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
// then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("latch done for file {} buffer idx {} length {}",
stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
/**
* If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
*
* @return whether the eviction succeeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (completedReadList.size() <= 0) {
return false; // there are no evict-able buffers
}
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
for (ReadBuffer buf : completedReadList) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try any old nodes that have not been consumed
long earliestBirthday = Long.MAX_VALUE;
for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
// nothing can be evicted
return false;
}
private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(readAheadQueue, stream, requestedOffset)
|| isInList(inProgressList, stream, requestedOffset)
|| isInList(completedReadList, stream, requestedOffset));
}
private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
readAheadQueue.remove(buffer);
notifyAll(); // lock is held in calling method
freeList.push(buffer.getBufferindex());
}
}
private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
return lengthToCopy;
}
/*
*
* ReadBufferWorker-thread-facing methods
*
*/
/**
* ReadBufferWorker thread calls this to get the next buffer that it should work on.
*
* @return {@link ReadBuffer}
* @throws InterruptedException if thread is interrupted
*/
ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
//buffer = readAheadQueue.take(); // blocking method
while (readAheadQueue.size() == 0) {
wait();
}
buffer = readAheadQueue.remove();
notifyAll();
if (buffer == null) {
return null; // should never happen
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
* ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
}
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across Sockets, and even within a CPU, its only the
* more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
}

View file

@ -1,29 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
/**
* The ReadBufferStatus for Rest AbfsClient
*/
public enum ReadBufferStatus {
NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats
READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
AVAILABLE, // data is available in buffer. It should be in completedList
READ_FAILED // read completed, but failed.
}

View file

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
ReadBufferWorker(final int id) {
this.id = id;
}
/**
* return the ID of ReadBufferWorker.
*/
public int getId() {
return this.id;
}
/**
* Waits until a buffer becomes available in ReadAheadQueue.
* Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
* Rinse and repeat. Forever.
*/
public void run() {
try {
UNLEASH_WORKERS.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
}
}
}

View file

@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path); FilerProto.Entry entry = lookupEntry(path);
if (entry == null) { if (entry == null) {
@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry, entry,
bufferSize, bufferSize);
readAheadQueueDepth);
} }
public void setOwner(Path path, String owner, String group) { public void setOwner(Path path, String owner, String group) {

View file

@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength; private final long contentLength;
private final int bufferSize; // default buffer size private final int bufferSize; // default buffer size
private final int readAheadQueueDepth; // initialized in constructor
private final boolean readAheadEnabled; // whether enable readAhead;
private byte[] buffer = null; // will be initialized on first use private long position = 0; // cursor of the file
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false; private boolean closed = false;
public SeaweedInputStream( public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient, final FilerGrpcClient filerGrpcClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry, final FilerProto.Entry entry,
final int bufferSize, final int bufferSize) {
final int readAheadQueueDepth) {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override @Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException { public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) { if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset"); throw new IllegalArgumentException("attempting to read from negative offset");
} }
@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) { if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method"); throw new IllegalArgumentException("null byte array passed in to read() method");
} }
if (offset >= b.length) { if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array"); throw new IllegalArgumentException("offset greater than length of array");
} }
if (length < 0) { if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero"); throw new IllegalArgumentException("requested read length is less than zero");
} }
if (length > (b.length - offset)) { if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
} }
long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) { if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length"); throw new IOException("Unexpected Content-Length");
} }
return (int) bytesRead;
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int)bytesRead;
} }
/** /**
@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} }
if (n >= fCursor - limit && n <= fCursor) { // within buffer this.position = n;
bCursor = (int) (n - (fCursor - limit));
return;
}
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
} }
@Override @Override
@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) { if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
} }
long currentPos = getPos(); if (this.position == contentLength) {
if (currentPos == contentLength) {
if (n > 0) { if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} }
} }
long newPos = currentPos + n; long newPos = this.position + n;
if (newPos < 0) { if (newPos < 0) {
newPos = 0; newPos = 0;
n = newPos - currentPos; n = newPos - this.position;
} }
if (newPos > contentLength) { if (newPos > contentLength) {
newPos = contentLength; newPos = contentLength;
n = newPos - currentPos; n = newPos - this.position;
} }
seek(newPos); seek(newPos);
return n; return n;
@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream {
public synchronized int available() throws IOException { public synchronized int available() throws IOException {
if (closed) { if (closed) {
throw new IOException( throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED); FSExceptionMessages.STREAM_IS_CLOSED);
} }
final long remaining = this.contentLength - this.getPos(); final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE; ? (int) remaining : Integer.MAX_VALUE;
} }
/** /**
@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) { if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
} }
return fCursor - limit + bCursor; return position;
} }
/** /**
@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
closed = true; closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
} }
/** /**