Java: SeaweedOutputStream add option to pass in collection

This commit is contained in:
chrislu 2023-08-17 08:20:50 -07:00
parent 2af4cab3d0
commit 4523cd7a32
2 changed files with 13 additions and 7 deletions

View file

@ -33,6 +33,7 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer; private ByteBuffer buffer;
private long outputIndex; private long outputIndex;
private String replication = ""; private String replication = "";
private String collection = "";
public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
this(filerClient, fullpath, ""); this(filerClient, fullpath, "");
@ -53,7 +54,6 @@ public class SeaweedOutputStream extends OutputStream {
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffer = ByteBufferPool.request(bufferSize); this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
@ -83,6 +83,13 @@ public class SeaweedOutputStream extends OutputStream {
} }
public void setReplication(String replication) {
this.replication = replication;
}
public void setCollection(String collection) {
this.collection = collection;
}
public static String getParentDirectory(String path) { public static String getParentDirectory(String path) {
int protoIndex = path.indexOf("://"); int protoIndex = path.indexOf("://");
if (protoIndex >= 0) { if (protoIndex >= 0) {
@ -144,13 +151,11 @@ public class SeaweedOutputStream extends OutputStream {
if (numberOfBytesToWrite < writableBytes) { if (numberOfBytesToWrite < writableBytes) {
buffer.put(data, currentOffset, numberOfBytesToWrite); buffer.put(data, currentOffset, numberOfBytesToWrite);
outputIndex += numberOfBytesToWrite;
break; break;
} }
// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes); buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes; currentOffset += writableBytes;
writeCurrentBufferToService(); writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
@ -194,7 +199,6 @@ public class SeaweedOutputStream extends OutputStream {
lastError = new IOException("Stream is closed!"); lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer); ByteBufferPool.release(buffer);
buffer = null; buffer = null;
outputIndex = 0;
closed = true; closed = true;
writeOperations.clear(); writeOperations.clear();
if (!threadExecutor.isShutdown()) { if (!threadExecutor.isShutdown()) {
@ -225,7 +229,7 @@ public class SeaweedOutputStream extends OutputStream {
} }
final Future<Void> job = completionService.submit(() -> { final Future<Void> job = completionService.submit(() -> {
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite); ByteBufferPool.release(bufferToWrite);
return null; return null;

View file

@ -26,6 +26,7 @@ public class SeaweedWrite {
public static void writeData(FilerProto.Entry.Builder entry, public static void writeData(FilerProto.Entry.Builder entry,
final String replication, final String replication,
String collection,
final FilerClient filerClient, final FilerClient filerClient,
final long offset, final long offset,
final byte[] bytes, final byte[] bytes,
@ -36,7 +37,7 @@ public class SeaweedWrite {
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
try { try {
FilerProto.FileChunk.Builder chunkBuilder = writeChunk( FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); replication, collection, filerClient, offset, bytes, bytesOffset, bytesLength, path);
lastException = null; lastException = null;
synchronized (entry) { synchronized (entry) {
entry.addChunks(chunkBuilder); entry.addChunks(chunkBuilder);
@ -59,6 +60,7 @@ public class SeaweedWrite {
} }
public static FilerProto.FileChunk.Builder writeChunk(final String replication, public static FilerProto.FileChunk.Builder writeChunk(final String replication,
final String collection,
final FilerClient filerClient, final FilerClient filerClient,
final long offset, final long offset,
final byte[] bytes, final byte[] bytes,
@ -67,7 +69,7 @@ public class SeaweedWrite {
final String path) throws IOException { final String path) throws IOException {
FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder() FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerClient.getCollection()) .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
.setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
.setDataCenter("") .setDataCenter("")
.setTtlSec(0) .setTtlSec(0)