diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filles - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. - * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java deleted file mode 100644 index d63674977..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed. -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * return the ID of ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index f9aaa175f..d9c6d6f0d 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -212,7 +212,6 @@ public class SeaweedFileSystemStore { LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { @@ -223,8 +222,7 @@ public class SeaweedFileSystemStore { statistics, path.toUri().getPath(), entry, - bufferSize, - readAheadQueueDepth); + bufferSize); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 90c14c772..c26ad728f 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream { private final List visibleIntervalList; private final long contentLength; private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - private byte[] buffer = null; // will be initialized on first use + private long position = 0; // cursor of the file - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { + final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry, + final int bufferSize) { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); @@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. - if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream { if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } - if (offset >= b.length) { + if (off >= b.length) { throw new IllegalArgumentException("offset greater than length of array"); } - if (length < 0) { + if (len < 0) { throw new IllegalArgumentException("requested read length is less than zero"); } - if (length > (b.length - offset)) { + if (len > (b.length - off)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } - return (int) bytesRead; + + if (bytesRead > 0) { + this.position += bytesRead; + if (statistics != null) { + statistics.incrementBytesRead(bytesRead); + } + } + + return (int)bytesRead; + } /** @@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } + this.position = n; - // next read will read from here - fCursor = n; - - //invalidate buffer - limit = 0; - bCursor = 0; } @Override @@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - long currentPos = getPos(); - if (currentPos == contentLength) { + if (this.position == contentLength) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } } - long newPos = currentPos + n; + long newPos = this.position + n; if (newPos < 0) { newPos = 0; - n = newPos - currentPos; + n = newPos - this.position; } if (newPos > contentLength) { newPos = contentLength; - n = newPos - currentPos; + n = newPos - this.position; } seek(newPos); return n; @@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream { public synchronized int available() throws IOException { if (closed) { throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + FSExceptionMessages.STREAM_IS_CLOSED); } final long remaining = this.contentLength - this.getPos(); return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining : Integer.MAX_VALUE; } /** @@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return position; } /** @@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized void close() throws IOException { closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner } /** diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filles - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. - * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java deleted file mode 100644 index d63674977..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed. -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * return the ID of ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index f9aaa175f..d9c6d6f0d 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -212,7 +212,6 @@ public class SeaweedFileSystemStore { LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { @@ -223,8 +222,7 @@ public class SeaweedFileSystemStore { statistics, path.toUri().getPath(), entry, - bufferSize, - readAheadQueueDepth); + bufferSize); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 90c14c772..c26ad728f 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream { private final List visibleIntervalList; private final long contentLength; private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - private byte[] buffer = null; // will be initialized on first use + private long position = 0; // cursor of the file - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { + final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry, + final int bufferSize) { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); @@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. - if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream { if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } - if (offset >= b.length) { + if (off >= b.length) { throw new IllegalArgumentException("offset greater than length of array"); } - if (length < 0) { + if (len < 0) { throw new IllegalArgumentException("requested read length is less than zero"); } - if (length > (b.length - offset)) { + if (len > (b.length - off)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } - return (int) bytesRead; + + if (bytesRead > 0) { + this.position += bytesRead; + if (statistics != null) { + statistics.incrementBytesRead(bytesRead); + } + } + + return (int)bytesRead; + } /** @@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } + this.position = n; - // next read will read from here - fCursor = n; - - //invalidate buffer - limit = 0; - bCursor = 0; } @Override @@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - long currentPos = getPos(); - if (currentPos == contentLength) { + if (this.position == contentLength) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } } - long newPos = currentPos + n; + long newPos = this.position + n; if (newPos < 0) { newPos = 0; - n = newPos - currentPos; + n = newPos - this.position; } if (newPos > contentLength) { newPos = contentLength; - n = newPos - currentPos; + n = newPos - this.position; } seek(newPos); return n; @@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream { public synchronized int available() throws IOException { if (closed) { throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + FSExceptionMessages.STREAM_IS_CLOSED); } final long remaining = this.contentLength - this.getPos(); return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining : Integer.MAX_VALUE; } /** @@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return position; } /** @@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized void close() throws IOException { closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner } /**