HCFS: working with HBase

This commit is contained in:
Chris Lu 2019-09-03 00:50:28 -07:00
parent 60c9215a00
commit fd509c3844
4 changed files with 69 additions and 25 deletions

View file

@ -1,14 +1,7 @@
package seaweed.hdfs; package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -87,6 +80,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream); return new FSDataInputStream(inputStream);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
} }
} }
@ -104,10 +98,36 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
return null; return null;
} }
} }
/**
* {@inheritDoc}
* @throws FileNotFoundException if the parent directory is not present -or
* is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
replication, blockSize, progress);
}
@Override @Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
@ -118,6 +138,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
} }
} }

View file

@ -137,7 +137,7 @@ public class SeaweedFileSystemStore {
if (source.isRoot()) { if (source.isRoot()) {
return; return;
} }
LOG.warn("rename lookupEntry source: {}", source); LOG.warn("rename source: {} destination:{}", source, destination);
FilerProto.Entry entry = lookupEntry(source); FilerProto.Entry entry = lookupEntry(source);
if (entry == null) { if (entry == null) {
LOG.warn("rename non-existing source: {}", source); LOG.warn("rename non-existing source: {}", source);
@ -171,10 +171,10 @@ public class SeaweedFileSystemStore {
entry = FilerProto.Entry.newBuilder(); entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now); entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
} }
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
} }
if (entry == null) { if (entry == null) {
entry = FilerProto.Entry.newBuilder() entry = FilerProto.Entry.newBuilder()
@ -266,4 +266,5 @@ public class SeaweedFileSystemStore {
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
} }
} }

View file

@ -1,14 +1,7 @@
package seaweed.hdfs; package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -29,7 +22,7 @@ import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
@ -87,6 +80,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream); return new FSDataInputStream(inputStream);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
} }
} }
@ -104,10 +98,36 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
return null; return null;
} }
} }
/**
* {@inheritDoc}
* @throws FileNotFoundException if the parent directory is not present -or
* is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
replication, blockSize, progress);
}
@Override @Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
@ -118,6 +138,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;
} }
} }

View file

@ -137,7 +137,7 @@ public class SeaweedFileSystemStore {
if (source.isRoot()) { if (source.isRoot()) {
return; return;
} }
LOG.warn("rename lookupEntry source: {}", source); LOG.warn("rename source: {} destination:{}", source, destination);
FilerProto.Entry entry = lookupEntry(source); FilerProto.Entry entry = lookupEntry(source);
if (entry == null) { if (entry == null) {
LOG.warn("rename non-existing source: {}", source); LOG.warn("rename non-existing source: {}", source);
@ -171,10 +171,10 @@ public class SeaweedFileSystemStore {
entry = FilerProto.Entry.newBuilder(); entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now); entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
} }
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
} }
if (entry == null) { if (entry == null) {
entry = FilerProto.Entry.newBuilder() entry = FilerProto.Entry.newBuilder()
@ -266,4 +266,5 @@ public class SeaweedFileSystemStore {
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
} }
} }