HDFS support encrypted data storage

This commit is contained in:
Chris Lu 2020-03-14 00:27:57 -07:00
parent e2e691d9c2
commit de1ba85346
11 changed files with 308 additions and 59 deletions

View file

@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@ -88,8 +89,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
@ -97,9 +98,11 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>

View file

@ -14,12 +14,6 @@ import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
static SslContext sslContext;
static {
@ -30,6 +24,14 @@ public class FilerGrpcClient {
}
}
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
private boolean cipher = false;
private String collection = "";
private String replication = "";
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
}
@ -42,6 +44,13 @@ public class FilerGrpcClient {
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
this.getBlockingStub().getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
cipher = filerConfigurationResponse.getCipher();
collection = filerConfigurationResponse.getCollection();
replication = filerConfigurationResponse.getReplication();
}
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
@ -51,6 +60,18 @@ public class FilerGrpcClient {
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
}
public boolean isCipher() {
return cipher;
}
public String getCollection() {
return collection;
}
public String getReplication() {
return replication;
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

View file

@ -0,0 +1,37 @@
package seaweedfs.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class Gzip {
public static byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.close();
byte[] compressed = bos.toByteArray();
bos.close();
return compressed;
}
public static byte[] decompress(byte[] compressed) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
GZIPInputStream gis = new GZIPInputStream(bis);
return readAll(gis);
}
private static byte[] readAll(InputStream input) throws IOException {
try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
byte[] buffer = new byte[4096];
int n;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
}
return output.toByteArray();
}
}
}

View file

@ -0,0 +1,55 @@
package seaweedfs.client;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.SecureRandom;
public class SeaweedCipher {
// AES-GCM parameters
public static final int AES_KEY_SIZE = 256; // in bits
public static final int GCM_NONCE_LENGTH = 12; // in bytes
public static final int GCM_TAG_LENGTH = 16; // in bytes
private static SecureRandom random = new SecureRandom();
public static byte[] genCipherKey() throws Exception {
byte[] key = new byte[AES_KEY_SIZE / 8];
random.nextBytes(key);
return key;
}
public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception {
return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey);
}
public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception {
final byte[] nonce = new byte[GCM_NONCE_LENGTH];
random.nextBytes(nonce);
GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec);
byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length);
byte[] iv = AES_cipherInstance.getIV();
byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH];
System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH);
System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length);
return message;
}
public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception {
final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH);
SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params);
byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH);
return decryptedText;
}
}

View file

