SeaweedFileSystem add SeaweedOutputStream to write

This commit is contained in:
Chris Lu 2018-11-25 23:49:05 -08:00
parent f970abf14a
commit 11eb014311
5 changed files with 152 additions and 23 deletions

View file

@ -40,6 +40,11 @@
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency>
</dependencies>
</project>

View file

@ -12,6 +12,7 @@ import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
@ -55,16 +56,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
}
public FSDataInputStream open(Path path, int i) throws IOException {
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
return null;
}
public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException {
return null;
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
path = qualify(path);
try {
String replicaPlacement = String.format("%03d", replication - 1);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
}
}
public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
return null;
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
path = qualify(path);
try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
}
}
public boolean rename(Path src, Path dst) throws IOException {
@ -73,7 +89,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
if (parentFolder == null) {
return false;
}
if (src.equals(dst)){
if (src.equals(dst)) {
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);

View file

@ -9,6 +9,8 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -155,4 +157,43 @@ public class SeaweedFileSystemStore {
return true;
}
public OutputStream createFile(final Path path,
final boolean overwrite,
FsPermission permission,
String replication) throws IOException {
permission = permission == null ? FsPermission.getFileDefault() : permission;
LOG.debug("createFile path: {} overwrite: {} permission: {}",
path,
overwrite,
permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder();
long writePosition = 0;
if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path);
if (existingEntry != null) {
entry.mergeFrom(existingEntry);
}
writePosition = existingEntry.getAttributes().getFileSize();
replication = existingEntry.getAttributes().getReplication();
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(permission.toOctal())
.setReplication(replication)
.setCrtime(System.currentTimeMillis() / 1000L)
.setUserName(userGroupInformation.getUserName())
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
}
return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication);
}
}

View file

@ -4,6 +4,7 @@ package seaweed.hdfs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import seaweedfs.client.FilerGrpcClient;
@ -12,8 +13,6 @@ import seaweedfs.client.FilerProto;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -26,12 +25,12 @@ import java.util.concurrent.TimeUnit;
public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
private final FilerGrpcClient filerGrpcClient;
private final List<FilerProto.Entry> entries = new ArrayList<>();
private final String path;
private final Path path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
private boolean supportFlush = true;
@ -41,12 +40,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private byte[] buffer;
private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient,
final String path,
final long position,
final int bufferSize) {
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient;
this.replication = replication;
this.path = path;
this.position = position;
this.closed = false;
@ -67,11 +66,14 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
SeaweedWrite.writeMeta(filerGrpcClient, path, entries);
SeaweedWrite.writeMeta(filerGrpcClient, path, entry);
} catch (Exception ex) {
throw new IOException(ex);
}
@ -221,8 +223,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
FilerProto.Entry entry = SeaweedWrite.writeData(filerGrpcClient, offset, bytes, 0, bytesLength);
entries.add(entry);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});

View file

@ -1,18 +1,84 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import java.util.List;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
public class SeaweedWrite {
public static FilerProto.Entry writeData(final FilerGrpcClient filerGrpcClient, final long offset,
final byte[] bytes, final long bytesOffset, final long bytesLength) {
return null;
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
final FilerGrpcClient filerGrpcClient,
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection("")
.setReplication(replication)
.setDataCenter("")
.setReplication("")
.setTtlSec(0)
.build());
String fileId = response.getFileId();
String url = response.getUrl();
String targetUrl = String.format("http://%s/%s", url, fileId);
String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
entry.addChunks(FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String path, final List<FilerProto.Entry> entries) {
return;
final Path path, final FilerProto.Entry.Builder entry) {
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setEntry(entry)
.build()
);
}
private static String multipartUpload(String targetUrl,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
HttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
HttpPost post = new HttpPost(targetUrl);
post.setEntity(MultipartEntityBuilder.create()
.setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
.addBinaryBody("upload", inputStream)
.build());
HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) {
etag = etag.substring(1, etag.length() - 1);
}
return etag;
}
}