refactoring: only expose FilerClient class

This commit is contained in:
Chris Lu 2021-02-08 02:28:45 -08:00
parent a833021132
commit ad36c7b0d7
14 changed files with 94 additions and 106 deletions

View file

@ -23,7 +23,7 @@ public class FileChunkManifest {
} }
public static List<FilerProto.FileChunk> resolveChunkManifest( public static List<FilerProto.FileChunk> resolveChunkManifest(
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException { final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException {
List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
@ -35,30 +35,30 @@ public class FileChunkManifest {
// IsChunkManifest // IsChunkManifest
LOG.debug("fetching chunk manifest:{}", chunk); LOG.debug("fetching chunk manifest:{}", chunk);
byte[] data = fetchChunk(filerGrpcClient, chunk); byte[] data = fetchChunk(filerClient, chunk);
FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>(); List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
for (FilerProto.FileChunk t : m.getChunksList()) { for (FilerProto.FileChunk t : m.getChunksList()) {
// avoid deprecated chunk.getFileId() // avoid deprecated chunk.getFileId()
resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
} }
dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks));
} }
return dataChunks; return dataChunks;
} }
private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException {
String vid = "" + chunk.getFid().getVolumeId(); String vid = "" + chunk.getFid().getVolumeId();
FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid); FilerProto.Locations locations = filerClient.vidLocations.get(vid);
if (locations == null) { if (locations == null) {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
lookupRequest.addVolumeIds(vid); lookupRequest.addVolumeIds(vid);
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient FilerProto.LookupVolumeResponse lookupResponse = filerClient
.getBlockingStub().lookupVolume(lookupRequest.build()); .getBlockingStub().lookupVolume(lookupRequest.build());
locations = lookupResponse.getLocationsMapMap().get(vid); locations = lookupResponse.getLocationsMapMap().get(vid);
filerGrpcClient.vidLocations.put(vid, locations); filerClient.vidLocations.put(vid, locations);
LOG.debug("fetchChunk vid:{} locations:{}", vid, locations); LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
} }
@ -74,7 +74,7 @@ public class FileChunkManifest {
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) { if (chunkData == null) {
LOG.debug("doFetchFullChunkData:{}", chunkView); LOG.debug("doFetchFullChunkData:{}", chunkView);
chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations); chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations);
} }
if (chunk.getIsChunkManifest()){ if (chunk.getIsChunkManifest()){
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
@ -86,7 +86,7 @@ public class FileChunkManifest {
} }
public static List<FilerProto.FileChunk> maybeManifestize( public static List<FilerProto.FileChunk> maybeManifestize(
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException { final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
// the return variable // the return variable
List<FilerProto.FileChunk> chunks = new ArrayList<>(); List<FilerProto.FileChunk> chunks = new ArrayList<>();
@ -101,7 +101,7 @@ public class FileChunkManifest {
int remaining = dataChunks.size(); int remaining = dataChunks.size();
for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
chunks.add(chunk); chunks.add(chunk);
remaining -= mergeFactor; remaining -= mergeFactor;
} }
@ -113,7 +113,7 @@ public class FileChunkManifest {
return chunks; return chunks;
} }
private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException { private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
// create and serialize the manifest // create and serialize the manifest
dataChunks = FilerClient.beforeEntrySerialization(dataChunks); dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
@ -127,8 +127,8 @@ public class FileChunkManifest {
} }
FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
filerGrpcClient.getReplication(), filerClient.getReplication(),
filerGrpcClient, filerClient,
minOffset, minOffset,
data, 0, data.length, parentDirectory); data, 0, data.length, parentDirectory);
manifestChunk.setIsChunkManifest(true); manifestChunk.setIsChunkManifest(true);

View file

