hdfs: support read/write chunk manifest

Chris Lu 2020-07-20 03:29:17 -07:00
parent 60d14a9800
commit 1d724ab237
8 changed files with 180 additions and 33 deletions

View file: seaweedfs/client/FileChunkManifest.java (new file)

@@ -0,0 +1,134 @@
package seaweedfs.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class FileChunkManifest {
private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
private static final int mergeFactor = 3;
public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
for (FilerProto.FileChunk chunk : chunks) {
if (chunk.getIsChunkManifest()) {
return true;
}
}
return false;
}
public static List<FilerProto.FileChunk> resolveChunkManifest(
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
for (FilerProto.FileChunk chunk : chunks) {
if (!chunk.getIsChunkManifest()) {
dataChunks.add(chunk);
continue;
}
// IsChunkManifest
LOG.debug("fetching chunk manifest:{}", chunk);
byte[] data = fetchChunk(filerGrpcClient, chunk);
FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
for (FilerProto.FileChunk t : m.getChunksList()) {
// avoid deprecated chunk.getFileId()
resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
}
dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
}
return dataChunks;
}
private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
String vid = "" + chunk.getFid().getVolumeId();
lookupRequest.addVolumeIds(vid);
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
.getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
FilerProto.Locations locations = vid2Locations.get(vid);
SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
0,
-1,
0,
true,
chunk.getCipherKey().toByteArray(),
chunk.getIsCompressed());
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
LOG.debug("doFetchFullChunkData:{}", chunkView);
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
}
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
return chunkData;
}
public static List<FilerProto.FileChunk> maybeManifestize(
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException {
// the return variable
List<FilerProto.FileChunk> chunks = new ArrayList<>();
List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
for (FilerProto.FileChunk chunk : inputChunks) {
if (!chunk.getIsChunkManifest()) {
dataChunks.add(chunk);
} else {
chunks.add(chunk);
}
}
int remaining = dataChunks.size();
for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor));
chunks.add(chunk);
remaining -= mergeFactor;
}
// remaining
for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
chunks.add(dataChunks.get(i));
}
return chunks;
}
private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
// create and serialize the manifest
FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
byte[] data = m.build().toByteArray();
long minOffset = Long.MAX_VALUE;
long maxOffset = -1;
for (FilerProto.FileChunk chunk : dataChunks) {
minOffset = Math.min(minOffset, chunk.getOffset());
maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
}
FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
filerGrpcClient.getReplication(),
filerGrpcClient,
minOffset,
data, 0, data.length);
manifestChunk.setIsChunkManifest(true);
manifestChunk.setSize(maxOffset - minOffset);
return manifestChunk.build();
}
}
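Review note: maybeManifestize folds only complete groups of mergeFactor data chunks into manifest chunks and passes the tail through unchanged, while mergeIntoManifest sizes each manifest chunk to cover [minOffset, maxOffset) of its children. A minimal, self-contained sketch of that grouping arithmetic, with plain offset/size pairs standing in for FilerProto.FileChunk and the same mergeFactor of 3 (the names here are illustrative, not part of the client API):

import java.util.ArrayList;
import java.util.List;

public class ManifestGroupingSketch {

    static final int MERGE_FACTOR = 3; // mirrors FileChunkManifest.mergeFactor

    // offset/size pair standing in for FilerProto.FileChunk
    record Chunk(long offset, long size) {}

    static List<Chunk> maybeManifestize(List<Chunk> dataChunks) {
        List<Chunk> out = new ArrayList<>();
        int remaining = dataChunks.size();
        // fold complete groups; the strict < means a trailing group of
        // exactly MERGE_FACTOR chunks is left unmerged
        for (int i = 0; i + MERGE_FACTOR < dataChunks.size(); i += MERGE_FACTOR) {
            out.add(mergeIntoManifest(dataChunks.subList(i, i + MERGE_FACTOR)));
            remaining -= MERGE_FACTOR;
        }
        // pass the remaining tail through untouched
        for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
            out.add(dataChunks.get(i));
        }
        return out;
    }

    static Chunk mergeIntoManifest(List<Chunk> group) {
        long minOffset = Long.MAX_VALUE, maxOffset = -1;
        for (Chunk c : group) {
            minOffset = Math.min(minOffset, c.offset());
            maxOffset = Math.max(maxOffset, c.offset() + c.size());
        }
        // the manifest chunk covers [minOffset, maxOffset) of its children
        return new Chunk(minOffset, maxOffset - minOffset);
    }

    public static void main(String[] args) {
        List<Chunk> chunks = new ArrayList<>();
        for (int i = 0; i < 7; i++) chunks.add(new Chunk(i * 10L, 10L));
        // 7 chunks -> manifests covering [0,30) and [30,60), plus a plain tail chunk
        maybeManifestize(chunks).forEach(System.out::println);
    }
}

With 6 inputs the strict < leaves the last complete group of 3 unmerged (1 manifest plus 3 plain chunks), so whether that bound should be <= is worth a second look.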

View file: seaweedfs/client/FilerClient.java

@@ -24,6 +24,10 @@ public class FilerClient {
this.filerGrpcClient = filerGrpcClient;
}
public static String toFileId(FilerProto.FileId fid) {
return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
}
public boolean mkdirs(String path, int mode) {
String currentUser = System.getProperty("user.name");
return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -209,7 +213,6 @@ public class FilerClient {
}
}
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
@@ -279,9 +282,7 @@ public class FilerClient {
entryBuilder.clearChunks();
for (FilerProto.FileChunk chunk : entry.getChunksList()) {
FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
-FilerProto.FileId fid = chunk.getFid();
-fileId = String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
-chunkBuilder.setFileId(fileId);
+chunkBuilder.setFileId(toFileId(chunk.getFid()));
entryBuilder.addChunks(chunkBuilder);
}
return entryBuilder.build();
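Review note: toFileId renders the volume id in decimal, then the file key in hex followed by the cookie as 8 zero-padded hex digits, the fileId string form the volume servers expect. A quick standalone check of the format string (sample values made up for illustration):

public class FileIdFormatCheck {
    public static void main(String[] args) {
        int volumeId = 3;           // sample values only
        long fileKey = 0x1637L;
        int cookie = 0x89abcdef;
        // same format string as FilerClient.toFileId
        System.out.println(String.format("%d,%x%08x", volumeId, fileKey, cookie));
        // prints: 3,163789abcdef
    }
}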

View file: seaweedfs/client/SeaweedRead.java

@@ -2,16 +2,12 @@ package seaweedfs.client;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
@@ -77,7 +73,7 @@ public class SeaweedRead {
return len;
}
-private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -138,7 +134,11 @@ public class SeaweedRead {
return views;
}
-public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override

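Review note: nonOverlappingVisibleIntervals now expands manifest chunks before computing intervals, which is why it gained a FilerGrpcClient parameter and a throws IOException clause. When no chunk has isChunkManifest set, the client is never touched, so the updated unit test below can pass null. A usage sketch under that assumption (requires the seaweedfs-client classes on the classpath):

import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class VisibleIntervalSketch {
    public static void main(String[] args) throws IOException {
        List<FilerProto.FileChunk> chunks = new ArrayList<>();
        chunks.add(FilerProto.FileChunk.newBuilder()
                .setFileId("aaa").setOffset(0).setSize(100).setMtime(1000).build());
        // no manifest chunks here, so the null client is never dereferenced;
        // with manifest chunks present, a live FilerGrpcClient is required
        System.out.println(SeaweedRead.nonOverlappingVisibleIntervals(null, chunks));
    }
}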
View file: seaweedfs/client/SeaweedWrite.java

@@ -1,8 +1,6 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
import java.util.List;
public class SeaweedWrite {
@@ -25,6 +23,17 @@ public class SeaweedWrite {
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
synchronized (entry) {
entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
}
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
final FilerGrpcClient filerGrpcClient,
final long offset,
final byte[] bytes,
final long bytesOffset,
final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection())
@@ -46,25 +55,28 @@
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
-synchronized (entry) {
-entry.addChunks(FilerProto.FileChunk.newBuilder()
+// cache fileId ~ bytes
+SeaweedRead.chunkCache.setChunk(fileId, bytes);
+return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
-.setCipherKey(cipherKeyString)
-);
-}
-// cache fileId ~ bytes
-SeaweedRead.chunkCache.setChunk(fileId, bytes);
+.setCipherKey(cipherKeyString);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
-final String parentDirectory, final FilerProto.Entry.Builder entry) {
+final String parentDirectory,
+final FilerProto.Entry.Builder entry) throws IOException {
+int chunkSize = entry.getChunksCount();
+List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
+synchronized (entry) {
+entry.clearChunks();
+entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
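Review note: writeChunk caches the uploaded bytes under the new fileId before returning, which is what lets FileChunkManifest.fetchChunk read a just-written manifest back without an HTTP round trip. A self-contained sketch of that write-through pattern, with a plain HashMap standing in for SeaweedRead.chunkCache (all names here are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class WriteThroughCacheSketch {

    // stand-in for SeaweedRead.chunkCache
    static final Map<String, byte[]> chunkCache = new HashMap<>();

    static String writeChunk(byte[] bytes) {
        String fileId = "3,163789abcdef"; // would come from assignVolume
        // the multipart upload to the volume server would happen here...
        chunkCache.put(fileId, bytes);    // ...then cache fileId ~ bytes
        return fileId;
    }

    static byte[] fetchChunk(String fileId) {
        byte[] data = chunkCache.get(fileId);
        if (data == null) {
            // cache miss: the real code falls back to doFetchFullChunkData
            throw new IllegalStateException("would fetch over HTTP: " + fileId);
        }
        return data;
    }

    public static void main(String[] args) {
        String fileId = writeChunk("manifest bytes".getBytes(StandardCharsets.UTF_8));
        // the manifest comes straight from the cache, no HTTP fetch needed
        System.out.println(new String(fetchChunk(fileId), StandardCharsets.UTF_8));
    }
}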

View file: seaweedfs/client/SeaweedReadTest.java

@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
-public void testNonOverlappingVisibleIntervals() {
+public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
-List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}

View file: seaweed/hdfs/SeaweedInputStream.java

@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
-final int bufferSize) {
+final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
-this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);

View file: seaweed/hdfs/SeaweedOutputStream.java

@@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
-// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;

View file: seaweed/hdfs/SeaweedInputStream.java

@@ -45,7 +45,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
-this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);