* add option to include last entry
*
This commit is contained in:
Chris Lu 2020-07-11 22:24:02 -07:00
parent 578f316173
commit 4bd8f3281e
10 changed files with 43 additions and 25 deletions

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.2.9</version> <version>1.3.0</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId> <groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId> <artifactId>seaweedfs-client</artifactId>
<version>1.2.9</version> <version>1.3.0</version>
<parent> <parent>
<groupId>org.sonatype.oss</groupId> <groupId>org.sonatype.oss</groupId>

View file

@ -156,7 +156,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>(); List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = ""; String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) { for (int limit = Integer.MAX_VALUE; limit > 0; ) {
List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024); List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) { if (t == null) {
break; break;
} }
@ -173,11 +173,12 @@ public class FilerClient {
return results; return results;
} }
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) { public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path) .setDirectory(path)
.setPrefix(entryPrefix) .setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName) .setStartFromFileName(lastEntryName)
.setInclusiveStartFrom(includeLastEntry)
.setLimit(limit) .setLimit(limit)
.build()); .build());
List<FilerProto.Entry> entries = new ArrayList<>(); List<FilerProto.Entry> entries = new ArrayList<>();

View file

@ -4,9 +4,9 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders; import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -79,7 +79,6 @@ public class SeaweedRead {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet( HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@ -87,17 +86,15 @@ public class SeaweedRead {
byte[] data = null; byte[] data = null;
CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
try { try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity); data = EntityUtils.toByteArray(entity);
} finally { } finally {
if (client instanceof Closeable) { response.close();
Closeable t = (Closeable) client;
t.close();
}
} }
if (chunkView.isCompressed) { if (chunkView.isCompressed) {

View file

@ -0,0 +1,24 @@
package seaweedfs.client;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class SeaweedUtil {
static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
static {
// Increase max total connection to 200
cm.setMaxTotal(200);
// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
}
public static CloseableHttpClient getClosableHttpClient() {
return HttpClientBuilder.create()
.setConnectionManager(cm)
.build();
}
}

View file

@ -3,10 +3,10 @@ package seaweedfs.client;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.Closeable; import java.io.Closeable;
@ -16,7 +16,7 @@ import java.security.SecureRandom;
public class SeaweedWrite { public class SeaweedWrite {
private static SecureRandom random = new SecureRandom(); private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry, public static void writeData(FilerProto.Entry.Builder entry,
final String replication, final String replication,
@ -63,7 +63,7 @@ public class SeaweedWrite {
public static void writeMeta(final FilerGrpcClient filerGrpcClient, public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) { final String parentDirectory, final FilerProto.Entry.Builder entry) {
synchronized (entry){ synchronized (entry) {
filerGrpcClient.getBlockingStub().createEntry( filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder() FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory) .setDirectory(parentDirectory)
@ -79,8 +79,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength, final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException { byte[] cipherKey) throws IOException {
HttpClient client = HttpClientBuilder.create().build();
InputStream inputStream = null; InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) { if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@ -103,8 +101,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream) .addBinaryBody("upload", inputStream)
.build()); .build());
CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
try { try {
HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue(); String etag = response.getLastHeader("ETag").getValue();
@ -114,10 +113,7 @@ public class SeaweedWrite {
return etag; return etag;
} finally { } finally {
if (client instanceof Closeable) { response.close();
Closeable t = (Closeable) client;
t.close();
}
} }
} }

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version> <seaweedfs.client.version>1.3.0</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version> <seaweedfs.client.version>1.3.0</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version> <hadoop.version>2.9.2</hadoop.version>
</properties> </properties>

View file

@ -127,7 +127,7 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version> <seaweedfs.client.version>1.3.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>
</project> </project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version> <seaweedfs.client.version>1.3.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version> <hadoop.version>3.1.1</hadoop.version>
</properties> </properties>