Chris Lu 2019-08-29 23:29:10 -07:00
parent 58168a8c52
commit 170ee6ef0f
21 changed files with 2590 additions and 1 deletion

@@ -86,7 +86,7 @@ public class SeaweedRead {
return 0;
}
-public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
List<ChunkView> views = new ArrayList<>();
long stop = offset + size;
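
(For context: viewFromVisibles walks the visible intervals and clips each one to the requested [offset, offset + size) window. A minimal sketch of that clipping step follows - the start/stop/fileId accessors and the ChunkView constructor shape are assumptions, not the actual SeaweedFS API:)

// Hypothetical sketch of the clipping loop; field and constructor shapes are assumed.
for (VisibleInterval v : visibleIntervals) {
    long chunkStart = Math.max(offset, v.start);  // clip interval start to the window
    long chunkStop = Math.min(stop, v.stop);      // clip interval stop to the window
    if (chunkStart < chunkStop) {                 // keep only intervals that overlap
        views.add(new ChunkView(v.fileId, chunkStart - v.start, chunkStart, chunkStop - chunkStart));
    }
}
return views;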

@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>oss-parent</artifactId>
<groupId>org.sonatype.oss</groupId>
<version>9</version>
<relativePath>../pom.xml/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop-client</artifactId>
<version>${seaweedfs.client.version}</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>org/slf4j/**</exclude>
<exclude>META-INF/maven/org.slf4j/**</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
</transformers>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>shaded.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc.internal</pattern>
<shadedPattern>shaded.io.grpc.internal</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>shaded.org.apache.commons</shadedPattern>
<excludes>
<exclude>org.apache.hadoop</exclude>
<exclude>org.apache.log4j</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.1.0</seaweedfs.client.version>
<hadoop.version>2.7.4</hadoop.version>
</properties>
</project>
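
A quick way to see what the relocation rules above produce (an illustrative sketch, not part of the build): classes compiled under com.google.* are rewritten to shaded.com.google.* inside the shaded jar, so a consumer inspecting that jar loads them through the relocated name.

// Hypothetical check against the shaded seaweedfs-hadoop-client jar on the classpath;
// com.google.protobuf.Message is bundled via the seaweedfs-client gRPC dependency.
Class<?> relocated = Class.forName("shaded.com.google.protobuf.Message");
System.out.println(relocated.getName()); // shaded.com.google.protobuf.Message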

other/java/hdfs2/pom.xml (new file)
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.1.0</seaweedfs.client.version>
<hadoop.version>2.7.4</hadoop.version>
</properties>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop-client</artifactId>
<version>${seaweedfs.client.version}</version>
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>9</version>
</parent>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>org/slf4j/**</exclude>
<exclude>META-INF/maven/org.slf4j/**</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>shaded.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc.internal</pattern>
<shadedPattern>shaded.io.grpc.internal</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>shaded.org.apache.commons</shadedPattern>
<excludes>
<exclude>org.apache.hadoop</exclude>
<exclude>org.apache.log4j</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>${seaweedfs.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,283 @@
package seaweed.hdfs;
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
private final FilerGrpcClient filerGrpcClient;
private final Path path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private byte[] buffer;
private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient;
this.replication = replication;
this.path = path;
this.position = position;
this.closed = false;
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize];
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
}
this.lastFlushOffset = offset;
}
@Override
public void write(final int byteVal) throws IOException {
write(new byte[]{(byte) (byteVal & 0xFF)});
}
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
writeCurrentBufferToService();
currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else {
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
}
writableBytes = bufferSize - bufferIndex;
}
}
/**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the payload it is committed to the
* service. Data is queued for writing and forced out to the service
* before the call returns.
*/
@Override
public void flush() throws IOException {
if (supportFlush) {
flushInternalAsync();
}
}
/**
* Force all data in the output stream to be written to SeaweedFS.
* Wait to return until this is complete. Close the access to the stream and
* shut down the upload thread pool.
* Any error caught in a worker thread and stored will be rethrown here
* after cleanup.
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
LOG.debug("close path: {}", path);
try {
flushInternal();
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
threadExecutor.shutdownNow();
}
}
}
private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
return;
}
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
buffer = new byte[bufferSize];
bufferIndex = 0;
final long offset = position;
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
boolean completed;
for (completed = false; completionService.poll() != null; completed = true) {
// keep polling until there is no data
}
if (!completed) {
try {
completionService.take();
} catch (InterruptedException e) {
lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
throw lastError;
}
}
}
private void maybeThrowLastError() throws IOException {
if (lastError != null) {
throw lastError;
}
}
/**
* Try to remove the completed write operations from the beginning of write
* operation FIFO queue.
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
writeOperations.peek().task.get();
lastTotalAppendOffset += writeOperations.peek().length;
writeOperations.remove();
}
} catch (Exception e) {
lastError = new IOException(e);
throw lastError;
}
}
private synchronized void flushInternal() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
}
private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
}
private synchronized void flushWrittenBytesToService() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
lastError = new IOException(ex);
throw lastError;
}
}
LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
flushWrittenBytesToServiceInternal(position);
}
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
shrinkWriteOperationQueue();
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
}
}
private static class WriteOperation {
private final Future<Void> task;
private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
Preconditions.checkNotNull(task, "task");
Preconditions.checkArgument(startOffset >= 0, "startOffset");
Preconditions.checkArgument(length >= 0, "length");
this.task = task;
this.startOffset = startOffset;
this.length = length;
}
}
}
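
A minimal usage sketch for the stream above (assuming a FilerGrpcClient and an entry builder are already at hand from the surrounding filesystem code; the buffer size and replication values are illustrative):

// Illustrative only: filerGrpcClient, path and entry come from SeaweedFileSystemStore.
SeaweedOutputStream out = new SeaweedOutputStream(filerGrpcClient, path, entry, 0L, 16 * 1024 * 1024, "000");
try {
    out.write(data);   // buffered; each full buffer is uploaded on a background thread
    out.flush();       // commits metadata for the bytes uploaded so far
} finally {
    out.close();       // drains pending uploads, writes final metadata, shuts down the pool
}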

@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>oss-parent</artifactId>
<groupId>org.sonatype.oss</groupId>
<version>9</version>
<relativePath>../pom.xml/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop-client</artifactId>
<version>${seaweedfs.client.version}</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>org/slf4j/**</exclude>
<exclude>META-INF/maven/org.slf4j/**</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
</transformers>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>shaded.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc.internal</pattern>
<shadedPattern>shaded.io.grpc.internal</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>shaded.org.apache.commons</shadedPattern>
<excludes>
<exclude>org.apache.hadoop</exclude>
<exclude>org.apache.log4j</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.1.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBuffer {
private SeaweedInputStream stream;
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filled
private int requestedLength; // requested length of the read
private byte[] buffer; // the buffer itself
private int bufferindex = -1; // index in the buffers array in Buffer manager
private ReadBufferStatus status; // status of the buffer
private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
// waiting on this buffer gets unblocked
// fields to help with eviction logic
private long timeStamp = 0; // tick at which buffer became available to read
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
public SeaweedInputStream getStream() {
return stream;
}
public void setStream(SeaweedInputStream stream) {
this.stream = stream;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getRequestedLength() {
return requestedLength;
}
public void setRequestedLength(int requestedLength) {
this.requestedLength = requestedLength;
}
public byte[] getBuffer() {
return buffer;
}
public void setBuffer(byte[] buffer) {
this.buffer = buffer;
}
public int getBufferindex() {
return bufferindex;
}
public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public boolean isFirstByteConsumed() {
return isFirstByteConsumed;
}
public void setFirstByteConsumed(boolean isFirstByteConsumed) {
this.isFirstByteConsumed = isFirstByteConsumed;
}
public boolean isLastByteConsumed() {
return isLastByteConsumed;
}
public void setLastByteConsumed(boolean isLastByteConsumed) {
this.isLastByteConsumed = isLastByteConsumed;
}
public boolean isAnyByteConsumed() {
return isAnyByteConsumed;
}
public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
}

@@ -0,0 +1,394 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
* The read buffer manager for SeaweedFS input streams, adapted from the Azure ABFS client.
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = new Thread(new ReadBufferWorker(i));
t.setDaemon(true);
threads[i] = t;
t.setName("SeaweedFS-prefetch-" + i);
t.start();
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
// hide instance constructor
private ReadBufferManager() {
}
/*
*
* SeaweedInputStream-facing methods
*
*/
/**
* {@link SeaweedInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link SeaweedInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which should be read
* @param requestedLength The length to read
*/
void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
buffer = new ReadBuffer();
buffer.setStream(stream);
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
/**
* {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
* depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
* read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
*
* @param stream the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done read from Cache for {} position {} length {}",
stream.getPath(), position, bytesRead);
}
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/*
*
* Internal methods
*
*/
private void waitForProcess(final SeaweedInputStream stream, final long position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
readBuf = getFromList(inProgressList, stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// inProgressList. So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
* then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("latch done for file {} buffer idx {} length {}",
stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
/**
* If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
*
* @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (completedReadList.size() <= 0) {
return false; // there are no evict-able buffers
}
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
for (ReadBuffer buf : completedReadList) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try any old nodes that have not been consumed
long earliestBirthday = Long.MAX_VALUE;
for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
// nothing can be evicted
return false;
}
private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(readAheadQueue, stream, requestedOffset)
|| isInList(inProgressList, stream, requestedOffset)
|| isInList(completedReadList, stream, requestedOffset));
}
private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
readAheadQueue.remove(buffer);
notifyAll(); // lock is held in calling method
freeList.push(buffer.getBufferindex());
}
}
private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
return lengthToCopy;
}
/*
*
* ReadBufferWorker-thread-facing methods
*
*/
/**
* ReadBufferWorker thread calls this to get the next buffer that it should work on.
*
* @return {@link ReadBuffer}
* @throws InterruptedException if thread is interrupted
*/
ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
//buffer = readAheadQueue.take(); // blocking method
while (readAheadQueue.size() == 0) {
wait();
}
buffer = readAheadQueue.remove();
notifyAll();
if (buffer == null) {
return null; // should never happen
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
* ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and become eligible for GC
}
}
// outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across sockets, and even within a CPU, it is only the
* more recent parts that share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
}
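
Putting the pieces together, a hypothetical read path as a SeaweedInputStream might drive it (queueReadAhead and getBlock are defined above; readRemote is the stream-side read that ReadBufferWorker also calls):

ReadBufferManager manager = ReadBufferManager.getBufferManager();
// opportunistically queue the next block while serving the current request
manager.queueReadAhead(stream, position + length, blockSize);
// serve from the read-ahead cache when possible, falling back to a direct remote read
int bytesRead = manager.getBlock(stream, position, length, buffer);
if (bytesRead == 0) {
    bytesRead = stream.readRemote(position, buffer, 0, length);
}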

@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
/**
* The status of a read-ahead buffer, adapted from the Azure ABFS client.
*/
public enum ReadBufferStatus {
NOT_AVAILABLE, // buffers sitting in the readAheadQueue have this status
READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
AVAILABLE, // data is available in buffer. It should be in completedList
READ_FAILED // read completed, but failed.
}

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
ReadBufferWorker(final int id) {
this.id = id;
}
/**
* return the ID of ReadBufferWorker.
*/
public int getId() {
return this.id;
}
/**
* Waits until a buffer becomes available in ReadAheadQueue.
* Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
* Rinse and repeat. Forever.
*/
public void run() {
try {
UNLEASH_WORKERS.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
}
}
}

@@ -0,0 +1,611 @@
package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca";
public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key";
public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
private SeaweedFileSystemStore seaweedFileSystemStore;
public URI getUri() {
return uri;
}
public String getScheme() {
return "seaweedfs";
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
// get host information from uri (overrides info in conf)
String host = uri.getHost();
host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host;
if (host == null) {
throw new IOException("Invalid host specified");
}
conf.set(FS_SEAWEED_FILER_HOST, host);
// get port information from uri, (overrides info in conf)
int port = uri.getPort();
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
setConf(conf);
this.uri = uri;
if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0
&& conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0
&& conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port,
conf.get(FS_SEAWEED_GRPC_CA),
conf.get(FS_SEAWEED_GRPC_CLIENT_CERT),
conf.get(FS_SEAWEED_GRPC_CLIENT_KEY));
} else {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
}
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
path = qualify(path);
try {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream);
} catch (Exception ex) {
return null;
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path);
try {
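// replication here is the HDFS replication factor; SeaweedFS expects a replica
// placement string, so e.g. a factor of 3 maps to "002" below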
String replicaPlacement = String.format("%03d", replication - 1);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
}
}
@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
path = qualify(path);
try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
}
}
@Override
public boolean rename(Path src, Path dst) {
LOG.debug("rename path: {} => {}", src, dst);
if (src.isRoot()) {
return false;
}
if (src.equals(dst)) {
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);
String sourceFileName = src.getName();
Path adjustedDst = dst;
if (dstFileStatus != null) {
if (!dstFileStatus.isDirectory()) {
return false;
}
adjustedDst = new Path(dst, sourceFileName);
}
Path qualifiedSrcPath = qualify(src);
Path qualifiedDstPath = qualify(adjustedDst);
seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
return true;
}
@Override
public boolean delete(Path path, boolean recursive) {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
if (fileStatus == null) {
return true;
}
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
LOG.debug("listStatus path: {}", path);
path = qualify(path);
return seaweedFileSystemStore.listEntries(path);
}
@Override
public Path getWorkingDirectory() {
return workingDirectory;
}
@Override
public void setWorkingDirectory(Path path) {
if (path.isAbsolute()) {
workingDirectory = path;
} else {
workingDirectory = new Path(workingDirectory, path);
}
}
@Override
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
LOG.debug("mkdirs path: {}", path);
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
if (fileStatus == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
FsPermission.getUMask(getConf()));
}
if (fileStatus.isDirectory()) {
return true;
} else {
throw new FileAlreadyExistsException("Path is a file: " + path);
}
}
@Override
public FileStatus getFileStatus(Path path) {
LOG.debug("getFileStatus path: {}", path);
path = qualify(path);
return seaweedFileSystemStore.getFileStatus(path);
}
/**
* Set owner of a path (i.e. a file or a directory).
* The parameters owner and group cannot both be null.
*
* @param path The path
* @param owner If it is null, the original username remains unchanged.
* @param group If it is null, the original groupname remains unchanged.
*/
@Override
public void setOwner(Path path, final String owner, final String group)
throws IOException {
LOG.debug("setOwner path: {}", path);
path = qualify(path);
seaweedFileSystemStore.setOwner(path, owner, group);
}
/**
* Set permission of a path.
*
* @param path The path
* @param permission Access permission
*/
@Override
public void setPermission(Path path, final FsPermission permission) throws IOException {
LOG.debug("setPermission path: {}", path);
if (permission == null) {
throw new IllegalArgumentException("The permission can't be null");
}
path = qualify(path);
seaweedFileSystemStore.setPermission(path, permission);
}
Path qualify(Path path) {
return path.makeQualified(uri, workingDirectory);
}
/**
* Concat existing files together.
*
* @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
@Override
public void concat(final Path trg, final Path[] psrcs) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Truncate the file in the indicated path to the indicated size.
* <ul>
* <li>Fails if path is a directory.</li>
* <li>Fails if path does not exist.</li>
* <li>Fails if path is not closed.</li>
* <li>Fails if new size is greater than current size.</li>
* </ul>
*
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
}
public boolean supportsSymlinks() {
return false;
}
/**
* Create a snapshot.
*
* @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot
* @return the snapshot path.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
@Override
public Path createSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support createSnapshot");
}
/**
* Rename a snapshot.
*
* @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
/**
* Delete a snapshot of a directory.
*
* @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support deleteSnapshot");
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support modifyAclEntries");
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
* @param aclSpec List describing entries to remove
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAclEntries");
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeDefaultAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeDefaultAcl");
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
* @param path Path to modify
* @throws IOException if an ACL could not be removed
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAcl");
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
* for user, group, and others for compatibility with permission bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setAcl");
}
/**
* Gets the ACL of a file or directory.
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
* @throws IOException if an ACL could not be read
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public AclStatus getAclStatus(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAclStatus");
}
/**
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
* @param name xattr name.
* @param value xattr value.
* @param flag xattr set flag
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}
/**
* Get an xattr name and value for a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attribute
* @param name xattr name.
* @return byte[] xattr value.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttr");
}
/**
* Get all of the xattr name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattrs name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattr names for a file or directory.
* Only those xattr names which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return List{@literal <String>} of the XAttr names of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support listXAttrs");
}
/**
* Remove an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to remove extended attribute
* @param name xattr name
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeXAttr");
}
}
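All of the xattr operations above are deliberately unsupported. A minimal caller-side sketch (hypothetical fs and path variables, not part of this commit) showing how a client might probe for that:
try {
fs.setXAttr(path, "user.attr", "value".getBytes(), EnumSet.of(XAttrSetFlag.CREATE));
} catch (UnsupportedOperationException e) {
// this FileSystem does not implement extended attributes; fall back accordingly
}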

View file

@ -0,0 +1,277 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;
import javax.net.ssl.SSLException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
public SeaweedFileSystemStore(String host, int port) {
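// SeaweedFS convention: the filer's gRPC port is its HTTP port + 10000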
int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
filerClient = new FilerClient(filerGrpcClient);
}
public SeaweedFileSystemStore(String host, int port,
String caFile, String clientCertFile, String clientKeyFile) throws SSLException {
int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile);
filerClient = new FilerClient(filerGrpcClient);
}
public static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath();
}
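// SeaweedFS stores "is directory" in the top bit of the mode, alongside the unix permission bits;
// e.g. permissionToMode(FsPermission.valueOf("-rwxr-xr-x"), true) yields 0x800001ED (0755 | 1 << 31)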
static int permissionToMode(FsPermission permission, boolean isDirectory) {
int p = permission.toShort();
if (isDirectory) {
p = p | 1 << 31;
}
return p;
}
public boolean createDirectory(final Path path, UserGroupInformation currentUser,
final FsPermission permission, final FsPermission umask) {
LOG.debug("createDirectory path: {} permission: {} umask: {}",
path,
permission,
umask);
return filerClient.mkdirs(
path.toUri().getPath(),
permissionToMode(permission, true),
currentUser.getUserName(),
currentUser.getGroupNames()
);
}
public FileStatus[] listEntries(final Path path) {
LOG.debug("listEntries path: {}", path);
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
fileStatuses.add(fileStatus);
}
return fileStatuses.toArray(new FileStatus[0]);
}
public FileStatus getFileStatus(final Path path) {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
return null;
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
FileStatus fileStatus = doGetFileStatus(path, entry);
return fileStatus;
}
public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
path,
String.valueOf(isDirectory),
String.valueOf(recursive));
if (path.isRoot()) {
return true;
}
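// depth-first: delete children before removing the directory entry itself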
if (recursive && isDirectory) {
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
}
}
return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive);
}
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = SeaweedRead.totalSize(entry.getChunksList());
boolean isDir = entry.getIsDirectory();
int blockReplication = 1;
int blockSize = 512;
long modificationTime = attributes.getMtime() * 1000; // seconds to milliseconds
long accessTime = 0;
FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
String owner = attributes.getUserName();
String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
return new FileStatus(length, isDir, blockReplication, blockSize,
modificationTime, accessTime, permission, owner, group, null, path);
}
private FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
}
public void rename(Path source, Path destination) {
LOG.debug("rename source: {} destination: {}", source, destination);
if (source.isRoot()) {
return;
}
LOG.debug("rename lookupEntry source: {}", source);
FilerProto.Entry entry = lookupEntry(source);
if (entry == null) {
LOG.warn("rename non-existent source: {}", source);
return;
}
filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
}
public OutputStream createFile(final Path path,
final boolean overwrite,
FsPermission permission,
int bufferSize,
String replication) throws IOException {
permission = permission == null ? FsPermission.getFileDefault() : permission;
LOG.debug("createFile path: {} overwrite: {} permission: {}",
path,
overwrite,
permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
long now = System.currentTimeMillis() / 1000L;
FilerProto.Entry.Builder entry = null;
long writePosition = 0;
if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path);
LOG.debug("createFile existingEntry path:{} existingEntry:{}", path, existingEntry);
if (existingEntry != null) {
entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
// appending: resume writing after the existing content and keep its replication setting
// (guarded by the null check above to avoid an NPE when the file does not yet exist)
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
}
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
.setName(path.getName())
.setIsDirectory(false)
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(permissionToMode(permission, false))
.setReplication(replication)
.setCrtime(now)
.setMtime(now)
.setUserName(userGroupInformation.getUserName())
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
}
return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
}
public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
int bufferSize) throws IOException {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
throw new FileNotFoundException("read non-existent file " + path);
}
return new SeaweedInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
entry,
bufferSize,
readAheadQueueDepth);
}
public void setOwner(Path path, String owner, String group) {
LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group);
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
LOG.debug("setOwner path:{} entry:{}", path, entry);
return;
}
FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
if (owner != null) {
attributesBuilder.setUserName(owner);
}
if (group != null) {
attributesBuilder.clearGroupName();
attributesBuilder.addGroupName(group);
}
entryBuilder.setAttributes(attributesBuilder);
LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
public void setPermission(Path path, FsPermission permission) {
LOG.debug("setPermission path:{} permission:{}", path, permission);
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
LOG.debug("setPermission path:{} entry:{}", path, entry);
return;
}
FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory()));
entryBuilder.setAttributes(attributesBuilder);
LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
}
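A minimal usage sketch for the store (assumes a filer reachable on localhost:8888 and replication "000"; error handling elided):
SeaweedFileSystemStore store = new SeaweedFileSystemStore("localhost", 8888);
OutputStream out = store.createFile(new Path("/tmp/hello.txt"), true,
FsPermission.getFileDefault(), 4096, "000");
out.write("hello world".getBytes());
out.close();
for (FileStatus status : store.listEntries(new Path("/tmp"))) {
System.out.println(status.getPath() + " " + status.getLen());
}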

View file

@ -0,0 +1,371 @@
package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
import java.util.List;
public class SeaweedInputStream extends FSInputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
private final FilerGrpcClient filerGrpcClient;
private final Statistics statistics;
private final String path;
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private final int bufferSize; // size of the internal read buffer
private final int readAheadQueueDepth; // initialized in constructor
private final boolean readAheadEnabled; // whether read-ahead is enabled
private byte[] buffer = null; // will be initialized on first use
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false;
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
final int bufferSize,
final int readAheadQueueDepth) {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
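// merge the chunk list into the non-overlapping byte ranges actually visible to readers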
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
}
public String getPath() {
return path;
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int numberOfBytesRead = read(b, 0, 1);
if (numberOfBytesRead < 0) {
return -1;
} else {
return (b[0] & 0xFF);
}
}
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
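// a single readOneBlock call may return fewer bytes than requested, so keep
// reading until the caller's range is satisfied or the stream is exhausted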
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
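// first read after open, a read continuing from the previous position, or a large
// request: fill the whole internal buffer and let the read-ahead queue run;
// otherwise (a random read) fetch only what was asked for and bypass read-ahead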
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
return -1; // Hadoop prefers -1 to EOFException
}
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
if (offset >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
if (length < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (length > (b.length - offset)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
return (int) bytesRead;
}
/**
* Seek to given position in stream.
*
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
if (n < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (n > contentLength) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
if (n >= fCursor - limit && n <= fCursor) { // within buffer
bCursor = (int) (n - (fCursor - limit));
return;
}
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
}
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
long currentPos = getPos();
if (currentPos == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
long newPos = currentPos + n;
if (newPos < 0) {
newPos = 0;
n = newPos - currentPos;
}
if (newPos > contentLength) {
newPos = contentLength;
n = newPos - currentPos;
}
seek(newPos);
return n;
}
/**
* Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
* <p>
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED);
}
final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
* they won't be reflected in the returned length.
*
* @return length of the file.
* @throws IOException if the stream is closed
*/
public long length() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return contentLength;
}
/**
* Return the current offset from the start of the file
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return fCursor - limit + bCursor;
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by this stream. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}
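A short sketch of the stream's lazy-seek behavior (hypothetical store and path variables; statistics may be null):
SeaweedInputStream in = (SeaweedInputStream) store.openFileForRead(path, null, 4 * 1024 * 1024);
in.seek(100); // only moves cursors; no remote I/O happens here
int firstByte = in.read(); // fills the internal buffer starting at offset 100
long pos = in.getPos(); // 101
in.close();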