HDFS: add tls secured grpc

This commit is contained in:
Chris Lu 2019-02-19 11:57:25 -08:00
parent 07af52cb6f
commit 58d4088db4
5 changed files with 142 additions and 86 deletions

View file

@ -4,7 +4,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.0.5</version> <version>1.0.7</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -2,7 +2,14 @@ package seaweedfs.client;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import javax.net.ssl.SSLException;
import java.io.File;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -20,6 +27,16 @@ public class FilerGrpcClient {
this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()); this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext());
} }
public FilerGrpcClient(String host, int grpcPort,
String caFilePath,
String clientCertFilePath,
String clientPrivateKeyFilePath) throws SSLException {
this(NettyChannelBuilder.forAddress(host, grpcPort)
.negotiationType(NegotiationType.TLS)
.sslContext(buildSslContext(caFilePath,clientCertFilePath,clientPrivateKeyFilePath)));
}
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build(); channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
@ -42,4 +59,18 @@ public class FilerGrpcClient {
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
return futureStub; return futureStub;
} }
private static SslContext buildSslContext(String trustCertCollectionFilePath,
String clientCertChainFilePath,
String clientPrivateKeyFilePath) throws SSLException {
SslContextBuilder builder = GrpcSslContexts.forClient();
if (trustCertCollectionFilePath != null) {
builder.trustManager(new File(trustCertCollectionFilePath));
}
if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) {
builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath));
}
return builder.build();
}
} }

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.0.5</seaweedfs.client.version> <seaweedfs.client.version>1.0.7</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>

View file

@ -34,6 +34,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca";
public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key";
public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private static int BUFFER_SIZE = 16 * 1024 * 1024; private static int BUFFER_SIZE = 16 * 1024 * 1024;
@ -72,9 +75,19 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
setConf(conf); setConf(conf);
this.uri = uri; this.uri = uri;
if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0
&& conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0
&& conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port,
conf.get(FS_SEAWEED_GRPC_CA),
conf.get(FS_SEAWEED_GRPC_CLIENT_CERT),
conf.get(FS_SEAWEED_GRPC_CLIENT_KEY));
} else {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
} }
}
@Override @Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException { public FSDataInputStream open(Path path, int bufferSize) throws IOException {
@ -271,6 +284,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/** /**
* Concat existing files together. * Concat existing files together.
*
* @param trg the path to the target destination. * @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation. * @param psrcs the paths to the sources to use for the concatenation.
* @throws IOException IO failure * @throws IOException IO failure
@ -291,9 +305,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* <li>Fails if path is not closed.</li> * <li>Fails if path is not closed.</li>
* <li>Fails if new size is greater than current size.</li> * <li>Fails if new size is greater than current size.</li>
* </ul> * </ul>
*
* @param f The path to the file to be truncated * @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to * @param newLength The size the file is to be truncated to
*
* @return <code>true</code> if the file has been truncated to the desired * @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for * <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or * write operations such as <code>append</code>, or
@ -327,6 +341,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/** /**
* Create a snapshot. * Create a snapshot.
*
* @param path The directory where snapshots will be taken. * @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot * @param snapshotName The name of the snapshot
* @return the snapshot path. * @return the snapshot path.
@ -342,6 +357,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/** /**
* Rename a snapshot. * Rename a snapshot.
*
* @param path The directory path where the snapshot was taken * @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot * @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot * @param snapshotNewName New name of the snapshot
@ -358,6 +374,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/** /**
* Delete a snapshot of a directory. * Delete a snapshot of a directory.
*
* @param path The directory that the to-be-deleted snapshot belongs to * @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot * @param snapshotName The name of the snapshot
* @throws IOException IO failure * @throws IOException IO failure

View file

@ -12,6 +12,7 @@ import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead; import seaweedfs.client.SeaweedRead;
import javax.net.ssl.SSLException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -33,6 +34,13 @@ public class SeaweedFileSystemStore {
filerClient = new FilerClient(filerGrpcClient); filerClient = new FilerClient(filerGrpcClient);
} }
public SeaweedFileSystemStore(String host, int port,
String caFile, String clientCertFile, String clientKeyFile) throws SSLException {
int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile);
filerClient = new FilerClient(filerGrpcClient);
}
public static String getParentDirectory(Path path) { public static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath(); return path.isRoot() ? "/" : path.getParent().toUri().getPath();
} }