@ -6,6 +6,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
@ -31,7 +32,7 @@ public class SeaweedRead {
}
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
.getBlockingStub().lookupVolume(lookupRequest.build());
.getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
@ -56,14 +57,18 @@ public class SeaweedRead {
}
private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
if (chunkView.cipherKey != null) {
return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations);
}
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
if (!chunkView.isFullChunk) {
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
request.setHeader(HttpHeaders.RANGE,
String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1));
String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1));
}
try {
@ -85,6 +90,44 @@ public class SeaweedRead {
}
}
private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
byte[] data = null;
try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity);
} finally {
if (client instanceof Closeable) {
Closeable t = (Closeable) client;
t.close();
}
}
if (chunkView.isGzipped) {
data = Gzip.decompress(data);
}
try {
data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
} catch (Exception e) {
throw new IOException("fail to decrypt", e);
}
int len = (int) (chunkView.logicOffset - position + chunkView.size);
System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len);
return len;
}
protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
List<ChunkView> views = new ArrayList<>();
@ -93,11 +136,13 @@ public class SeaweedRead {
if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
chunk.fileId,
offset - chunk.start,
Math.min(chunk.stop, stop) - offset,
offset,
isFullChunk
chunk.fileId,
offset - chunk.start,
Math.min(chunk.stop, stop) - offset,
offset,
isFullChunk,
chunk.cipherKey,
chunk.isGzipped
));
offset = Math.min(chunk.stop, stop);
}
@ -127,11 +172,13 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles,
FilerProto.FileChunk chunk) {
VisibleInterval newV = new VisibleInterval(
chunk.getOffset(),
chunk.getOffset() + chunk.getSize(),
chunk.getFileId(),
chunk.getMtime(),
true
chunk.getOffset(),
chunk.getOffset() + chunk.getSize(),
chunk.getFileId(),
chunk.getMtime(),
true,
chunk.getCipherKey().toByteArray(),
chunk.getIsGzipped()
);
// easy cases to speed up
@ -147,21 +194,25 @@ public class SeaweedRead {
for (VisibleInterval v : visibles) {
if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
newVisibles.add(new VisibleInterval(
v.start,
chunk.getOffset(),
v.fileId,
v.modifiedTime,
false
v.start,
chunk.getOffset(),
v.fileId,
v.modifiedTime,
false,
v.cipherKey,
v.isGzipped
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
if (v.start < chunkStop && chunkStop < v.stop) {
newVisibles.add(new VisibleInterval(
chunkStop,
v.stop,
v.fileId,
v.modifiedTime,
false
chunkStop,
v.stop,
v.fileId,
v.modifiedTime,
false,
v.cipherKey,
v.isGzipped
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@ -208,24 +259,30 @@ public class SeaweedRead {
public final long modifiedTime;
public final String fileId;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isGzipped;
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "VisibleInterval{" +
"start=" + start +
", stop=" + stop +
", modifiedTime=" + modifiedTime +
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
'}';
"start=" + start +
", stop=" + stop +
", modifiedTime=" + modifiedTime +
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
", isGzipped=" + isGzipped +
'}';
}
}
@ -235,24 +292,30 @@ public class SeaweedRead {
public final long size;
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isGzipped;
public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "ChunkView{" +
"fileId='" + fileId + '\'' +
", offset=" + offset +
", size=" + size +
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
'}';
"fileId='" + fileId + '\'' +
", offset=" + offset +
", size=" + size +
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
", isGzipped=" + isGzipped +
'}';
}
}

View file

@ -1,5 +1,6 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
@ -11,9 +12,12 @@ import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
public class SeaweedWrite {
private static SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
final FilerGrpcClient filerGrpcClient,
@ -22,10 +26,9 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection("")
.setReplication(replication)
.setCollection(filerGrpcClient.getCollection())
.setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
.setDataCenter("")
.setReplication("")
.setTtlSec(0)
.build());
String fileId = response.getFileId();
@ -33,7 +36,14 @@ public class SeaweedWrite {
String auth = response.getAuth();
String targetUrl = String.format("http://%s/%s", url, fileId);
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength);
ByteString cipherKeyString = null;
byte[] cipherKey = null;
if (filerGrpcClient.isCipher()) {
cipherKey = genCipherKey();
cipherKeyString = ByteString.copyFrom(cipherKey);
}
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
entry.addChunks(FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
@ -41,6 +51,7 @@ public class SeaweedWrite {
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
.setCipherKey(cipherKeyString)
);
}
@ -58,11 +69,22 @@ public class SeaweedWrite {
private static String multipartUpload(String targetUrl,
String auth,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
HttpClient client = new DefaultHttpClient();
InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
InputStream inputStream = null;
if (cipherKey == null) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
} else {
try {
byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey);
inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length);
} catch (Exception e) {
throw new IOException("fail to encrypt data", e);
}
}
HttpPost post = new HttpPost(targetUrl);
if (auth != null && auth.length() != 0) {
@ -92,4 +114,10 @@ public class SeaweedWrite {
}
}
private static byte[] genCipherKey() {
byte[] b = new byte[32];
random.nextBytes(b);
return b;
}
}

View file

@ -0,0 +1,42 @@
package seaweedfs.client;
import org.junit.Test;
import java.util.Base64;
import static seaweedfs.client.SeaweedCipher.decrypt;
import static seaweedfs.client.SeaweedCipher.encrypt;
public class SeaweedCipherTest {
@Test
public void testSameAsGoImplemnetation() throws Exception {
byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes();
String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
System.out.println("Original Text : " + plainText);
byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
byte[] decryptedText = decrypt(cipherText, secretKey);
System.out.println("DeCrypted Text : " + new String(decryptedText));
}
@Test
public void testEncryptDecrypt() throws Exception {
byte[] secretKey = SeaweedCipher.genCipherKey();
String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
System.out.println("Original Text : " + plainText);
byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
byte[] decryptedText = decrypt(cipherText, secretKey);
System.out.println("DeCrypted Text : " + new String(decryptedText));
}
}

View file

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.2.4</seaweedfs.client.version>
<seaweedfs.client.version>1.2.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.2.4</seaweedfs.client.version>
<seaweedfs.client.version>1.2.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

View file

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.2.4</seaweedfs.client.version>
<seaweedfs.client.version>1.2.5</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.2.4</seaweedfs.client.version>
<seaweedfs.client.version>1.2.5</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>