HCFS: read concatenated files

This commit is contained in:
Chris Lu 2018-12-03 22:12:20 -08:00
parent 4119c61df8
commit c85ee7c0fd
4 changed files with 116 additions and 29 deletions

View file

@ -225,10 +225,13 @@ public class SeaweedFileSystemStore {
long writePosition = 0; long writePosition = 0;
if (!overwrite) { if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path); FilerProto.Entry existingEntry = lookupEntry(path);
LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry);
if (existingEntry != null) { if (existingEntry != null) {
entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now); entry.getAttributesBuilder().setMtime(now);
} }
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication(); replication = existingEntry.getAttributes().getReplication();
} }

View file

@ -55,6 +55,9 @@ public class SeaweedInputStream extends FSInputStream {
this.readAheadEnabled = true; this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
} }
public String getPath() { public String getPath() {

View file

@ -41,6 +41,7 @@ public class SeaweedRead {
//TODO parallel this //TODO parallel this
long readCount = 0; long readCount = 0;
int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) { for (ChunkView chunkView : chunkViews) {
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
if (locations.getLocationsCount() == 0) { if (locations.getLocationsCount() == 0) {
@ -59,10 +60,11 @@ public class SeaweedRead {
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();
int len = (int) (chunkView.logicOffset - position + chunkView.size); int len = (int) (chunkView.logicOffset - position + chunkView.size);
entity.getContent().read(buffer, bufferOffset, len); int chunkReadCount = entity.getContent().read(buffer, startOffset, len);
LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength); LOG.debug("* read chunkView:{} startOffset:{} length:{} chunkReadCount:{}", chunkView, startOffset, len, chunkReadCount);
readCount += len; readCount += len;
startOffset += len;
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
@ -72,17 +74,20 @@ public class SeaweedRead {
return readCount; return readCount;
} }
private static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
List<ChunkView> views = new ArrayList<>(); List<ChunkView> views = new ArrayList<>();
long stop = offset + size; long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) { for (VisibleInterval chunk : visibleIntervals) {
if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
views.add(new ChunkView( views.add(new ChunkView(
chunk.fileId, chunk.fileId,
offset - chunk.start, offset - chunk.start,
Math.min(chunk.stop, stop) - offset, Math.min(chunk.stop, stop) - offset,
offset offset
)); ));
offset = Math.min(chunk.stop, stop);
}
} }
return views; return views;
} }
@ -96,20 +101,10 @@ public class SeaweedRead {
} }
}); });
List<VisibleInterval> newVisibles = new ArrayList<>();
List<VisibleInterval> visibles = new ArrayList<>(); List<VisibleInterval> visibles = new ArrayList<>();
for (FilerProto.FileChunk chunk : chunks) { for (FilerProto.FileChunk chunk : chunks) {
List<VisibleInterval> t = newVisibles; List<VisibleInterval> newVisibles = new ArrayList<>();
newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk); visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
if (t != newVisibles) {
// visibles are changed in place
} else {
// newVisibles are modified
visibles.clear();
t = visibles;
visibles = newVisibles;
newVisibles = t;
}
} }
return visibles; return visibles;
@ -192,10 +187,10 @@ public class SeaweedRead {
} }
public static class VisibleInterval { public static class VisibleInterval {
long start; public final long start;
long stop; public final long stop;
long modifiedTime; public final long modifiedTime;
String fileId; public final String fileId;
public VisibleInterval(long start, long stop, String fileId, long modifiedTime) { public VisibleInterval(long start, long stop, String fileId, long modifiedTime) {
this.start = start; this.start = start;
@ -203,13 +198,23 @@ public class SeaweedRead {
this.modifiedTime = modifiedTime; this.modifiedTime = modifiedTime;
this.fileId = fileId; this.fileId = fileId;
} }
@Override
public String toString() {
return "VisibleInterval{" +
"start=" + start +
", stop=" + stop +
", modifiedTime=" + modifiedTime +
", fileId='" + fileId + '\'' +
'}';
}
} }
public static class ChunkView { public static class ChunkView {
String fileId; public final String fileId;
long offset; public final long offset;
long size; public final long size;
long logicOffset; public final long logicOffset;
public ChunkView(String fileId, long offset, long size, long logicOffset) { public ChunkView(String fileId, long offset, long size, long logicOffset) {
this.fileId = fileId; this.fileId = fileId;
@ -217,6 +222,16 @@ public class SeaweedRead {
this.size = size; this.size = size;
this.logicOffset = logicOffset; this.logicOffset = logicOffset;
} }
@Override
public String toString() {
return "ChunkView{" +
"fileId='" + fileId + '\'' +
", offset=" + offset +
", size=" + size +
", logicOffset=" + logicOffset +
'}';
}
} }
} }

View file

@ -0,0 +1,66 @@
package seaweedfs.hdfs;
import org.junit.Test;
import seaweed.hdfs.SeaweedRead;
import seaweedfs.client.FilerProto;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
 * Unit tests for {@link SeaweedRead}'s chunk-visibility logic: two adjacent,
 * non-overlapping file chunks must yield two visible intervals, and
 * {@code viewFromVisibles} must map those intervals back to per-chunk read views
 * with the correct in-chunk offset, size, and logical file offset.
 */
public class SeaweedReadTest {

    @Test
    public void testNonOverlappingVisibleIntervals() {
        // Two back-to-back chunks: [0, 100) from "aaa" and [100, 233) from "bbb".
        List<FilerProto.FileChunk> chunks = new ArrayList<>();
        chunks.add(FilerProto.FileChunk.newBuilder()
                .setFileId("aaa")
                .setOffset(0)
                .setSize(100)
                .setMtime(1000)
                .build());
        chunks.add(FilerProto.FileChunk.newBuilder()
                .setFileId("bbb")
                .setOffset(100)
                .setSize(133)
                .setMtime(2000)
                .build());

        List<SeaweedRead.VisibleInterval> visibleIntervals =
                SeaweedRead.nonOverlappingVisibleIntervals(chunks);

        for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
            System.out.println("visible:" + visibleInterval);
        }

        // JUnit convention is assertEquals(expected, actual); keeping that order
        // makes failure messages report the values the right way around.
        assertEquals(2, visibleIntervals.size());

        SeaweedRead.VisibleInterval first = visibleIntervals.get(0);
        assertEquals(0, first.start);
        assertEquals(100, first.stop);
        assertEquals(1000, first.modifiedTime);
        assertEquals("aaa", first.fileId);

        SeaweedRead.VisibleInterval second = visibleIntervals.get(1);
        assertEquals(100, second.start);
        assertEquals(233, second.stop);
        assertEquals(2000, second.modifiedTime);
        assertEquals("bbb", second.fileId);

        // Read the whole logical file [0, 233).
        List<SeaweedRead.ChunkView> chunkViews =
                SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233);

        // Assert the count before indexed access so a wrong count fails as an
        // assertion, not as an IndexOutOfBoundsException.
        assertEquals(2, chunkViews.size());

        SeaweedRead.ChunkView firstView = chunkViews.get(0);
        assertEquals(0, firstView.offset);
        assertEquals(100, firstView.size);
        assertEquals(0, firstView.logicOffset);
        assertEquals("aaa", firstView.fileId);

        SeaweedRead.ChunkView secondView = chunkViews.get(1);
        assertEquals(0, secondView.offset);
        assertEquals(133, secondView.size);
        assertEquals(100, secondView.logicOffset);
        assertEquals("bbb", secondView.fileId);
    }
}