remove changing buffer size

This commit is contained in:
Chris Lu 2020-07-20 20:27:19 -07:00
parent 0b2e06268b
commit ae3e6d8244
5 changed files with 9 additions and 16 deletions

View file

@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri; private URI uri;
private Path workingDirectory = new Path("/"); private Path workingDirectory = new Path("/");
@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port); conf.setInt(FS_SEAWEED_FILER_PORT, port);
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
setConf(conf); setConf(conf);
this.uri = uri; this.uri = uri;

View file

@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream {
this.outputIndex = 0; this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
10L, 120L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream {
bufferToWrite.flip(); bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete(); waitForTaskToComplete();
} }
final Future<Void> job = completionService.submit(() -> { final Future<Void> job = completionService.submit(() -> {

View file

@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri; private URI uri;
private Path workingDirectory = new Path("/"); private Path workingDirectory = new Path("/");
@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port); conf.setInt(FS_SEAWEED_FILER_PORT, port);
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
setConf(conf); setConf(conf);
this.uri = uri; this.uri = uri;

View file

@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry, final FilerProto.Entry entry,
final int bufferSize) { final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
} }
} }
return (int)bytesRead; return (int) bytesRead;
} }

View file

@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.outputIndex = 0; this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
10L, 120L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
break; break;
} }
// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")"); // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes); buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes; outputIndex += writableBytes;
currentOffset += writableBytes; currentOffset += writableBytes;
@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
bufferToWrite.flip(); bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete(); waitForTaskToComplete();
} }
final Future<Void> job = completionService.submit(() -> { final Future<Void> job = completionService.submit(() -> {