1.4.7 hdfs configurable fs.seaweed.buffer.size

This commit is contained in:
Chris Lu 2020-09-16 17:18:18 -07:00
parent feca07bf96
commit 5eee4983f3
9 changed files with 37 additions and 35 deletions

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.4.6</version> <version>1.4.7</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.4.6</version> <version>1.4.7</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.4.6</version> <version>1.4.7</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -301,7 +301,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.4.6</seaweedfs.client.version> <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.4.6</seaweedfs.client.version> <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>

View file

@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
public class SeaweedFileSystem extends FileSystem { public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path); path = qualify(path);
try { try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
String replicaPlacement = String.format("%03d", replication - 1); String replicaPlacement = String.format("%03d", replication - 1);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {
/** /**
* {@inheritDoc} * {@inheritDoc}
*
* @throws FileNotFoundException if the parent directory is not present -or * @throws FileNotFoundException if the parent directory is not present -or
* is not a directory. * is not a directory.
*/ */
@Override @Override
public FSDataOutputStream createNonRecursive(Path path, public FSDataOutputStream createNonRecursive(Path path,
@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent); throw new FileAlreadyExistsException("Not a directory: " + parent);
} }
} }
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission, return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize, flags.contains(CreateFlag.OVERWRITE), bufferSize,
replication, blockSize, progress); replication, seaweedBufferSize, progress);
} }
@Override @Override
@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path); path = qualify(path);
try { try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override @Override
public void createSymlink(final Path target, final Path link, public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException, final boolean createParent) throws
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException { IOException {
// Supporting filesystems should override this method // Supporting filesystems should override this method
throw new UnsupportedOperationException( throw new UnsupportedOperationException(

View file

@ -309,7 +309,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.4.6</seaweedfs.client.version> <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.4.6</seaweedfs.client.version> <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>

View file

@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
public class SeaweedFileSystem extends FileSystem { public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path); path = qualify(path);
try { try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
String replicaPlacement = String.format("%03d", replication - 1); String replicaPlacement = String.format("%03d", replication - 1);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {
/** /**
* {@inheritDoc} * {@inheritDoc}
*
* @throws FileNotFoundException if the parent directory is not present -or * @throws FileNotFoundException if the parent directory is not present -or
* is not a directory. * is not a directory.
*/ */
@Override @Override
public FSDataOutputStream createNonRecursive(Path path, public FSDataOutputStream createNonRecursive(Path path,
@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent); throw new FileAlreadyExistsException("Not a directory: " + parent);
} }
} }
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission, return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize, flags.contains(CreateFlag.OVERWRITE), bufferSize,
replication, blockSize, progress); replication, seaweedBufferSize, progress);
} }
@Override @Override
@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path); path = qualify(path);
try { try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override @Override
public void createSymlink(final Path target, final Path link, public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException, final boolean createParent) throws
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException { IOException {
// Supporting filesystems should override this method // Supporting filesystems should override this method
throw new UnsupportedOperationException( throw new UnsupportedOperationException(