Hadoop: 1.3.3

improve memory efficiency
This commit is contained in:
Chris Lu 2020-07-15 13:25:44 -07:00
parent 316e853e0e
commit bc3be0bb37
9 changed files with 64 additions and 37 deletions

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.2</version> <version>1.3.3</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.2</version> <version>1.3.3</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.2</version> <version>1.3.3</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.3.2</seaweedfs.client.version> <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.3.2</seaweedfs.client.version> <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>

View file

@ -14,6 +14,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.*; import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@ -28,16 +29,16 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount; private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry; private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position; private long position;
private boolean closed; private boolean closed;
private boolean supportFlush = true;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
private long lastTotalAppendOffset = 0; private long lastTotalAppendOffset = 0;
private byte[] buffer; private byte[] buffer;
private int bufferIndex; private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000"; private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -50,18 +51,18 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize]; // this.buffer = new byte[bufferSize];
this.bufferIndex = 0; this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
10L, 10L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry; this.entry = entry;
@ -84,7 +85,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override @Override
public synchronized void write(final byte[] data, final int off, final int length) public synchronized void write(final byte[] data, final int off, final int length)
throws IOException { throws IOException {
maybeThrowLastError(); maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data"); Preconditions.checkArgument(data != null, "null data");
@ -98,6 +99,22 @@ public class SeaweedOutputStream extends OutputStream {
int numberOfBytesToWrite = length; int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) { while (numberOfBytesToWrite > 0) {
if (buffer == null) {
buffer = new byte[32];
}
// ensureCapacity
if (numberOfBytesToWrite > buffer.length - bufferIndex) {
int capacity = buffer.length;
while(capacity-bufferIndex<numberOfBytesToWrite){
capacity = capacity << 1;
}
if (capacity < 0) {
throw new OutOfMemoryError();
}
buffer = Arrays.copyOf(buffer, capacity);
}
if (writableBytes <= numberOfBytesToWrite) { if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes; bufferIndex += writableBytes;
@ -165,7 +182,7 @@ public class SeaweedOutputStream extends OutputStream {
final byte[] bytes = buffer; final byte[] bytes = buffer;
final int bytesLength = bufferIndex; final int bytesLength = bufferIndex;
buffer = new byte[bufferSize]; buffer = null; // new byte[bufferSize];
bufferIndex = 0; bufferIndex = 0;
final long offset = position; final long offset = position;
position += bytesLength; position += bytesLength;

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.3.2</seaweedfs.client.version> <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.3.2</seaweedfs.client.version> <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>

View file

@ -16,14 +16,8 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Locale; import java.util.Arrays;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@ -37,16 +31,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final int maxConcurrentRequestCount; private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry; private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position; private long position;
private boolean closed; private boolean closed;
private boolean supportFlush = true;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
private long lastTotalAppendOffset = 0; private long lastTotalAppendOffset = 0;
private byte[] buffer; private byte[] buffer;
private int bufferIndex; private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000"; private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -59,18 +53,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize]; // this.buffer = new byte[bufferSize];
this.bufferIndex = 0; this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
10L, 10L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry; this.entry = entry;
@ -93,7 +87,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
@Override @Override
public synchronized void write(final byte[] data, final int off, final int length) public synchronized void write(final byte[] data, final int off, final int length)
throws IOException { throws IOException {
maybeThrowLastError(); maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data"); Preconditions.checkArgument(data != null, "null data");
@ -107,6 +101,22 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
int numberOfBytesToWrite = length; int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) { while (numberOfBytesToWrite > 0) {
if (buffer == null) {
buffer = new byte[32];
}
// ensureCapacity
if (numberOfBytesToWrite > buffer.length - bufferIndex) {
int capacity = buffer.length;
while(capacity-bufferIndex<numberOfBytesToWrite){
capacity = capacity << 1;
}
if (capacity < 0) {
throw new OutOfMemoryError();
}
buffer = Arrays.copyOf(buffer, capacity);
}
if (writableBytes <= numberOfBytesToWrite) { if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes; bufferIndex += writableBytes;
@ -217,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
final byte[] bytes = buffer; final byte[] bytes = buffer;
final int bytesLength = bufferIndex; final int bytesLength = bufferIndex;
buffer = new byte[bufferSize]; buffer = null; // new byte[bufferSize];
bufferIndex = 0; bufferIndex = 0;
final long offset = position; final long offset = position;
position += bytesLength; position += bytesLength;