Java: add SeaweedOutputStream example

This commit is contained in:
Chris Lu 2021-02-04 20:16:08 -08:00
parent 502554887f
commit 7f90d14f10
4 changed files with 110 additions and 6 deletions

View file

@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream {
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
final String dir, final String name) throws IOException {
final String fullpath) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.path = dir;
this.path = fullpath;
FilerClient filerClient = new FilerClient(filerGrpcClient);
this.entry = filerClient.lookupEntry(dir, name);
this.entry = filerClient.lookupEntry(
SeaweedOutputStream.getParentDirectory(fullpath),
SeaweedOutputStream.getFileName(fullpath));
this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());

View file

@ -14,7 +14,7 @@ import java.util.concurrent.*;
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
protected final boolean supportFlush = false; // true;
private final FilerGrpcClient filerGrpcClient;
private final String path;
private final int bufferSize;
@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
protected final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
private boolean shouldSaveMetadata = false;
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) {
this.filerGrpcClient = filerGrpcClient;
this.path = fullpath;
this.position = 0;
this.closed = false;
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = 8 * 1024 * 1024;
this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
long now = System.currentTimeMillis() / 1000L;
this.entry = FilerProto.Entry.newBuilder()
.setName(getFileName(path))
.setIsDirectory(false)
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(0755)
.setReplication(replication)
.setCrtime(now)
.setMtime(now)
.clearGroupName()
);
this.shouldSaveMetadata = true;
}
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream {
return path;
}
int lastSlashIndex = path.lastIndexOf("/");
if (lastSlashIndex == 0) {
return "/";
}
return path.substring(0, lastSlashIndex);
}
public static String getFileName(String path) {
if (path.indexOf("/") < 0) {
return path;
}
int lastSlashIndex = path.lastIndexOf("/");
return path.substring(lastSlashIndex+1);
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdownNow();
}
}
if (shouldSaveMetadata) {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
}
private synchronized void writeCurrentBufferToService() throws IOException {

View file

@ -23,7 +23,7 @@ public class UnzipFile {
long localProcessTime = startTime2 - startTime;
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient, "/", "test.zip");
filerGrpcClient, "/test.zip");
parseZip(seaweedInputStream);
long swProcessTime = System.currentTimeMillis() - startTime2;

View file

@ -0,0 +1,48 @@
package com.seaweedfs.examples;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.SeaweedInputStream;
import seaweedfs.client.SeaweedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class WriteFile {
public static void main(String[] args) throws IOException {
FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient, "/test.zip");
unZipFiles(filerGrpcClient, seaweedInputStream);
}
public static void unZipFiles(FilerGrpcClient filerGrpcClient, InputStream is) throws IOException {
ZipInputStream zin = new ZipInputStream(is);
ZipEntry ze;
while ((ze = zin.getNextEntry()) != null) {
String filename = ze.getName();
if (filename.indexOf("/") >= 0) {
filename = filename.substring(filename.lastIndexOf("/") + 1);
}
if (filename.length()==0) {
continue;
}
SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerGrpcClient, "/test/"+filename);
byte[] bytesIn = new byte[16 * 1024];
int read = 0;
while ((read = zin.read(bytesIn))!=-1) {
seaweedOutputStream.write(bytesIn,0,read);
}
seaweedOutputStream.close();
System.out.println(ze.getName());
}
}
}