Merge pull request #24 from chrislusf/master

sync
This commit is contained in:
hilimd 2020-10-10 16:20:26 +08:00 committed by GitHub
commit b9a446839a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 54 additions and 30 deletions

View file

@ -40,7 +40,7 @@ public class SeaweedRead {
//TODO parallel this //TODO parallel this
long readCount = 0; long readCount = 0;
int startOffset = bufferOffset; long startOffset = position;
for (ChunkView chunkView : chunkViews) { for (ChunkView chunkView : chunkViews) {
if (startOffset < chunkView.logicOffset) { if (startOffset < chunkView.logicOffset) {
@ -57,7 +57,7 @@ public class SeaweedRead {
return 0; return 0;
} }
int len = readChunkView(position, buffer, startOffset, chunkView, locations); int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@ -66,7 +66,7 @@ public class SeaweedRead {
} }
long limit = Math.min(bufferLength, fileSize); long limit = Math.min(bufferOffset + bufferLength, fileSize);
if (startOffset < limit) { if (startOffset < limit) {
long gap = limit - startOffset; long gap = limit - startOffset;
@ -78,7 +78,7 @@ public class SeaweedRead {
return readCount; return readCount;
} }
private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId); byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@ -88,17 +88,51 @@ public class SeaweedRead {
} }
int len = (int) chunkView.size; int len = (int) chunkView.size;
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len); System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
return len; return len;
} }
public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpGet request = new HttpGet( byte[] data = null;
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
for (FilerProto.Location location : locations.getLocationsList()) {
String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
try {
data = doFetchOneFullChunkData(chunkView, url);
lastException = null;
break;
} catch (IOException ioe) {
LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
lastException = ioe;
}
}
if (data != null) {
break;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
}
}
if (lastException != null) {
throw lastException;
}
LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
return data;
}
public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
HttpGet request = new HttpGet(url);
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
@ -142,7 +176,7 @@ public class SeaweedRead {
data = Gzip.decompress(data); data = Gzip.decompress(data);
} }
LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
return data; return data;

View file

@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);

View file

@ -205,10 +205,9 @@ public class SeaweedFileSystemStore {
} }
public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
int bufferSize) throws IOException {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); LOG.debug("openFileForRead path:{}", path);
FilerProto.Entry entry = lookupEntry(path); FilerProto.Entry entry = lookupEntry(path);
@ -219,8 +218,7 @@ public class SeaweedFileSystemStore {
return new SeaweedInputStream(filerGrpcClient, return new SeaweedInputStream(filerGrpcClient,
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry, entry);
bufferSize);
} }
public void setOwner(Path path, String owner, String group) { public void setOwner(Path path, String owner, String group) {

View file

@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream {
private final FilerProto.Entry entry; private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength; private final long contentLength;
private final int bufferSize; // default buffer size
private long position = 0; // cursor of the file private long position = 0; // cursor of the file
@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream {
final FilerGrpcClient filerGrpcClient, final FilerGrpcClient filerGrpcClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry, final FilerProto.Entry entry) throws IOException {
final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry); this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());

View file

@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);

View file

@ -205,10 +205,9 @@ public class SeaweedFileSystemStore {
} }
public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
int bufferSize) throws IOException {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); LOG.debug("openFileForRead path:{}", path);
FilerProto.Entry entry = lookupEntry(path); FilerProto.Entry entry = lookupEntry(path);
@ -219,8 +218,7 @@ public class SeaweedFileSystemStore {
return new SeaweedInputStream(filerGrpcClient, return new SeaweedInputStream(filerGrpcClient,
statistics, statistics,
path.toUri().getPath(), path.toUri().getPath(),
entry, entry);
bufferSize);
} }
public void setOwner(Path path, String owner, String group) { public void setOwner(Path path, String owner, String group) {

View file

@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream {
private final FilerProto.Entry entry; private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength; private final long contentLength;
private final int bufferSize; // default buffer size
private long position = 0; // cursor of the file private long position = 0; // cursor of the file
@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream {
final FilerGrpcClient filerGrpcClient, final FilerGrpcClient filerGrpcClient,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final FilerProto.Entry entry, final FilerProto.Entry entry) throws IOException {
final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry); this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());