put file is working

This commit is contained in:
Chris Lu 2018-12-03 01:37:29 -08:00
parent 7ace0efd65
commit 5b50182658
3 changed files with 84 additions and 26 deletions

View file

@ -9,8 +9,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
@ -21,6 +22,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private URI uri; private URI uri;
private Path workingDirectory = new Path("/"); private Path workingDirectory = new Path("/");
private SeaweedFileSystemStore seaweedFileSystemStore; private SeaweedFileSystemStore seaweedFileSystemStore;
@ -57,6 +60,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
} }
public FSDataInputStream open(Path path, int bufferSize) throws IOException { public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
path = qualify(path); path = qualify(path);
return null; return null;
@ -64,6 +70,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException { final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path); path = qualify(path);
try { try {
@ -76,6 +85,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
} }
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
path = qualify(path); path = qualify(path);
try { try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, ""); OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
@ -85,7 +97,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
} }
} }
public boolean rename(Path src, Path dst) throws IOException { public boolean rename(Path src, Path dst) {
LOG.debug("rename path: {} => {}", src, dst);
if (src.isRoot()) { if (src.isRoot()) {
return false; return false;
@ -110,14 +124,17 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
Path qualifiedDstPath = qualify(adjustedDst); Path qualifiedDstPath = qualify(adjustedDst);
seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
return false; return true;
} }
public boolean delete(Path path, boolean recursive) throws IOException { public boolean delete(Path path, boolean recursive) {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path); path = qualify(path);
FileStatus fileStatus = getFileStatus(path); FileStatus fileStatus = getFileStatus(path);
if (fileStatus == null) { if (fileStatus == null) {
return true; return true;
} }
@ -126,7 +143,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
} }
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { public FileStatus[] listStatus(Path path) throws IOException {
LOG.debug("listStatus path: {}", path);
path = qualify(path); path = qualify(path);
@ -147,25 +166,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
LOG.debug("mkdirs path: {}", path);
path = qualify(path); path = qualify(path);
try {
FileStatus fileStatus = getFileStatus(path); FileStatus fileStatus = getFileStatus(path);
if (fileStatus == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
FsPermission.getUMask(getConf()));
}
if (fileStatus.isDirectory()) { if (fileStatus.isDirectory()) {
return true; return true;
} else { } else {
throw new FileAlreadyExistsException("Path is a file: " + path); throw new FileAlreadyExistsException("Path is a file: " + path);
} }
} catch (FileNotFoundException e) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
FsPermission.getUMask(getConf()));
}
} }
public FileStatus getFileStatus(Path path) throws IOException { public FileStatus getFileStatus(Path path) {
LOG.debug("getFileStatus path: {}", path);
path = qualify(path); path = qualify(path);

View file

@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
@ -27,7 +26,7 @@ public class SeaweedFileSystemStore {
filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerGrpcClient = new FilerGrpcClient(host, grpcPort);
} }
static String getParentDirectory(Path path) { public static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath(); return path.isRoot() ? "/" : path.getParent().toUri().getPath();
} }
@ -76,7 +75,7 @@ public class SeaweedFileSystemStore {
for (FilerProto.Entry entry : entries) { for (FilerProto.Entry entry : entries) {
FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry); FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
fileStatuses.add(fileStatus); fileStatuses.add(fileStatus);
} }
@ -84,17 +83,23 @@ public class SeaweedFileSystemStore {
} }
private List<FilerProto.Entry> lookupEntries(Path path) { private List<FilerProto.Entry> lookupEntries(Path path) {
LOG.debug("listEntries path: {}", path);
return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path.toUri().getPath()) .setDirectory(path.toUri().getPath())
.setLimit(100000) .setLimit(100000)
.build()).getEntriesList(); .build()).getEntriesList();
} }
public FileStatus getFileStatus(final Path path) throws FileNotFoundException { public FileStatus getFileStatus(final Path path) {
LOG.debug("getFileStatus path: {}", path); LOG.debug("doGetFileStatus path: {}", path);
FilerProto.Entry entry = lookupEntry(path); FilerProto.Entry entry = lookupEntry(path);
FileStatus fileStatus = getFileStatus(path, entry); if (entry == null) {
return null;
}
FileStatus fileStatus = doGetFileStatus(path, entry);
return fileStatus; return fileStatus;
} }
@ -119,7 +124,7 @@ public class SeaweedFileSystemStore {
return true; return true;
} }
private FileStatus getFileStatus(Path path, FilerProto.Entry entry) { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes(); FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = attributes.getFileSize(); long length = attributes.getFileSize();
boolean isDir = entry.getIsDirectory(); boolean isDir = entry.getIsDirectory();
@ -134,7 +139,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path); modification_time, access_time, permission, owner, group, null, path);
} }
private FilerProto.Entry lookupEntry(Path path) throws FileNotFoundException { private FilerProto.Entry lookupEntry(Path path) {
String directory = getParentDirectory(path); String directory = getParentDirectory(path);
@ -146,19 +151,31 @@ public class SeaweedFileSystemStore {
.build()); .build());
return response.getEntry(); return response.getEntry();
} catch (io.grpc.StatusRuntimeException e) { } catch (io.grpc.StatusRuntimeException e) {
throw new FileNotFoundException(e.getMessage()); return null;
} }
} }
public void rename(Path source, Path destination) throws FileNotFoundException { public void rename(Path source, Path destination) {
LOG.debug("rename source: {} destination:{}", source, destination);
if (source.isRoot()) { if (source.isRoot()) {
return; return;
} }
LOG.warn("rename lookupEntry source: {}", source);
FilerProto.Entry entry = lookupEntry(source); FilerProto.Entry entry = lookupEntry(source);
if (entry == null) {
LOG.warn("rename non-existing source: {}", source);
return;
}
LOG.warn("rename moveEntry source: {}", source);
moveEntry(source.getParent(), entry, destination); moveEntry(source.getParent(), entry, destination);
} }
private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) { private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
if (entry.getIsDirectory()) { if (entry.getIsDirectory()) {
Path entryPath = new Path(oldParent, entry.getName()); Path entryPath = new Path(oldParent, entry.getName());
List<FilerProto.Entry> entries = lookupEntries(entryPath); List<FilerProto.Entry> entries = lookupEntries(entryPath);
@ -170,8 +187,10 @@ public class SeaweedFileSystemStore {
} }
} }
FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(destination)) .setDirectory(getParentDirectory(destination))
.setEntry(newEntry)
.build()); .build());
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
@ -197,23 +216,28 @@ public class SeaweedFileSystemStore {
permission.toString()); permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
long now = System.currentTimeMillis() / 1000L;
FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder(); FilerProto.Entry.Builder entry = null;
long writePosition = 0; long writePosition = 0;
if (!overwrite) { if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path); FilerProto.Entry existingEntry = lookupEntry(path);
if (existingEntry != null) { if (existingEntry != null) {
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
} }
writePosition = existingEntry.getAttributes().getFileSize(); writePosition = existingEntry.getAttributes().getFileSize();
replication = existingEntry.getAttributes().getReplication(); replication = existingEntry.getAttributes().getReplication();
} }
if (entry == null) { if (entry == null) {
entry = FilerProto.Entry.newBuilder() entry = FilerProto.Entry.newBuilder()
.setName(path.getName())
.setIsDirectory(false)
.setAttributes(FilerProto.FuseAttributes.newBuilder() .setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(permissionToMode(permission, false)) .setFileMode(permissionToMode(permission, false))
.setReplication(replication) .setReplication(replication)
.setCrtime(System.currentTimeMillis() / 1000L) .setCrtime(now)
.setMtime(now)
.setUserName(userGroupInformation.getUserName()) .setUserName(userGroupInformation.getUserName())
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
); );

View file

@ -7,6 +7,8 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto; import seaweedfs.client.FilerProto;
@ -24,6 +26,8 @@ import java.util.concurrent.TimeUnit;
public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities { public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
private final FilerGrpcClient filerGrpcClient; private final FilerGrpcClient filerGrpcClient;
private final Path path; private final Path path;
private final int bufferSize; private final int bufferSize;
@ -71,6 +75,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
} }
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
try { try {
SeaweedWrite.writeMeta(filerGrpcClient, path, entry); SeaweedWrite.writeMeta(filerGrpcClient, path, entry);
} catch (Exception ex) { } catch (Exception ex) {
@ -186,6 +193,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
return; return;
} }
LOG.debug("close path: {}", path);
try { try {
flushInternal(); flushInternal();
threadExecutor.shutdown(); threadExecutor.shutdown();
@ -293,6 +301,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw lastError; throw lastError;
} }
} }
LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
flushWrittenBytesToServiceInternal(position); flushWrittenBytesToServiceInternal(position);
} }