diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java new file mode 100644 index 000000000..2eba4f808 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java @@ -0,0 +1,109 @@ +package seaweedfs.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class ReadChunks { + + public static List readResolvedChunks(List chunkList) throws IOException { + List points = new ArrayList<>(chunkList.size() * 2); + for (FilerProto.FileChunk chunk : chunkList) { + points.add(new Point(chunk.getOffset(), chunk, true)); + points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false)); + } + Collections.sort(points, new Comparator() { + @Override + public int compare(Point a, Point b) { + int x = (int) (a.x - b.x); + if (a.x != b.x) { + return (int) (a.x - b.x); + } + if (a.ts != b.ts) { + return (int) (a.ts - b.ts); + } + if (!a.isStart) { + return -1; + } + return 1; + } + }); + + long prevX = 0; + List visibles = new ArrayList<>(); + ArrayList queue = new ArrayList<>(); + for (Point point : points) { + if (point.isStart) { + if (queue.size() > 0) { + int lastIndex = queue.size() - 1; + Point lastPoint = queue.get(lastIndex); + if (point.x != prevX && lastPoint.ts < point.ts) { + addToVisibles(visibles, prevX, lastPoint, point); + prevX = point.x; + } + } + // insert into queue + for (int i = queue.size(); i >= 0; i--) { + if (i == 0 || queue.get(i - 1).ts <= point.ts) { + if (i == queue.size()) { + prevX = point.x; + } + queue.add(i, point); + break; + } + } + } else { + int lastIndex = queue.size() - 1; + int index = lastIndex; + Point startPoint = null; + for (; index >= 0; index--) { + startPoint = queue.get(index); + if (startPoint.ts == point.ts) { + queue.remove(index); + break; + } + } + if (index == lastIndex && startPoint != null) { + addToVisibles(visibles, prevX, startPoint, point); + prevX = point.x; + } + } + } + + return visibles; + + } + + private static void addToVisibles(List visibles, long prevX, Point startPoint, Point point) { + if (prevX < point.x) { + FilerProto.FileChunk chunk = startPoint.chunk; + visibles.add(new SeaweedRead.VisibleInterval( + prevX, + point.x, + chunk.getFileId(), + chunk.getMtime(), + prevX - chunk.getOffset(), + chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed() + )); + } + } + + static class Point { + long x; + long ts; + FilerProto.FileChunk chunk; + boolean isStart; + + public Point(long x, FilerProto.FileChunk chunk, boolean isStart) { + this.x = x; + this.ts = chunk.getMtime(); + this.chunk = chunk; + this.isStart = isStart; + } + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 011462a17..41033befb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -226,96 +226,8 @@ public class SeaweedRead { chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); - FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); - Arrays.sort(chunks, new Comparator() { - @Override - public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - // if just a.getMtime() - b.getMtime(), it will overflow! - if (a.getMtime() < b.getMtime()) { - return -1; - } else if (a.getMtime() > b.getMtime()) { - return 1; - } - return 0; - } - }); + return ReadChunks.readResolvedChunks(chunkList); - List visibles = new ArrayList<>(); - for (FilerProto.FileChunk chunk : chunks) { - List newVisibles = new ArrayList<>(); - visibles = mergeIntoVisibles(visibles, newVisibles, chunk); - } - - return visibles; - } - - private static List mergeIntoVisibles(List visibles, - List newVisibles, - FilerProto.FileChunk chunk) { - VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - 0, - true, - chunk.getCipherKey().toByteArray(), - chunk.getIsCompressed() - ); - - // easy cases to speed up - if (visibles.size() == 0) { - visibles.add(newV); - return visibles; - } - if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) { - visibles.add(newV); - return visibles; - } - - for (VisibleInterval v : visibles) { - if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { - newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - v.chunkOffset, - false, - v.cipherKey, - v.isCompressed - )); - } - long chunkStop = chunk.getOffset() + chunk.getSize(); - if (v.start < chunkStop && chunkStop < v.stop) { - newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - v.chunkOffset + (chunkStop - v.start), - false, - v.cipherKey, - v.isCompressed - )); - } - if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { - newVisibles.add(v); - } - } - newVisibles.add(newV); - - // keep everything sorted - for (int i = newVisibles.size() - 1; i >= 0; i--) { - if (i > 0 && newV.start < newVisibles.get(i - 1).start) { - newVisibles.set(i, newVisibles.get(i - 1)); - } else { - newVisibles.set(i, newV); - break; - } - } - - return newVisibles; } public static String parseVolumeId(String fileId) { diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java index 44b833c90..6ad9edb2c 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; public class SeaweedReadTest { @@ -13,17 +14,17 @@ public class SeaweedReadTest { public void testNonOverlappingVisibleIntervals() throws IOException { List chunks = new ArrayList<>(); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("aaa") - .setOffset(0) - .setSize(100) - .setMtime(1000) - .build()); + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("bbb") - .setOffset(100) - .setSize(133) - .setMtime(2000) - .build()); + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks); for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { @@ -61,4 +62,106 @@ public class SeaweedReadTest { } + + @Test + public void testReadResolvedChunks() throws IOException { + List chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("a") + .setOffset(0) + .setSize(100) + .setMtime(1) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("b") + .setOffset(50) + .setSize(100) + .setMtime(2) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("c") + .setOffset(200) + .setSize(50) + .setMtime(3) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("d") + .setOffset(250) + .setSize(50) + .setMtime(4) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("e") + .setOffset(175) + .setSize(100) + .setMtime(5) + .build()); + + List visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + Assert.assertEquals(4, visibleIntervals.size()); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + Assert.assertEquals(visibleInterval.start, 0); + Assert.assertEquals(visibleInterval.stop, 50); + Assert.assertEquals(visibleInterval.modifiedTime, 1); + Assert.assertEquals(visibleInterval.fileId, "a"); + + visibleInterval = visibleIntervals.get(1); + Assert.assertEquals(visibleInterval.start, 50); + Assert.assertEquals(visibleInterval.stop, 150); + Assert.assertEquals(visibleInterval.modifiedTime, 2); + Assert.assertEquals(visibleInterval.fileId, "b"); + + visibleInterval = visibleIntervals.get(2); + Assert.assertEquals(visibleInterval.start, 175); + Assert.assertEquals(visibleInterval.stop, 275); + Assert.assertEquals(visibleInterval.modifiedTime, 5); + Assert.assertEquals(visibleInterval.fileId, "e"); + + visibleInterval = visibleIntervals.get(3); + Assert.assertEquals(visibleInterval.start, 275); + Assert.assertEquals(visibleInterval.stop, 300); + Assert.assertEquals(visibleInterval.modifiedTime, 4); + Assert.assertEquals(visibleInterval.fileId, "d"); + + } + + + @Test + public void testRandomizedReadResolvedChunks() throws IOException { + Random random = new Random(); + int limit = 1024*1024; + long[] array = new long[limit]; + List chunks = new ArrayList<>(); + for (long ts=0;ts<1024;ts++){ + int x = random.nextInt(limit); + int y = random.nextInt(limit); + int size = Math.min(Math.abs(x-y), 1024); + chunks.add(randomWrite(array, Math.min(x,y), size, ts)); + } + + List visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + for (int i = (int) visibleInterval.start; i 0 { + lastIndex := len(queue) -1 + lastPoint := queue[lastIndex] + if point.x != prevX && lastPoint.ts < point.ts { + visibles = addToVisibles(visibles, prevX, lastPoint, point) + prevX = point.x + } + } + // insert into queue + for i := len(queue); i >= 0; i-- { + if i == 0 || queue[i-1].ts <= point.ts { + if i == len(queue) { + prevX = point.x + } + queue = addToQueue(queue, i, point) + break + } + } + } else { + lastIndex := len(queue) - 1 + index := lastIndex + var startPoint *Point + for ; index >= 0; index-- { + startPoint = queue[index] + if startPoint.ts == point.ts { + queue = removeFromQueue(queue, index) + break + } + } + if index == lastIndex && startPoint != nil { + visibles = addToVisibles(visibles, prevX, startPoint, point) + prevX = point.x + } + } + } + + return +} + +func removeFromQueue(queue []*Point, index int) []*Point { + for i := index; i < len(queue)-1; i++ { + queue[i] = queue[i+1] + } + queue = queue[:len(queue)-1] + return queue +} + +func addToQueue(queue []*Point, index int, point *Point) []*Point { + queue = append(queue, point) + for i := len(queue) - 1; i > index; i-- { + queue[i], queue[i-1] = queue[i-1], queue[i] + } + return queue +} + +func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval { + if prevX < point.x { + chunk := startPoint.chunk + visibles = append(visibles, VisibleInterval{ + start: prevX, + stop: point.x, + fileId: chunk.FileId, + modifiedTime: chunk.Mtime, + chunkOffset: prevX - chunk.Offset, + chunkSize: chunk.Size, + cipherKey: chunk.CipherKey, + isGzipped: chunk.IsCompressed, + }) + } + return visibles +} + +type Point struct { + x int64 + ts int64 + chunk *filer_pb.FileChunk + isStart bool +} diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go new file mode 100644 index 000000000..1920f5185 --- /dev/null +++ b/weed/filer/filechunks_read_test.go @@ -0,0 +1,119 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "math/rand" + "testing" +) + +func TestReadResolvedChunks(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "a", + Offset: 0, + Size: 100, + Mtime: 1, + }, + { + FileId: "b", + Offset: 50, + Size: 100, + Mtime: 2, + }, + { + FileId: "c", + Offset: 200, + Size: 50, + Mtime: 3, + }, + { + FileId: "d", + Offset: 250, + Size: 50, + Mtime: 4, + }, + { + FileId: "e", + Offset: 175, + Size: 100, + Mtime: 5, + }, + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime) + } + +} + +func TestRandomizedReadResolvedChunks(t *testing.T) { + + var limit int64 = 1024*1024 + array := make([]int64, limit) + var chunks []*filer_pb.FileChunk + for ts := int64(0); ts < 1024; ts++ { + x := rand.Int63n(limit) + y := rand.Int63n(limit) + size := x - y + if size < 0 { + size = -size + } + if size > 1024 { + size = 1024 + } + start := x + if start > y { + start = y + } + chunks = append(chunks, randomWrite(array, start, size, ts)) + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + for i := visible.start; i