From fd509c384403ed73e2fbb850c62a2176fd8bc820 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 3 Sep 2019 00:50:28 -0700 Subject: [PATCH] HCFS: working with HBase --- .../java/seaweed/hdfs/SeaweedFileSystem.java | 37 ++++++++++++++---- .../seaweed/hdfs/SeaweedFileSystemStore.java | 9 +++-- .../java/seaweed/hdfs/SeaweedFileSystem.java | 39 ++++++++++++++----- .../seaweed/hdfs/SeaweedFileSystemStore.java | 9 +++-- 4 files changed, 69 insertions(+), 25 deletions(-) diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 7cf76e5e8..d471d8440 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -1,14 +1,7 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; @@ -87,6 +80,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); return new FSDataInputStream(inputStream); } catch (Exception ex) { + LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; } } @@ -104,10 +98,36 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { + LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); return null; } } + /** + * {@inheritDoc} + * @throws FileNotFoundException if the parent directory is not present -or + * is not a directory. + */ + @Override + public FSDataOutputStream createNonRecursive(Path path, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + Path parent = path.getParent(); + if (parent != null) { + // expect this to raise an exception if there is no parent + if (!getFileStatus(parent).isDirectory()) { + throw new FileAlreadyExistsException("Not a directory: " + parent); + } + } + return create(path, permission, + flags.contains(CreateFlag.OVERWRITE), bufferSize, + replication, blockSize, progress); + } + @Override public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { @@ -118,6 +138,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { + LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); return null; } } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 2ddcd41e8..826b74560 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -137,7 +137,7 @@ public class SeaweedFileSystemStore { if (source.isRoot()) { return; } - LOG.warn("rename lookupEntry source: {}", source); + LOG.warn("rename source: {} destination:{}", source, destination); FilerProto.Entry entry = lookupEntry(source); if (entry == null) { LOG.warn("rename non-existing source: {}", source); @@ -171,10 +171,10 @@ public class SeaweedFileSystemStore { entry = FilerProto.Entry.newBuilder(); entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); + LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); + writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + replication = existingEntry.getAttributes().getReplication(); } - LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); - replication = existingEntry.getAttributes().getReplication(); } if (entry == null) { entry = FilerProto.Entry.newBuilder() @@ -266,4 +266,5 @@ public class SeaweedFileSystemStore { filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); } + } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 7cf76e5e8..c12da8261 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -1,14 +1,7 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; @@ -29,7 +22,7 @@ import java.util.Map; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; -public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { +public class SeaweedFileSystem extends FileSystem { public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; @@ -87,6 +80,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); return new FSDataInputStream(inputStream); } catch (Exception ex) { + LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; } } @@ -104,10 +98,36 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { + LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); return null; } } + /** + * {@inheritDoc} + * @throws FileNotFoundException if the parent directory is not present -or + * is not a directory. + */ + @Override + public FSDataOutputStream createNonRecursive(Path path, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + Path parent = path.getParent(); + if (parent != null) { + // expect this to raise an exception if there is no parent + if (!getFileStatus(parent).isDirectory()) { + throw new FileAlreadyExistsException("Not a directory: " + parent); + } + } + return create(path, permission, + flags.contains(CreateFlag.OVERWRITE), bufferSize, + replication, blockSize, progress); + } + @Override public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { @@ -118,6 +138,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { + LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); return null; } } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 2ddcd41e8..826b74560 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -137,7 +137,7 @@ public class SeaweedFileSystemStore { if (source.isRoot()) { return; } - LOG.warn("rename lookupEntry source: {}", source); + LOG.warn("rename source: {} destination:{}", source, destination); FilerProto.Entry entry = lookupEntry(source); if (entry == null) { LOG.warn("rename non-existing source: {}", source); @@ -171,10 +171,10 @@ public class SeaweedFileSystemStore { entry = FilerProto.Entry.newBuilder(); entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); + LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); + writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + replication = existingEntry.getAttributes().getReplication(); } - LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); - replication = existingEntry.getAttributes().getReplication(); } if (entry == null) { entry = FilerProto.Entry.newBuilder() @@ -266,4 +266,5 @@ public class SeaweedFileSystemStore { filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); } + }