reuse bytebuffer

This commit is contained in:
Chris Lu 2020-07-15 23:33:31 -07:00
parent 2286d27730
commit 7bca72deed
10 changed files with 89 additions and 122 deletions

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.4</version> <version>1.3.5</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.4</version> <version>1.3.5</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.3.4</version> <version>1.3.5</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -0,0 +1,22 @@
package seaweedfs.client;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ByteBufferPool {
static List<ByteBuffer> bufferList = new ArrayList<>();
public static synchronized ByteBuffer request(int bufferSize) {
if (bufferList.isEmpty()) {
return ByteBuffer.allocate(bufferSize);
}
return bufferList.remove(bufferList.size()-1);
}
public static synchronized void release(ByteBuffer obj) {
bufferList.add(obj);
}
}

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.3.4</seaweedfs.client.version> <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.3.4</seaweedfs.client.version> <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>

View file

@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite; import seaweedfs.client.SeaweedWrite;
@ -14,7 +15,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays; import java.nio.ByteBuffer;
import java.util.concurrent.*; import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@ -30,15 +31,15 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry; private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = true; private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position; private long position;
private boolean closed; private boolean closed;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
private long lastTotalAppendOffset = 0; private long lastTotalAppendOffset = 0;
private byte[] buffer; private ByteBuffer buffer;
private int bufferIndex; private long outputIndex;
private String replication = "000"; private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -51,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
// this.buffer = new byte[bufferSize]; this.buffer = ByteBufferPool.request(bufferSize);
this.bufferIndex = 0; this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@ -94,41 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
// System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
int currentOffset = off; int currentOffset = off;
int writableBytes = bufferSize - bufferIndex; int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length; int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) { while (numberOfBytesToWrite > 0) {
if (buffer == null) { if (numberOfBytesToWrite < writableBytes) {
buffer = new byte[32]; buffer.put(data, currentOffset, numberOfBytesToWrite);
} outputIndex += numberOfBytesToWrite;
// ensureCapacity break;
if (numberOfBytesToWrite > buffer.length - bufferIndex) {
int capacity = buffer.length;
while(capacity-bufferIndex<numberOfBytesToWrite){
capacity = capacity << 1;
}
if (capacity < 0) {
throw new OutOfMemoryError();
}
buffer = Arrays.copyOf(buffer, capacity);
} }
if (writableBytes <= numberOfBytesToWrite) { // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); buffer.put(data, currentOffset, writableBytes);
bufferIndex += writableBytes; outputIndex += writableBytes;
writeCurrentBufferToService(); currentOffset += writableBytes;
currentOffset += writableBytes; writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else { writableBytes = bufferSize - buffer.position();
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
}
writableBytes = bufferSize - bufferIndex;
} }
} }
/** /**
@ -164,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdown(); threadExecutor.shutdown();
} finally { } finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
ByteBufferPool.release(buffer);
buffer = null; buffer = null;
bufferIndex = 0; outputIndex = 0;
closed = true; closed = true;
writeOperations.clear(); writeOperations.clear();
if (!threadExecutor.isShutdown()) { if (!threadExecutor.isShutdown()) {
@ -175,35 +165,17 @@ public class SeaweedOutputStream extends OutputStream {
} }
private synchronized void writeCurrentBufferToService() throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) { if (buffer.position() == 0) {
return; return;
} }
final byte[] bytes = buffer; buffer.flip();
final int bytesLength = bufferIndex; int bytesLength = buffer.limit() - buffer.position();
SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
buffer = null; // new byte[bufferSize]; // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
bufferIndex = 0;
final long offset = position;
position += bytesLength; position += bytesLength;
buffer.clear();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
} }
private void waitForTaskToComplete() throws IOException { private void waitForTaskToComplete() throws IOException {

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.3.4</seaweedfs.client.version> <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.3.4</seaweedfs.client.version> <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>

View file

@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite; import seaweedfs.client.SeaweedWrite;
@ -16,6 +17,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -33,15 +35,15 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry; private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = true; private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position; private long position;
private boolean closed; private boolean closed;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
private long lastTotalAppendOffset = 0; private long lastTotalAppendOffset = 0;
private byte[] buffer; private ByteBuffer buffer;
private int bufferIndex; private long outputIndex;
private String replication = "000"; private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -54,8 +56,8 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
// this.buffer = new byte[bufferSize]; this.buffer = ByteBufferPool.request(bufferSize);
this.bufferIndex = 0; this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@ -97,41 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
// System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
int currentOffset = off; int currentOffset = off;
int writableBytes = bufferSize - bufferIndex; int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length; int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) { while (numberOfBytesToWrite > 0) {
if (buffer == null) { if (numberOfBytesToWrite < writableBytes) {
buffer = new byte[32]; buffer.put(data, currentOffset, numberOfBytesToWrite);
} outputIndex += numberOfBytesToWrite;
// ensureCapacity break;
if (numberOfBytesToWrite > buffer.length - bufferIndex) {
int capacity = buffer.length;
while (capacity - bufferIndex < numberOfBytesToWrite) {
capacity = capacity << 1;
}
if (capacity < 0) {
throw new OutOfMemoryError();
}
buffer = Arrays.copyOf(buffer, capacity);
} }
if (writableBytes <= numberOfBytesToWrite) { // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); buffer.put(data, currentOffset, writableBytes);
bufferIndex += writableBytes; outputIndex += writableBytes;
writeCurrentBufferToService(); currentOffset += writableBytes;
currentOffset += writableBytes; writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else { writableBytes = bufferSize - buffer.position();
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
}
writableBytes = bufferSize - bufferIndex;
} }
} }
/** /**
@ -210,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
threadExecutor.shutdown(); threadExecutor.shutdown();
} finally { } finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
ByteBufferPool.release(buffer);
buffer = null; buffer = null;
bufferIndex = 0; outputIndex = 0;
closed = true; closed = true;
writeOperations.clear(); writeOperations.clear();
if (!threadExecutor.isShutdown()) { if (!threadExecutor.isShutdown()) {
@ -221,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
} }
private synchronized void writeCurrentBufferToService() throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) { if (buffer.position() == 0) {
return; return;
} }
final byte[] bytes = buffer; buffer.flip();
final int bytesLength = bufferIndex; int bytesLength = buffer.limit() - buffer.position();
SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
buffer = null; // new byte[bufferSize]; // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
bufferIndex = 0;
final long offset = position;
position += bytesLength; position += bytesLength;
buffer.clear();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
} }
private void waitForTaskToComplete() throws IOException { private void waitForTaskToComplete() throws IOException {