HCFS can ls, mkdir

This commit is contained in:
Chris Lu 2018-12-02 22:44:49 -08:00
parent f0677c5af1
commit d3be8e022f
3 changed files with 46 additions and 19 deletions

View file

@ -17,7 +17,7 @@ import java.net.URI;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8333;
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
@ -87,10 +87,10 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public boolean rename(Path src, Path dst) throws IOException {
Path parentFolder = src.getParent();
if (parentFolder == null) {
if (src.isRoot()) {
return false;
}
if (src.equals(dst)) {
return true;
}

View file

@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@ -26,6 +27,19 @@ public class SeaweedFileSystemStore {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
}
static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath();
}
static int permissionToMode(FsPermission permission, boolean isDirectory) {
int p = permission.toShort();
if (isDirectory) {
p = p | 1 << 31;
}
System.out.println(permission + " = " + p);
return p;
}
public boolean createDirectory(final Path path, UserGroupInformation currentUser,
final FsPermission permission, final FsPermission umask) {
@ -37,14 +51,14 @@ public class SeaweedFileSystemStore {
long now = System.currentTimeMillis() / 1000L;
FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setDirectory(getParentDirectory(path))
.setEntry(FilerProto.Entry.newBuilder()
.setName(path.getName())
.setIsDirectory(true)
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setMtime(now)
.setCrtime(now)
.setFileMode(permission.toShort())
.setFileMode(permissionToMode(permission, true))
.setUserName(currentUser.getUserName())
.addAllGroupName(Arrays.asList(currentUser.getGroupNames())))
);
@ -76,7 +90,7 @@ public class SeaweedFileSystemStore {
.build()).getEntriesList();
}
public FileStatus getFileStatus(final Path path) {
public FileStatus getFileStatus(final Path path) throws FileNotFoundException {
LOG.debug("getFileStatus path: {}", path);
FilerProto.Entry entry = lookupEntry(path);
@ -90,9 +104,13 @@ public class SeaweedFileSystemStore {
String.valueOf(isDirectroy),
String.valueOf(recursive));
if (path.isRoot()) {
return true;
}
FilerProto.DeleteEntryResponse response =
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setDirectory(getParentDirectory(path))
.setName(path.getName())
.setIsDirectory(isDirectroy)
.setIsDeleteData(true)
@ -101,7 +119,6 @@ public class SeaweedFileSystemStore {
return true;
}
private FileStatus getFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = attributes.getFileSize();
@ -117,18 +134,26 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
private FilerProto.Entry lookupEntry(Path path) throws FileNotFoundException {
private FilerProto.Entry lookupEntry(Path path) {
FilerProto.LookupDirectoryEntryResponse response =
filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setName(path.getName())
.build());
String directory = getParentDirectory(path);
return response.getEntry();
try {
FilerProto.LookupDirectoryEntryResponse response =
filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory)
.setName(path.getName())
.build());
return response.getEntry();
} catch (io.grpc.StatusRuntimeException e) {
throw new FileNotFoundException(e.getMessage());
}
}
public void rename(Path source, Path destination) {
public void rename(Path source, Path destination) throws FileNotFoundException {
if (source.isRoot()) {
return;
}
FilerProto.Entry entry = lookupEntry(source);
moveEntry(source.getParent(), entry, destination);
}
@ -146,7 +171,7 @@ public class SeaweedFileSystemStore {
}
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(destination.getParent().toUri().getPath())
.setDirectory(getParentDirectory(destination))
.build());
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
@ -186,7 +211,7 @@ public class SeaweedFileSystemStore {
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(permission.toOctal())
.setFileMode(permissionToMode(permission, false))
.setReplication(replication)
.setCrtime(System.currentTimeMillis() / 1000L)
.setUserName(userGroupInformation.getUserName())

View file

@ -14,6 +14,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
public class SeaweedWrite {
public static void writeData(FilerProto.Entry.Builder entry,
@ -50,7 +52,7 @@ public class SeaweedWrite {
final Path path, final FilerProto.Entry.Builder entry) {
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setDirectory(getParentDirectory(path))
.setEntry(entry)
.build()
);