mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Hadoop: switch to ByteBuffer
fix https://github.com/chrislusf/seaweedfs/issues/1645
This commit is contained in:
parent
003b6245e7
commit
3857f9c840
|
@ -68,6 +68,11 @@
|
||||||
<version>4.13.1</version>
|
<version>4.13.1</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>javax.annotation</groupId>
|
||||||
|
<artifactId>javax.annotation-api</artifactId>
|
||||||
|
<version>1.3.2</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -12,6 +12,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
public class SeaweedRead {
|
public class SeaweedRead {
|
||||||
|
@ -23,10 +24,9 @@ public class SeaweedRead {
|
||||||
|
|
||||||
// returns bytesRead
|
// returns bytesRead
|
||||||
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
|
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
|
||||||
final long position, final byte[] buffer, final int bufferOffset,
|
final long position, final ByteBuffer buf, final long fileSize) throws IOException {
|
||||||
final int bufferLength, final long fileSize) throws IOException {
|
|
||||||
|
|
||||||
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
|
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
|
||||||
|
|
||||||
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
|
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
|
||||||
|
|
||||||
|
@ -59,6 +59,7 @@ public class SeaweedRead {
|
||||||
if (startOffset < chunkView.logicOffset) {
|
if (startOffset < chunkView.logicOffset) {
|
||||||
long gap = chunkView.logicOffset - startOffset;
|
long gap = chunkView.logicOffset - startOffset;
|
||||||
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
|
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
|
||||||
|
buf.position(buf.position()+ (int)gap);
|
||||||
readCount += gap;
|
readCount += gap;
|
||||||
startOffset += gap;
|
startOffset += gap;
|
||||||
}
|
}
|
||||||
|
@ -70,7 +71,7 @@ public class SeaweedRead {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
|
int len = readChunkView(startOffset, buf, chunkView, locations);
|
||||||
|
|
||||||
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
|
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
|
||||||
|
|
||||||
|
@ -79,11 +80,12 @@ public class SeaweedRead {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long limit = Math.min(bufferOffset + bufferLength, fileSize);
|
long limit = Math.min(buf.limit(), fileSize);
|
||||||
|
|
||||||
if (startOffset < limit) {
|
if (startOffset < limit) {
|
||||||
long gap = limit - startOffset;
|
long gap = limit - startOffset;
|
||||||
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
|
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
|
||||||
|
buf.position(buf.position()+ (int)gap);
|
||||||
readCount += gap;
|
readCount += gap;
|
||||||
startOffset += gap;
|
startOffset += gap;
|
||||||
}
|
}
|
||||||
|
@ -91,7 +93,7 @@ public class SeaweedRead {
|
||||||
return readCount;
|
return readCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
||||||
|
|
||||||
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
|
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
|
||||||
|
|
||||||
|
@ -101,9 +103,9 @@ public class SeaweedRead {
|
||||||
}
|
}
|
||||||
|
|
||||||
int len = (int) chunkView.size;
|
int len = (int) chunkView.size;
|
||||||
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
|
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
|
||||||
chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
|
chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
|
||||||
System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
|
buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
|
public int read(final byte[] b, final int off, final int len) throws IOException {
|
||||||
|
|
||||||
if (position < 0) {
|
|
||||||
throw new IllegalArgumentException("attempting to read from negative offset");
|
|
||||||
}
|
|
||||||
if (position >= contentLength) {
|
|
||||||
return -1; // Hadoop prefers -1 to EOFException
|
|
||||||
}
|
|
||||||
if (b == null) {
|
if (b == null) {
|
||||||
throw new IllegalArgumentException("null byte array passed in to read() method");
|
throw new IllegalArgumentException("null byte array passed in to read() method");
|
||||||
}
|
}
|
||||||
|
@ -86,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
|
||||||
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
|
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
|
||||||
|
return read(buf);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// implement ByteBufferReadable
|
||||||
|
@Override
|
||||||
|
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||||
|
|
||||||
|
if (position < 0) {
|
||||||
|
throw new IllegalArgumentException("attempting to read from negative offset");
|
||||||
|
}
|
||||||
|
if (position >= contentLength) {
|
||||||
|
return -1; // Hadoop prefers -1 to EOFException
|
||||||
|
}
|
||||||
|
|
||||||
long bytesRead = 0;
|
long bytesRead = 0;
|
||||||
if (position+len <= entry.getContent().size()) {
|
int len = buf.remaining();
|
||||||
entry.getContent().copyTo(b, (int) position, (int) off, len);
|
int start = (int) this.position;
|
||||||
|
if (start+len <= entry.getContent().size()) {
|
||||||
|
entry.getContent().substring(start, start+len).copyTo(buf);
|
||||||
} else {
|
} else {
|
||||||
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
|
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytesRead > Integer.MAX_VALUE) {
|
if (bytesRead > Integer.MAX_VALUE) {
|
||||||
|
@ -105,13 +117,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
|
||||||
}
|
}
|
||||||
|
|
||||||
return (int) bytesRead;
|
return (int) bytesRead;
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// implement ByteBufferReadable
|
|
||||||
@Override
|
|
||||||
public synchronized int read(ByteBuffer buf) throws IOException {
|
|
||||||
return read(buf.array(), buf.position(), buf.remaining());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -288,4 +288,4 @@ public class SeaweedFileSystemStore {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,11 +86,29 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
|
||||||
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
|
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
|
||||||
|
return read(buf);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// implement ByteBufferReadable
|
||||||
|
@Override
|
||||||
|
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||||
|
|
||||||
|
if (position < 0) {
|
||||||
|
throw new IllegalArgumentException("attempting to read from negative offset");
|
||||||
|
}
|
||||||
|
if (position >= contentLength) {
|
||||||
|
return -1; // Hadoop prefers -1 to EOFException
|
||||||
|
}
|
||||||
|
|
||||||
long bytesRead = 0;
|
long bytesRead = 0;
|
||||||
if (position+len <= entry.getContent().size()) {
|
int len = buf.remaining();
|
||||||
entry.getContent().copyTo(b, (int) position, (int) off, len);
|
int start = (int) this.position;
|
||||||
|
if (start+len <= entry.getContent().size()) {
|
||||||
|
entry.getContent().substring(start, start+len).copyTo(buf);
|
||||||
} else {
|
} else {
|
||||||
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
|
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytesRead > Integer.MAX_VALUE) {
|
if (bytesRead > Integer.MAX_VALUE) {
|
||||||
|
@ -105,13 +123,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
|
||||||
}
|
}
|
||||||
|
|
||||||
return (int) bytesRead;
|
return (int) bytesRead;
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// implement ByteBufferReadable
|
|
||||||
@Override
|
|
||||||
public synchronized int read(ByteBuffer buf) throws IOException {
|
|
||||||
return read(buf.array(), buf.position(), buf.remaining());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue