From 5b5018265857bdc83ae7673042223dc3f7020bbd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Dec 2018 01:37:29 -0800 Subject: [PATCH] put file is working --- .../java/seaweed/hdfs/SeaweedFileSystem.java | 53 ++++++++++++++----- .../seaweed/hdfs/SeaweedFileSystemStore.java | 48 ++++++++++++----- .../seaweed/hdfs/SeaweedOutputStream.java | 9 ++++ 3 files changed, 84 insertions(+), 26 deletions(-) diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 752758a4e..79c2641ec 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -9,8 +9,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.net.URI; @@ -21,6 +22,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); + private URI uri; private Path workingDirectory = new Path("/"); private SeaweedFileSystemStore seaweedFileSystemStore; @@ -57,6 +60,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } public FSDataInputStream open(Path path, int bufferSize) throws IOException { + + LOG.debug("open path: {} bufferSize:{}", path, bufferSize); + path = qualify(path); return null; @@ -64,6 +70,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { + + LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); + path = qualify(path); try { @@ -76,6 +85,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { + + LOG.debug("append path: {} bufferSize:{}", path, bufferSize); + path = qualify(path); try { OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, ""); @@ -85,7 +97,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } } - public boolean rename(Path src, Path dst) throws IOException { + public boolean rename(Path src, Path dst) { + + LOG.debug("rename path: {} => {}", src, dst); if (src.isRoot()) { return false; @@ -110,14 +124,17 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { Path qualifiedDstPath = qualify(adjustedDst); seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); - return false; + return true; } - public boolean delete(Path path, boolean recursive) throws IOException { + public boolean delete(Path path, boolean recursive) { + + LOG.debug("delete path: {} recursive:{}", path, recursive); path = qualify(path); FileStatus fileStatus = getFileStatus(path); + if (fileStatus == null) { return true; } @@ -126,7 +143,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } - public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { + public FileStatus[] listStatus(Path path) throws IOException { + + LOG.debug("listStatus path: {}", path); path = qualify(path); @@ -147,25 +166,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { + LOG.debug("mkdirs path: {}", path); + path = qualify(path); - try { - FileStatus fileStatus = getFileStatus(path); + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus == null) { - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + path); - } - } catch (FileNotFoundException e) { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, fsPermission == null ? FsPermission.getDirDefault() : fsPermission, FsPermission.getUMask(getConf())); + + } + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); } } - public FileStatus getFileStatus(Path path) throws IOException { + public FileStatus getFileStatus(Path path) { + + LOG.debug("getFileStatus path: {}", path); path = qualify(path); diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index d58d07ea8..7cc12424b 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -27,7 +26,7 @@ public class SeaweedFileSystemStore { filerGrpcClient = new FilerGrpcClient(host, grpcPort); } - static String getParentDirectory(Path path) { + public static String getParentDirectory(Path path) { return path.isRoot() ? "/" : path.getParent().toUri().getPath(); } @@ -76,7 +75,7 @@ public class SeaweedFileSystemStore { for (FilerProto.Entry entry : entries) { - FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry); + FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry); fileStatuses.add(fileStatus); } @@ -84,17 +83,23 @@ public class SeaweedFileSystemStore { } private List lookupEntries(Path path) { + + LOG.debug("listEntries path: {}", path); + return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path.toUri().getPath()) .setLimit(100000) .build()).getEntriesList(); } - public FileStatus getFileStatus(final Path path) throws FileNotFoundException { - LOG.debug("getFileStatus path: {}", path); + public FileStatus getFileStatus(final Path path) { + LOG.debug("doGetFileStatus path: {}", path); FilerProto.Entry entry = lookupEntry(path); - FileStatus fileStatus = getFileStatus(path, entry); + if (entry == null) { + return null; + } + FileStatus fileStatus = doGetFileStatus(path, entry); return fileStatus; } @@ -119,7 +124,7 @@ public class SeaweedFileSystemStore { return true; } - private FileStatus getFileStatus(Path path, FilerProto.Entry entry) { + private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { FilerProto.FuseAttributes attributes = entry.getAttributes(); long length = attributes.getFileSize(); boolean isDir = entry.getIsDirectory(); @@ -134,7 +139,7 @@ public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } - private FilerProto.Entry lookupEntry(Path path) throws FileNotFoundException { + private FilerProto.Entry lookupEntry(Path path) { String directory = getParentDirectory(path); @@ -146,19 +151,31 @@ public class SeaweedFileSystemStore { .build()); return response.getEntry(); } catch (io.grpc.StatusRuntimeException e) { - throw new FileNotFoundException(e.getMessage()); + return null; } } - public void rename(Path source, Path destination) throws FileNotFoundException { + public void rename(Path source, Path destination) { + + LOG.debug("rename source: {} destination:{}", source, destination); + if (source.isRoot()) { return; } + LOG.warn("rename lookupEntry source: {}", source); FilerProto.Entry entry = lookupEntry(source); + if (entry == null) { + LOG.warn("rename non-existing source: {}", source); + return; + } + LOG.warn("rename moveEntry source: {}", source); moveEntry(source.getParent(), entry, destination); } private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) { + + LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination); + if (entry.getIsDirectory()) { Path entryPath = new Path(oldParent, entry.getName()); List entries = lookupEntries(entryPath); @@ -170,8 +187,10 @@ public class SeaweedFileSystemStore { } } + FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName()); filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() .setDirectory(getParentDirectory(destination)) + .setEntry(newEntry) .build()); filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() @@ -197,23 +216,28 @@ public class SeaweedFileSystemStore { permission.toString()); UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); + long now = System.currentTimeMillis() / 1000L; - FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder(); + FilerProto.Entry.Builder entry = null; long writePosition = 0; if (!overwrite) { FilerProto.Entry existingEntry = lookupEntry(path); if (existingEntry != null) { entry.mergeFrom(existingEntry); + entry.getAttributesBuilder().setMtime(now); } writePosition = existingEntry.getAttributes().getFileSize(); replication = existingEntry.getAttributes().getReplication(); } if (entry == null) { entry = FilerProto.Entry.newBuilder() + .setName(path.getName()) + .setIsDirectory(false) .setAttributes(FilerProto.FuseAttributes.newBuilder() .setFileMode(permissionToMode(permission, false)) .setReplication(replication) - .setCrtime(System.currentTimeMillis() / 1000L) + .setCrtime(now) + .setMtime(now) .setUserName(userGroupInformation.getUserName()) .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 5da77c3a3..19894956a 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -7,6 +7,8 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; @@ -24,6 +26,8 @@ import java.util.concurrent.TimeUnit; public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); + private final FilerGrpcClient filerGrpcClient; private final Path path; private final int bufferSize; @@ -71,6 +75,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { + + LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); + try { SeaweedWrite.writeMeta(filerGrpcClient, path, entry); } catch (Exception ex) { @@ -186,6 +193,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea return; } + LOG.debug("close path: {}", path); try { flushInternal(); threadExecutor.shutdown(); @@ -293,6 +301,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea throw lastError; } } + LOG.debug("flushWrittenBytesToService: {} position:{}", path, position); flushWrittenBytesToServiceInternal(position); }