diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java
new file mode 100644
index 000000000..926d0b83b
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBuffer {
+
+ private SeaweedInputStream stream;
+ private long offset; // offset within the file for the buffer
+ private int length; // actual length, set after the buffer is filled
+ private int requestedLength; // requested length of the read
+ private byte[] buffer; // the buffer itself
+ private int bufferindex = -1; // index of this buffer in the buffers[] array in ReadBufferManager
+ private ReadBufferStatus status; // status of the buffer
+ private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
+ // waiting on this buffer gets unblocked
+
+ // fields to help with eviction logic
+ private long timeStamp = 0; // tick at which buffer became available to read
+ private boolean isFirstByteConsumed = false;
+ private boolean isLastByteConsumed = false;
+ private boolean isAnyByteConsumed = false;
+
+ public SeaweedInputStream getStream() {
+ return stream;
+ }
+
+ public void setStream(SeaweedInputStream stream) {
+ this.stream = stream;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ public int getRequestedLength() {
+ return requestedLength;
+ }
+
+ public void setRequestedLength(int requestedLength) {
+ this.requestedLength = requestedLength;
+ }
+
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(byte[] buffer) {
+ this.buffer = buffer;
+ }
+
+ public int getBufferindex() {
+ return bufferindex;
+ }
+
+ public void setBufferindex(int bufferindex) {
+ this.bufferindex = bufferindex;
+ }
+
+ public ReadBufferStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(ReadBufferStatus status) {
+ this.status = status;
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public boolean isFirstByteConsumed() {
+ return isFirstByteConsumed;
+ }
+
+ public void setFirstByteConsumed(boolean isFirstByteConsumed) {
+ this.isFirstByteConsumed = isFirstByteConsumed;
+ }
+
+ public boolean isLastByteConsumed() {
+ return isLastByteConsumed;
+ }
+
+ public void setLastByteConsumed(boolean isLastByteConsumed) {
+ this.isLastByteConsumed = isLastByteConsumed;
+ }
+
+ public boolean isAnyByteConsumed() {
+ return isAnyByteConsumed;
+ }
+
+ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
+ this.isAnyByteConsumed = isAnyByteConsumed;
+ }
+
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java
new file mode 100644
index 000000000..5b1e21529
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package seaweed.hdfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The read-ahead buffer manager for {@link SeaweedInputStream}, adapted from the ABFS ReadBufferManager.
+ */
+final class ReadBufferManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+
+ private static final int NUM_BUFFERS = 16;
+ private static final int BLOCK_SIZE = 4 * 1024 * 1024;
+ private static final int NUM_THREADS = 8;
+ private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+
+ private Thread[] threads = new Thread[NUM_THREADS];
+ private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
+
+ private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
+ private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
+ private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
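+ // Buffer lifecycle: an index starts in freeList; queueReadAhead hands its byte[] to a
+ // ReadBuffer and moves the request into readAheadQueue; a worker moves it to
+ // inProgressList while reading; doneReading moves it to completedReadList on success,
+ // or returns the index to freeList on failure; tryEvict eventually reclaims
+ // consumed or stale completed buffers back into freeList.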
+ private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+
+ static {
+ BUFFER_MANAGER = new ReadBufferManager();
+ BUFFER_MANAGER.init();
+ }
+
+ static ReadBufferManager getBufferManager() {
+ return BUFFER_MANAGER;
+ }
+
+ private void init() {
+ buffers = new byte[NUM_BUFFERS][];
+ for (int i = 0; i < NUM_BUFFERS; i++) {
+ buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
+ freeList.add(i);
+ }
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Thread t = new Thread(new ReadBufferWorker(i));
+ t.setDaemon(true);
+ threads[i] = t;
+ t.setName("SeaweedFS-prefetch-" + i);
+ t.start();
+ }
+ ReadBufferWorker.UNLEASH_WORKERS.countDown();
+ }
+
+ // hide instance constructor
+ private ReadBufferManager() {
+ }
+
+
+ /*
+ *
+ * SeaweedInputStream-facing methods
+ *
+ */
+
+
+ /**
+ * {@link SeaweedInputStream} calls this method to queue read-aheads.
+ *
+ * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
+ * @param requestedOffset The offset in the file which should be read
+ * @param requestedLength The length to read
+ */
+ void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+ stream.getPath(), requestedOffset, requestedLength);
+ }
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream, requestedOffset)) {
+ return; // already queued, do not queue again
+ }
+ if (freeList.isEmpty() && !tryEvict()) {
+ return; // no buffers available, cannot queue anything
+ }
+
+ buffer = new ReadBuffer();
+ buffer.setStream(stream);
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+
+ Integer bufferIndex = freeList.pop(); // safe: freeList is non-empty here - either it had a free index or tryEvict() just freed one
+
+ buffer.setBuffer(buffers[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ readAheadQueue.add(buffer);
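+ // wake a worker thread blocked in getNextBlockToRead()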
+ notifyAll();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+ stream.getPath(), requestedOffset, buffer.getBufferindex());
+ }
+ }
+
+
+ /**
+ * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is
+ * because, depending on worker thread availability, the read-ahead may take a while - the calling thread can do its
+ * own read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
+ *
+ * @param stream the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
+ */
+ int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
+ // not synchronized, so have to be careful with locking
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("getBlock for file {} position {} thread {}",
+ stream.getPath(), position, Thread.currentThread().getName());
+ }
+
+ waitForProcess(stream, position);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+ }
+ if (bytesRead > 0) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done read from Cache for {} position {} length {}",
+ stream.getPath(), position, bytesRead);
+ }
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
+ return 0;
+ }
+
+ /*
+ *
+ * Internal methods
+ *
+ */
+
+ private void waitForProcess(final SeaweedInputStream stream, final long position) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ clearFromReadAheadQueue(stream, position);
+ readBuf = getFromList(inProgressList, stream, position);
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
+ stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
+ }
+ readBuf.getLatch().await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // inProgressList. So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("latch done for file {} buffer idx {} length {}",
+ stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
+ }
+ }
+ }
+
+ /**
+ * If any buffer in the completedReadList can be reclaimed, reclaim it and return it to the free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ *
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+ */
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (completedReadList.isEmpty()) {
+ return false; // there are no evict-able buffers
+ }
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ long earliestBirthday = Long.MAX_VALUE;
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.getTimeStamp() < earliestBirthday) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ }
+ }
+ if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+ return evict(nodeToEvict);
+ }
+
+ // nothing can be evicted
+ return false;
+ }
+
+ private boolean evict(final ReadBuffer buf) {
+ freeList.push(buf.getBufferindex());
+ completedReadList.remove(buf);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
+ buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
+ }
+ return true;
+ }
+
+ private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(readAheadQueue, stream, requestedOffset)
+ || isInList(inProgressList, stream, requestedOffset)
+ || isInList(completedReadList, stream, requestedOffset));
+ }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+ return (getFromList(list, stream, requestedOffset) != null);
+ }
+
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (buffer.getStream() == stream) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
+ ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+ if (buffer != null) {
+ readAheadQueue.remove(buffer);
+ notifyAll(); // lock is held in calling method
+ freeList.push(buffer.getBufferindex());
+ }
+ }
+
+ private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
+ final byte[] buffer) {
+ ReadBuffer buf = getFromList(completedReadList, stream, position);
+ if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+ return 0;
+ }
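+ // position falls within [offset, offset + length) of buf; copy from that point,
+ // always into the start of the caller's buffer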
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+ return lengthToCopy;
+ }
+
+ /*
+ *
+ * ReadBufferWorker-thread-facing methods
+ *
+ */
+
+ /**
+ * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+ *
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
+ */
+ ReadBuffer getNextBlockToRead() throws InterruptedException {
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ while (readAheadQueue.size() == 0) {
+ wait();
+ }
+ buffer = readAheadQueue.remove();
+ notifyAll();
+ if (buffer == null) {
+ return null; // should never happen
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ inProgressList.add(buffer);
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+ buffer.getStream().getPath(), buffer.getOffset());
+ }
+ return buffer;
+ }
+
+ /**
+ * ReadBufferWorker thread calls this method to post completion.
+ *
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+ */
+ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+ buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+ }
+ synchronized (this) {
+ inProgressList.remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setTimeStamp(currentTimeMillis());
+ buffer.setLength(bytesActuallyRead);
+ completedReadList.add(buffer);
+ } else {
+ freeList.push(buffer.getBufferindex());
+ // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+ }
+ }
+ //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
+ }
+
+ /**
+ * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization),
+ * making it unsuitable for measuring time intervals. System.nanoTime() is strictly monotonically increasing per CPU core.
+ * Note: it is not monotonic across sockets, and even within a CPU, it is only the
+ * more recent parts that share a clock across all cores.
+ *
+ * @return current time in milliseconds
+ */
+ private long currentTimeMillis() {
+ return System.nanoTime() / 1000 / 1000;
+ }
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java
new file mode 100644
index 000000000..d63674977
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+/**
+ * The status of a {@link ReadBuffer} as it moves through the read-ahead pipeline.
+ */
+public enum ReadBufferStatus {
+ NOT_AVAILABLE, // buffers sitting in the readAheadQueue have this status
+ READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
+ AVAILABLE, // data is available in buffer. It should be in completedList
+ READ_FAILED // read completed, but failed.
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java
new file mode 100644
index 000000000..6ffbc4644
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBufferWorker implements Runnable {
+
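+ // held closed until ReadBufferManager.init() has finished allocating the shared buffers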
+ protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
+ private int id;
+
+ ReadBufferWorker(final int id) {
+ this.id = id;
+ }
+
+ /**
+ * return the ID of ReadBufferWorker.
+ */
+ public int getId() {
+ return this.id;
+ }
+
+ /**
+ * Waits until a buffer becomes available in ReadAheadQueue.
+ * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
+ * Rinse and repeat. Forever.
+ */
+ public void run() {
+ try {
+ UNLEASH_WORKERS.await();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ ReadBuffer buffer;
+ while (true) {
+ try {
+ buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (buffer != null) {
+ try {
+ // do the actual read, from the file.
+ int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+ bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
+ } catch (Exception ex) {
+ bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
+ }
+ }
+ }
+ }
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 50f05a7c7..5599c5683 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -57,6 +57,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ path = qualify(path);
+
return null;
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
new file mode 100644
index 000000000..0cd118f22
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -0,0 +1,363 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+public class SeaweedInputStream extends FSInputStream {
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final Statistics statistics;
+ private final String path;
+ private final FilerProto.Entry entry;
+ private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+ private final long contentLength;
+ private final int bufferSize; // default buffer size
+ private final int readAheadQueueDepth; // initialized in constructor
+ private final boolean readAheadEnabled; // whether read-ahead is enabled
+
+ private byte[] buffer = null; // will be initialized on first use
+
+ private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
+ private long fCursorAfterLastRead = -1;
+ private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
+ private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
+ // of valid bytes in buffer)
+ private boolean closed = false;
+
+ public SeaweedInputStream(
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry,
+ final int bufferSize,
+ final int readAheadQueueDepth) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.statistics = statistics;
+ this.path = path;
+ this.entry = entry;
+ this.contentLength = entry.getAttributes().getFileSize();
+ this.bufferSize = bufferSize;
+ this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
+ this.readAheadEnabled = true;
+
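+ // collapse the entry's chunk list into non-overlapping visible intervals once, up
+ // front; every subsequent read resolves against this precomputed view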
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int numberOfBytesRead = read(b, 0, 1);
+ if (numberOfBytesRead < 0) {
+ return -1;
+ } else {
+ return (b[0] & 0xFF);
+ }
+ }
+
+ @Override
+ public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ int currentOff = off;
+ int currentLen = len;
+ int lastReadBytes;
+ int totalReadBytes = 0;
+ do {
+ lastReadBytes = readOneBlock(b, currentOff, currentLen);
+ if (lastReadBytes > 0) {
+ currentOff += lastReadBytes;
+ currentLen -= lastReadBytes;
+ totalReadBytes += lastReadBytes;
+ }
+ if (currentLen <= 0 || currentLen > b.length - currentOff) {
+ break;
+ }
+ } while (lastReadBytes > 0);
+ return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
+ }
+
+ private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ Preconditions.checkNotNull(b);
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (this.available() == 0) {
+ return -1;
+ }
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ //If buffer is empty, then fill the buffer.
+ if (bCursor == limit) {
+ //If EOF, then return -1
+ if (fCursor >= contentLength) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ //reset buffer to initial state - i.e., throw away existing data
+ bCursor = 0;
+ limit = 0;
+ if (buffer == null) {
+ buffer = new byte[bufferSize];
+ }
+
+ // Use read-ahead for the first read, for strictly sequential reads, and for requests
+ // at least as large as the internal buffer; short reads after a seek bypass read-ahead
+ if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+ bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+ } else {
+ bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+ }
+
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ limit += bytesRead;
+ fCursor += bytesRead;
+ fCursorAfterLastRead = fCursor;
+ }
+
+ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
+ //(bytes returned may be less than requested)
+ int bytesRemaining = limit - bCursor;
+ int bytesToRead = Math.min(len, bytesRemaining);
+ System.arraycopy(buffer, bCursor, b, off, bytesToRead);
+ bCursor += bytesToRead;
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesToRead);
+ }
+ return bytesToRead;
+ }
+
+
+ private int readInternal(final long position, final byte[] b, final int offset, final int length,
+ final boolean bypassReadAhead) throws IOException {
+ if (readAheadEnabled && !bypassReadAhead) {
+ // try reading from read-ahead
+ if (offset != 0) {
+ throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
+ }
+ int receivedBytes;
+
+ // queue read-aheads
+ int numReadAheads = this.readAheadQueueDepth;
+ long nextSize;
+ long nextOffset = position;
+ while (numReadAheads > 0 && nextOffset < contentLength) {
+ nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+ ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
+ nextOffset = nextOffset + nextSize;
+ numReadAheads--;
+ }
+
+ // try reading from buffers first
+ receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
+ if (receivedBytes > 0) {
+ return receivedBytes;
+ }
+
+ // got nothing from read-ahead, do our own read now
+ receivedBytes = readRemote(position, b, offset, length);
+ return receivedBytes;
+ } else {
+ return readRemote(position, b, offset, length);
+ }
+ }
+
+ int readRemote(long position, byte[] b, int offset, int length) throws IOException {
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+ if (b == null) {
+ throw new IllegalArgumentException("null byte array passed in to read() method");
+ }
+ if (offset >= b.length) {
+ throw new IllegalArgumentException("offset greater than length of array");
+ }
+ if (length < 0) {
+ throw new IllegalArgumentException("requested read length is less than zero");
+ }
+ if (length > (b.length - offset)) {
+ throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ }
+
+ long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+ if (bytesRead > Integer.MAX_VALUE) {
+ throw new IOException("Unexpected Content-Length");
+ }
+ return (int) bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ if (n < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (n > contentLength) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+
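+ // the internal buffer holds file bytes [fCursor - limit, fCursor); a seek into that
+ // window only moves bCursor instead of invalidating the buffer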
+ if (n >= fCursor - limit && n <= fCursor) { // within buffer
+ bCursor = (int) (n - (fCursor - limit));
+ return;
+ }
+
+ // next read will read from here
+ fCursor = n;
+
+ //invalidate buffer
+ limit = 0;
+ bCursor = 0;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ long currentPos = getPos();
+ if (currentPos == contentLength) {
+ if (n > 0) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ }
+ long newPos = currentPos + n;
+ if (newPos < 0) {
+ newPos = 0;
+ n = newPos - currentPos;
+ }
+ if (newPos > contentLength) {
+ newPos = contentLength;
+ n = newPos - currentPos;
+ }
+ seek(newPos);
+ return n;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ *
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw new IOException(
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ final long remaining = this.contentLength - this.getPos();
+ return remaining <= Integer.MAX_VALUE
+ ? (int) remaining : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ return contentLength;
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
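+ // fCursor points just past the buffered region; subtracting the unconsumed part of
+ // the buffer (limit - bCursor) yields the caller-visible position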
+ return fCursor - limit + bCursor;
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ buffer = null; // de-reference the buffer so it can be GC'ed sooner
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Gets whether mark and reset are supported by this stream. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 77886183c..5da77c3a3 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -68,7 +68,6 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.entry = entry;
-
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
index 8b2e5d1db..bf3669d59 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
@@ -1,14 +1,86 @@
package seaweed.hdfs;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
public class SeaweedRead {
+ // returns bytesRead
+ public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
+ final long position, final byte[] buffer, final int bufferOffset,
+ final int bufferLength) {
+
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ for (ChunkView chunkView : chunkViews) {
+ String vid = parseVolumeId(chunkView.fileId);
+ lookupRequest.addVolumeIds(vid);
+ }
+
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+
+ //TODO parallel this
+ long readCount = 0;
+ for (ChunkView chunkView : chunkViews) {
+ FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
+ if (locations == null || locations.getLocationsCount() == 0) {
+ // log here!
+ return 0;
+ }
+
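+ // no replica failover yet: always read from the first returned location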
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpGet request = new HttpGet(
+ String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ request.setHeader("Range",
+ String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+
+ try {
+ HttpResponse response = client.execute(request);
+ HttpEntity entity = response.getEntity();
+
+ // read(b, off, len): off is where this chunk lands in the caller's buffer,
+ // len is the chunk size, not an end offset; a single read() may still
+ // return fewer bytes than requested
+ readCount += entity.getContent().read(buffer,
+ (int) (chunkView.logicOffset - position),
+ (int) chunkView.size);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ return readCount;
+ }
+
+ private static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+ List<ChunkView> views = new ArrayList<>();
+
+ // walk the visible intervals, producing one view per interval that overlaps
+ // [offset, offset + size), and advance offset past each consumed interval
+ long stop = offset + size;
+ for (VisibleInterval chunk : visibleIntervals) {
+ if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ views.add(new ChunkView(
+ chunk.fileId,
+ offset - chunk.start,
+ Math.min(chunk.stop, stop) - offset,
+ offset
+ ));
+ offset = Math.min(chunk.stop, stop);
+ }
+ }
+ return views;
+ }
+
public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@@ -88,6 +160,14 @@ public class SeaweedRead {
return newVisibles;
}
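+ // A SeaweedFS file id looks like "3,01637037d6": the volume id, a comma, then the
+ // needle id and cookie. Everything before the last comma is the volume id, e.g.
+ // parseVolumeId("3,01637037d6") returns "3".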
+ public static String parseVolumeId(String fileId) {
+ int commaIndex = fileId.lastIndexOf(',');
+ if (commaIndex > 0) {
+ return fileId.substring(0, commaIndex);
+ }
+ return fileId;
+ }
+
public static class VisibleInterval {
long start;
long stop;
@@ -102,4 +182,18 @@ public class SeaweedRead {
}
}
+ public static class ChunkView {
+ String fileId;
+ long offset;
+ long size;
+ long logicOffset;
+
+ public ChunkView(String fileId, long offset, long size, long logicOffset) {
+ this.fileId = fileId;
+ this.offset = offset;
+ this.size = size;
+ this.logicOffset = logicOffset;
+ }
+ }
+
}