diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 85490c181..2341d335d 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem { public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 209e32d0b..d62d74fb1 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream { this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, - 10L, + 120L, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); @@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream { bufferToWrite.flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future job = completionService.submit(() -> { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 85490c181..2341d335d 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem { public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 10ec9b3cc..6b3c72f7d 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream { final Statistics statistics, final String path, final FilerProto.Entry entry, - final int bufferSize) { + final int bufferSize) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; @@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream { } } - return (int)bytesRead; + return (int) bytesRead; } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index c602a0d81..05805b9e5 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, - 10L, + 120L, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); @@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")"); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); outputIndex += writableBytes; currentOffset += writableBytes; @@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea bufferToWrite.flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future job = completionService.submit(() -> {