mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
java: add configurable volume access mode
This commit is contained in:
parent
8c3177d835
commit
694df89331
|
@ -74,7 +74,7 @@ public class FileChunkManifest {
|
||||||
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
|
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
|
||||||
if (chunkData == null) {
|
if (chunkData == null) {
|
||||||
LOG.debug("doFetchFullChunkData:{}", chunkView);
|
LOG.debug("doFetchFullChunkData:{}", chunkView);
|
||||||
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
|
chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations);
|
||||||
}
|
}
|
||||||
if (chunk.getIsChunkManifest()){
|
if (chunk.getIsChunkManifest()){
|
||||||
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
|
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
|
||||||
|
|
|
@ -15,6 +15,10 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class FilerGrpcClient {
|
public class FilerGrpcClient {
|
||||||
|
|
||||||
|
public final int VOLUME_SERVER_ACCESS_DIRECT = 0;
|
||||||
|
public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1;
|
||||||
|
public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
|
||||||
static SslContext sslContext;
|
static SslContext sslContext;
|
||||||
|
|
||||||
|
@ -34,6 +38,8 @@ public class FilerGrpcClient {
|
||||||
private boolean cipher = false;
|
private boolean cipher = false;
|
||||||
private String collection = "";
|
private String collection = "";
|
||||||
private String replication = "";
|
private String replication = "";
|
||||||
|
private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
|
||||||
|
private String filerAddress;
|
||||||
|
|
||||||
public FilerGrpcClient(String host, int grpcPort) {
|
public FilerGrpcClient(String host, int grpcPort) {
|
||||||
this(host, grpcPort, sslContext);
|
this(host, grpcPort, sslContext);
|
||||||
|
@ -49,6 +55,8 @@ public class FilerGrpcClient {
|
||||||
.negotiationType(NegotiationType.TLS)
|
.negotiationType(NegotiationType.TLS)
|
||||||
.sslContext(sslContext));
|
.sslContext(sslContext));
|
||||||
|
|
||||||
|
filerAddress = String.format("%s:%d", host, grpcPort-10000);
|
||||||
|
|
||||||
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
|
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
|
||||||
this.getBlockingStub().getFilerConfiguration(
|
this.getBlockingStub().getFilerConfiguration(
|
||||||
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
|
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
|
||||||
|
@ -58,7 +66,7 @@ public class FilerGrpcClient {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
|
private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
|
||||||
channel = channelBuilder.build();
|
channel = channelBuilder.build();
|
||||||
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
|
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
|
||||||
asyncStub = SeaweedFilerGrpc.newStub(channel);
|
asyncStub = SeaweedFilerGrpc.newStub(channel);
|
||||||
|
@ -93,4 +101,26 @@ public class FilerGrpcClient {
|
||||||
return futureStub;
|
return futureStub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAccessVolumeServerDirectly() {
|
||||||
|
this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
|
||||||
|
}
|
||||||
|
public boolean isAccessVolumeServerDirectly() {
|
||||||
|
return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT;
|
||||||
|
}
|
||||||
|
public void setAccessVolumeServerByPublicUrl() {
|
||||||
|
this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL;
|
||||||
|
}
|
||||||
|
public boolean isAccessVolumeServerByPublicUrl() {
|
||||||
|
return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL;
|
||||||
|
}
|
||||||
|
public void setAccessVolumeServerByFilerProxy() {
|
||||||
|
this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY;
|
||||||
|
}
|
||||||
|
public boolean isAccessVolumeServerByFilerProxy() {
|
||||||
|
return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY;
|
||||||
|
}
|
||||||
|
public String getFilerAddress() {
|
||||||
|
return this.filerAddress;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class SeaweedRead {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int len = readChunkView(startOffset, buf, chunkView, locations);
|
int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations);
|
||||||
|
|
||||||
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
|
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
|
||||||
|
|
||||||
|
@ -93,12 +93,12 @@ public class SeaweedRead {
|
||||||
return readCount;
|
return readCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
||||||
|
|
||||||
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
|
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
|
||||||
|
|
||||||
if (chunkData == null) {
|
if (chunkData == null) {
|
||||||
chunkData = doFetchFullChunkData(chunkView, locations);
|
chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations);
|
||||||
chunkCache.setChunk(chunkView.fileId, chunkData);
|
chunkCache.setChunk(chunkView.fileId, chunkData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,12 +110,18 @@ public class SeaweedRead {
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
|
||||||
|
|
||||||
byte[] data = null;
|
byte[] data = null;
|
||||||
IOException lastException = null;
|
IOException lastException = null;
|
||||||
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
|
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
|
||||||
for (FilerProto.Location location : locations.getLocationsList()) {
|
for (FilerProto.Location location : locations.getLocationsList()) {
|
||||||
|
String host = location.getUrl();
|
||||||
|
if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) {
|
||||||
|
host = location.getPublicUrl();
|
||||||
|
} else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) {
|
||||||
|
host = filerGrpcClient.getFilerAddress();
|
||||||
|
}
|
||||||
String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
|
String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
|
||||||
try {
|
try {
|
||||||
data = doFetchOneFullChunkData(chunkView, url);
|
data = doFetchOneFullChunkData(chunkView, url);
|
||||||
|
@ -145,7 +151,7 @@ public class SeaweedRead {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
|
private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
|
||||||
|
|
||||||
HttpGet request = new HttpGet(url);
|
HttpGet request = new HttpGet(url);
|
||||||
|
|
||||||
|
|
|
@ -51,9 +51,15 @@ public class SeaweedWrite {
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.build());
|
.build());
|
||||||
String fileId = response.getFileId();
|
String fileId = response.getFileId();
|
||||||
String url = response.getUrl();
|
|
||||||
String auth = response.getAuth();
|
String auth = response.getAuth();
|
||||||
String targetUrl = String.format("http://%s/%s", url, fileId);
|
|
||||||
|
String host = response.getUrl();
|
||||||
|
if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) {
|
||||||
|
host = response.getPublicUrl();
|
||||||
|
} else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) {
|
||||||
|
host = filerGrpcClient.getFilerAddress();
|
||||||
|
}
|
||||||
|
String targetUrl = String.format("http://%s/%s", host, fileId);
|
||||||
|
|
||||||
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
|
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
|
||||||
byte[] cipherKey = null;
|
byte[] cipherKey = null;
|
||||||
|
|
|
@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem {
|
||||||
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
|
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
|
||||||
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
|
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
|
||||||
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
|
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
|
||||||
|
public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volumeServerAccess";
|
||||||
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
|
||||||
|
|
|
@ -18,8 +18,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE;
|
import static seaweed.hdfs.SeaweedFileSystem.*;
|
||||||
import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE;
|
|
||||||
|
|
||||||
public class SeaweedFileSystemStore {
|
public class SeaweedFileSystemStore {
|
||||||
|
|
||||||
|
@ -34,6 +33,13 @@ public class SeaweedFileSystemStore {
|
||||||
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
|
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
|
||||||
filerClient = new FilerClient(filerGrpcClient);
|
filerClient = new FilerClient(filerGrpcClient);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
|
||||||
|
if (volumeServerAccessMode.equals("publicUrl")) {
|
||||||
|
filerGrpcClient.setAccessVolumeServerByPublicUrl();
|
||||||
|
} else if (volumeServerAccessMode.equals("filerProxy")) {
|
||||||
|
filerGrpcClient.setAccessVolumeServerByFilerProxy();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
Loading…
Reference in a new issue