SeaweedFileSystem add rename

This commit is contained in:
Chris Lu 2018-11-25 14:48:17 -08:00
parent 1cbd53c01c
commit 6ecefad692
2 changed files with 76 additions and 14 deletions

View file

@ -67,7 +67,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
return null;
}
public boolean rename(Path path, Path path1) throws IOException {
public boolean rename(Path src, Path dst) throws IOException {
Path parentFolder = src.getParent();
if (parentFolder == null) {
return false;
}
if (src.equals(dst)){
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);
String sourceFileName = src.getName();
Path adjustedDst = dst;
if (dstFileStatus != null) {
if (!dstFileStatus.isDirectory()) {
return false;
}
adjustedDst = new Path(dst, sourceFileName);
}
Path qualifiedSrcPath = qualify(src);
Path qualifiedDstPath = qualify(adjustedDst);
seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
return false;
}

View file

@ -55,13 +55,9 @@ public class SeaweedFileSystemStore {
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
FilerProto.ListEntriesResponse response =
filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path.toUri().getPath())
.setLimit(100000)
.build());
List<FilerProto.Entry> entries = lookupEntries(path);
for (FilerProto.Entry entry : response.getEntriesList()) {
for (FilerProto.Entry entry : entries) {
FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry);
@ -70,16 +66,17 @@ public class SeaweedFileSystemStore {
return fileStatuses.toArray(new FileStatus[0]);
}
private List<FilerProto.Entry> lookupEntries(Path path) {
return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path.toUri().getPath())
.setLimit(100000)
.build()).getEntriesList();
}
public FileStatus getFileStatus(final Path path) {
LOG.debug("getFileStatus path: {}", path);
FilerProto.LookupDirectoryEntryResponse response =
filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setName(path.getName())
.build());
FilerProto.Entry entry = response.getEntry();
FilerProto.Entry entry = lookupEntry(path);
FileStatus fileStatus = getFileStatus(path, entry);
return fileStatus;
}
@ -117,4 +114,45 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
private FilerProto.Entry lookupEntry(Path path) {
FilerProto.LookupDirectoryEntryResponse response =
filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(path.getParent().toUri().getPath())
.setName(path.getName())
.build());
return response.getEntry();
}
public void rename(Path source, Path destination) {
FilerProto.Entry entry = lookupEntry(source);
moveEntry(source.getParent(), entry, destination);
}
private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
if (entry.getIsDirectory()) {
Path entryPath = new Path(oldParent, entry.getName());
List<FilerProto.Entry> entries = lookupEntries(entryPath);
for (FilerProto.Entry ent : entries) {
boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
if (!isSucess) {
return false;
}
}
}
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(destination.getParent().toUri().getPath())
.build());
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(oldParent.toUri().getPath())
.setName(entry.getName())
.setIsDirectory(entry.getIsDirectory())
.setIsDeleteData(false)
.build());
return true;
}
}