@ -11,18 +11,12 @@ import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
public class FilerClient { public class FilerClient extends FilerGrpcClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
private final FilerGrpcClient filerGrpcClient;
public FilerClient(String host, int grpcPort) { public FilerClient(String host, int grpcPort) {
filerGrpcClient = new FilerGrpcClient(host, grpcPort); super(host, grpcPort);
}
public FilerClient(FilerGrpcClient filerGrpcClient) {
this.filerGrpcClient = filerGrpcClient;
} }
public static String toFileId(FilerProto.FileId fid) { public static String toFileId(FilerProto.FileId fid) {
@ -236,7 +230,7 @@ public class FilerClient {
} }
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() Iterator<FilerProto.ListEntriesResponse> iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path) .setDirectory(path)
.setPrefix(entryPrefix) .setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName) .setStartFromFileName(lastEntryName)
@ -253,7 +247,7 @@ public class FilerClient {
public FilerProto.Entry lookupEntry(String directory, String entryName) { public FilerProto.Entry lookupEntry(String directory, String entryName) {
try { try {
FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry(
FilerProto.LookupDirectoryEntryRequest.newBuilder() FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory) .setDirectory(directory)
.setName(entryName) .setName(entryName)
@ -274,7 +268,7 @@ public class FilerClient {
public boolean createEntry(String parent, FilerProto.Entry entry) { public boolean createEntry(String parent, FilerProto.Entry entry) {
try { try {
FilerProto.CreateEntryResponse createEntryResponse = FilerProto.CreateEntryResponse createEntryResponse =
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parent) .setDirectory(parent)
.setEntry(entry) .setEntry(entry)
.build()); .build());
@ -291,7 +285,7 @@ public class FilerClient {
public boolean updateEntry(String parent, FilerProto.Entry entry) { public boolean updateEntry(String parent, FilerProto.Entry entry) {
try { try {
filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(parent) .setDirectory(parent)
.setEntry(entry) .setEntry(entry)
.build()); .build());
@ -304,7 +298,7 @@ public class FilerClient {
public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) {
try { try {
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(parent) .setDirectory(parent)
.setName(entryName) .setName(entryName)
.setIsDeleteData(isDeleteFileChunk) .setIsDeleteData(isDeleteFileChunk)
@ -320,7 +314,7 @@ public class FilerClient {
public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
try { try {
filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
.setOldDirectory(oldParent) .setOldDirectory(oldParent)
.setOldName(oldName) .setOldName(oldName)
.setNewDirectory(newParent) .setNewDirectory(newParent)
@ -334,7 +328,7 @@ public class FilerClient {
} }
public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) { public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
.setPathPrefix(prefix) .setPathPrefix(prefix)
.setClientName(clientName) .setClientName(clientName)
.setSinceNs(sinceNs) .setSinceNs(sinceNs)

View file

@ -16,7 +16,7 @@ public class SeaweedInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
private final FilerGrpcClient filerGrpcClient; private final FilerClient filerClient;
private final String path; private final String path;
private final FilerProto.Entry entry; private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
@ -27,32 +27,31 @@ public class SeaweedInputStream extends InputStream {
private boolean closed = false; private boolean closed = false;
public SeaweedInputStream( public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final String fullpath) throws IOException { final String fullpath) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.path = fullpath; this.path = fullpath;
FilerClient filerClient = new FilerClient(filerGrpcClient); this.filerClient = filerClient;
this.entry = filerClient.lookupEntry( this.entry = filerClient.lookupEntry(
SeaweedOutputStream.getParentDirectory(fullpath), SeaweedOutputStream.getParentDirectory(fullpath),
SeaweedOutputStream.getFileName(fullpath)); SeaweedOutputStream.getFileName(fullpath));
this.contentLength = SeaweedRead.fileSize(entry); this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
} }
public SeaweedInputStream( public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final String path, final String path,
final FilerProto.Entry entry) throws IOException { final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient; this.filerClient = filerClient;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry); this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@ -110,7 +109,7 @@ public class SeaweedInputStream extends InputStream {
if (start+len <= entry.getContent().size()) { if (start+len <= entry.getContent().size()) {
entry.getContent().substring(start, start+len).copyTo(buf); entry.getContent().substring(start, start+len).copyTo(buf);
} else { } else {
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
} }
if (bytesRead > Integer.MAX_VALUE) { if (bytesRead > Integer.MAX_VALUE) {

View file

@ -15,7 +15,7 @@ public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
protected final boolean supportFlush = true; protected final boolean supportFlush = true;
private final FilerGrpcClient filerGrpcClient; private final FilerClient filerClient;
private final String path; private final String path;
private final int bufferSize; private final int bufferSize;
private final int maxConcurrentRequestCount; private final int maxConcurrentRequestCount;
@ -33,17 +33,17 @@ public class SeaweedOutputStream extends OutputStream {
private long outputIndex; private long outputIndex;
private String replication = "000"; private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
this(filerGrpcClient, fullpath, "000"); this(filerClient, fullpath, "000");
} }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) {
this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000");
} }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) { final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient; this.filerClient = filerClient;
this.replication = replication; this.replication = replication;
this.path = path; this.path = path;
this.position = position; this.position = position;
@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try { try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) { } catch (Exception ex) {
throw new IOException(ex); throw new IOException(ex);
} }
@ -225,7 +225,7 @@ public class SeaweedOutputStream extends OutputStream {
} }
final Future<Void> job = completionService.submit(() -> { final Future<Void> job = completionService.submit(() -> {
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite); ByteBufferPool.release(bufferToWrite);
return null; return null;

View file

@ -23,7 +23,7 @@ public class SeaweedRead {
static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead // returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
final long position, final ByteBuffer buf, final long fileSize) throws IOException { final long position, final ByteBuffer buf, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
@ -42,7 +42,7 @@ public class SeaweedRead {
} }
if (lookupRequest.getVolumeIdsCount() > 0) { if (lookupRequest.getVolumeIdsCount() > 0) {
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient FilerProto.LookupVolumeResponse lookupResponse = filerClient
.getBlockingStub().lookupVolume(lookupRequest.build()); .getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) { for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
@ -71,7 +71,7 @@ public class SeaweedRead {
return 0; return 0;
} }
int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations); int len = readChunkView(filerClient, startOffset, buf, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@ -93,12 +93,12 @@ public class SeaweedRead {
return readCount; return readCount;
} }
private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId); byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) { if (chunkData == null) {
chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations); chunkData = doFetchFullChunkData(filerClient, chunkView, locations);
chunkCache.setChunk(chunkView.fileId, chunkData); chunkCache.setChunk(chunkView.fileId, chunkData);
} }
@ -110,13 +110,13 @@ public class SeaweedRead {
return len; return len;
} }
public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] data = null; byte[] data = null;
IOException lastException = null; IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
for (FilerProto.Location location : locations.getLocationsList()) { for (FilerProto.Location location : locations.getLocationsList()) {
String url = filerGrpcClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl());
try { try {
data = doFetchOneFullChunkData(chunkView, url); data = doFetchOneFullChunkData(chunkView, url);
lastException = null; lastException = null;
@ -221,9 +221,9 @@ public class SeaweedRead {
} }
public static List<VisibleInterval> nonOverlappingVisibleIntervals( public static List<VisibleInterval> nonOverlappingVisibleIntervals(
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException { final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException {
chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList); chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {

View file

@ -23,29 +23,29 @@ public class SeaweedWrite {
public static void writeData(FilerProto.Entry.Builder entry, public static void writeData(FilerProto.Entry.Builder entry,
final String replication, final String replication,
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final long offset, final long offset,
final byte[] bytes, final byte[] bytes,
final long bytesOffset, final long bytesLength, final long bytesOffset, final long bytesLength,
final String path) throws IOException { final String path) throws IOException {
FilerProto.FileChunk.Builder chunkBuilder = writeChunk( FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path); replication, filerClient, offset, bytes, bytesOffset, bytesLength, path);
synchronized (entry) { synchronized (entry) {
entry.addChunks(chunkBuilder); entry.addChunks(chunkBuilder);
} }
} }
public static FilerProto.FileChunk.Builder writeChunk(final String replication, public static FilerProto.FileChunk.Builder writeChunk(final String replication,
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final long offset, final long offset,
final byte[] bytes, final byte[] bytes,
final long bytesOffset, final long bytesOffset,
final long bytesLength, final long bytesLength,
final String path) throws IOException { final String path) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder() FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection()) .setCollection(filerClient.getCollection())
.setReplication(replication == null ? filerGrpcClient.getReplication() : replication) .setReplication(replication == null ? filerClient.getReplication() : replication)
.setDataCenter("") .setDataCenter("")
.setTtlSec(0) .setTtlSec(0)
.setPath(path) .setPath(path)
@ -53,11 +53,11 @@ public class SeaweedWrite {
String fileId = response.getFileId(); String fileId = response.getFileId();
String auth = response.getAuth(); String auth = response.getAuth();
String targetUrl = filerGrpcClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null; byte[] cipherKey = null;
if (filerGrpcClient.isCipher()) { if (filerClient.isCipher()) {
cipherKey = genCipherKey(); cipherKey = genCipherKey();
cipherKeyString = ByteString.copyFrom(cipherKey); cipherKeyString = ByteString.copyFrom(cipherKey);
} }
@ -75,15 +75,15 @@ public class SeaweedWrite {
.setCipherKey(cipherKeyString); .setCipherKey(cipherKeyString);
} }
public static void writeMeta(final FilerGrpcClient filerGrpcClient, public static void writeMeta(final FilerClient filerClient,
final String parentDirectory, final String parentDirectory,
final FilerProto.Entry.Builder entry) throws IOException { final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) { synchronized (entry) {
List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory); List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
entry.clearChunks(); entry.clearChunks();
entry.addAllChunks(chunks); entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry( filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder() FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory) .setDirectory(parentDirectory)
.setEntry(entry) .setEntry(entry)

View file

@ -1,6 +1,6 @@
package com.seaweedfs.examples; package com.seaweedfs.examples;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.SeaweedInputStream; import seaweedfs.client.SeaweedInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -13,7 +13,7 @@ public class ExampleReadFile {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); FilerClient filerClient = new FilerClient("localhost", 18888);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
parseZip("/Users/chris/tmp/test.zip"); parseZip("/Users/chris/tmp/test.zip");
@ -23,7 +23,7 @@ public class ExampleReadFile {
long localProcessTime = startTime2 - startTime; long localProcessTime = startTime2 - startTime;
SeaweedInputStream seaweedInputStream = new SeaweedInputStream( SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient, "/test.zip"); filerClient, "/test.zip");
parseZip(seaweedInputStream); parseZip(seaweedInputStream);
long swProcessTime = System.currentTimeMillis() - startTime2; long swProcessTime = System.currentTimeMillis() - startTime2;

View file

@ -1,6 +1,6 @@
package com.seaweedfs.examples; package com.seaweedfs.examples;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.SeaweedInputStream; import seaweedfs.client.SeaweedInputStream;
import seaweedfs.client.SeaweedOutputStream; import seaweedfs.client.SeaweedOutputStream;
@ -13,15 +13,14 @@ public class ExampleWriteFile {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); FilerClient filerClient = new FilerClient("localhost", 18888);
SeaweedInputStream seaweedInputStream = new SeaweedInputStream( SeaweedInputStream seaweedInputStream = new SeaweedInputStream(filerClient, "/test.zip");
filerGrpcClient, "/test.zip"); unZipFiles(filerClient, seaweedInputStream);
unZipFiles(filerGrpcClient, seaweedInputStream);
} }
public static void unZipFiles(FilerGrpcClient filerGrpcClient, InputStream is) throws IOException { public static void unZipFiles(FilerClient filerClient, InputStream is) throws IOException {
ZipInputStream zin = new ZipInputStream(is); ZipInputStream zin = new ZipInputStream(is);
ZipEntry ze; ZipEntry ze;
while ((ze = zin.getNextEntry()) != null) { while ((ze = zin.getNextEntry()) != null) {
@ -34,7 +33,7 @@ public class ExampleWriteFile {
continue; continue;
} }
SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerGrpcClient, "/test/"+filename); SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/"+filename);
byte[] bytesIn = new byte[16 * 1024]; byte[] bytesIn = new byte[16 * 1024];
int read = 0; int read = 0;
while ((read = zin.read(bytesIn))!=-1) { while ((read = zin.read(bytesIn))!=-1) {

View file

@ -24,27 +24,25 @@ public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient; private FilerClient filerClient;
private Configuration conf; private Configuration conf;
public SeaweedFileSystemStore(String host, int port, Configuration conf) { public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port; int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerClient = new FilerClient(host, grpcPort);
filerClient = new FilerClient(filerGrpcClient);
this.conf = conf; this.conf = conf;
String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
if (volumeServerAccessMode.equals("publicUrl")) { if (volumeServerAccessMode.equals("publicUrl")) {
filerGrpcClient.setAccessVolumeServerByPublicUrl(); filerClient.setAccessVolumeServerByPublicUrl();
} else if (volumeServerAccessMode.equals("filerProxy")) { } else if (volumeServerAccessMode.equals("filerProxy")) {
filerGrpcClient.setAccessVolumeServerByFilerProxy(); filerClient.setAccessVolumeServerByFilerProxy();
} }
} }
public void close() { public void close() {
try { try {
this.filerGrpcClient.shutdown(); this.filerClient.shutdown();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -219,10 +217,10 @@ public class SeaweedFileSystemStore {
.clearGroupName() .clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
); );
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} }
return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication); return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
} }
@ -236,7 +234,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path); throw new FileNotFoundException("read non-exist file " + path);
} }
return new SeaweedHadoopInputStream(filerGrpcClient, return new SeaweedHadoopInputStream(filerClient,
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry); entry);

View file

@ -5,7 +5,7 @@ package seaweed.hdfs;
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedInputStream; import seaweedfs.client.SeaweedInputStream;
@ -19,11 +19,11 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
private final Statistics statistics; private final Statistics statistics;
public SeaweedHadoopInputStream( public SeaweedHadoopInputStream(
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry) throws IOException { final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry); this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
this.statistics = statistics; this.statistics = statistics;
} }

View file

@ -2,15 +2,15 @@ package seaweed.hdfs;
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream // adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream; import seaweedfs.client.SeaweedOutputStream;
public class SeaweedHadoopOutputStream extends SeaweedOutputStream { public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) { final long position, final int bufferSize, final String replication) {
super(filerGrpcClient, path.toString(), entry, position, bufferSize, replication); super(filerClient, path, entry, position, bufferSize, replication);
} }
} }

View file

@ -24,27 +24,25 @@ public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient; private FilerClient filerClient;
private Configuration conf; private Configuration conf;
public SeaweedFileSystemStore(String host, int port, Configuration conf) { public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port; int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerClient = new FilerClient(host, grpcPort);
filerClient = new FilerClient(filerGrpcClient);
this.conf = conf; this.conf = conf;
String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
if (volumeServerAccessMode.equals("publicUrl")) { if (volumeServerAccessMode.equals("publicUrl")) {
filerGrpcClient.setAccessVolumeServerByPublicUrl(); filerClient.setAccessVolumeServerByPublicUrl();
} else if (volumeServerAccessMode.equals("filerProxy")) { } else if (volumeServerAccessMode.equals("filerProxy")) {
filerGrpcClient.setAccessVolumeServerByFilerProxy(); filerClient.setAccessVolumeServerByFilerProxy();
} }
} }
public void close() { public void close() {
try { try {
this.filerGrpcClient.shutdown(); this.filerClient.shutdown();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -219,10 +217,10 @@ public class SeaweedFileSystemStore {
.clearGroupName() .clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
); );
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} }
return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication); return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
} }
@ -236,7 +234,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path); throw new FileNotFoundException("read non-exist file " + path);
} }
return new SeaweedHadoopInputStream(filerGrpcClient, return new SeaweedHadoopInputStream(filerClient,
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry); entry);

View file

@ -5,7 +5,7 @@ package seaweed.hdfs;
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedInputStream; import seaweedfs.client.SeaweedInputStream;
@ -19,11 +19,11 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
private final Statistics statistics; private final Statistics statistics;
public SeaweedHadoopInputStream( public SeaweedHadoopInputStream(
final FilerGrpcClient filerGrpcClient, final FilerClient filerClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry) throws IOException { final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry); this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
this.statistics = statistics; this.statistics = statistics;
} }

View file

@ -4,7 +4,7 @@ package seaweed.hdfs;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream; import seaweedfs.client.SeaweedOutputStream;
@ -13,9 +13,9 @@ import java.util.Locale;
public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities { public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) { final long position, final int bufferSize, final String replication) {
super(filerGrpcClient, path, entry, position, bufferSize, replication); super(filerClient, path, entry, position, bufferSize, replication);
} }
/** /**