diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index d5c3399ed..68c281992 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -33,6 +33,7 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = ""; + private String collection = ""; public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { this(filerClient, fullpath, ""); @@ -53,7 +54,6 @@ public class SeaweedOutputStream extends OutputStream { this.lastFlushOffset = 0; this.bufferSize = bufferSize; this.buffer = ByteBufferPool.request(bufferSize); - this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); @@ -83,6 +83,13 @@ public class SeaweedOutputStream extends OutputStream { } + public void setReplication(String replication) { + this.replication = replication; + } + public void setCollection(String collection) { + this.collection = collection; + } + public static String getParentDirectory(String path) { int protoIndex = path.indexOf("://"); if (protoIndex >= 0) { @@ -144,13 +151,11 @@ public class SeaweedOutputStream extends OutputStream { if (numberOfBytesToWrite < writableBytes) { buffer.put(data, currentOffset, numberOfBytesToWrite); - outputIndex += numberOfBytesToWrite; break; } // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); - outputIndex += writableBytes; currentOffset += writableBytes; writeCurrentBufferToService(); numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; @@ -194,7 +199,6 @@ public class SeaweedOutputStream extends OutputStream { lastError = new IOException("Stream is closed!"); ByteBufferPool.release(buffer); buffer = null; - outputIndex = 0; closed = true; writeOperations.clear(); if (!threadExecutor.isShutdown()) { @@ -225,7 +229,7 @@ public class SeaweedOutputStream extends OutputStream { } final Future job = completionService.submit(() -> { // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); + SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index f477303c9..88c7cefbe 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -26,6 +26,7 @@ public class SeaweedWrite { public static void writeData(FilerProto.Entry.Builder entry, final String replication, + String collection, final FilerClient filerClient, final long offset, final byte[] bytes, @@ -36,7 +37,7 @@ public class SeaweedWrite { for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { try { FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); + replication, collection, filerClient, offset, bytes, bytesOffset, bytesLength, path); lastException = null; synchronized (entry) { entry.addChunks(chunkBuilder); @@ -59,6 +60,7 @@ public class SeaweedWrite { } public static FilerProto.FileChunk.Builder writeChunk(final String replication, + final String collection, final FilerClient filerClient, final long offset, final byte[] bytes, @@ -67,7 +69,7 @@ public class SeaweedWrite { final String path) throws IOException { FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(filerClient.getCollection()) + .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) .setDataCenter("") .setTtlSec(0)