mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add HCFS over ftp
This commit is contained in:
parent
8693cdacae
commit
6b8892c5ac
120
other/java/hdfs-over-ftp/pom.xml
Normal file
120
other/java/hdfs-over-ftp/pom.xml
Normal file
|
@ -0,0 +1,120 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>hdfs-over-ftp</groupId>
|
||||
<artifactId>hdfs-over-ftp</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.4.3</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.springfox</groupId>
|
||||
<artifactId>springfox-swagger2</artifactId>
|
||||
<version>2.9.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.springfox</groupId>
|
||||
<artifactId>springfox-swagger-ui</artifactId>
|
||||
<version>2.9.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<version>3.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<version>3.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ftpserver</groupId>
|
||||
<artifactId>ftpserver-core</artifactId>
|
||||
<version>1.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.chrislusf</groupId>
|
||||
<artifactId>seaweedfs-hadoop3-client</artifactId>
|
||||
<version>1.6.2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<encoding>UTF-8</encoding>
|
||||
<compilerArguments>
|
||||
<verbose />
|
||||
<bootclasspath>${java.home}/lib/rt.jar</bootclasspath>
|
||||
</compilerArguments>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>2.6</version>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>org.apache.hadoop.seaweed.ftp.ApplicationServer</mainClass>
|
||||
<addClasspath>true</addClasspath>
|
||||
<classpathPrefix>lib/</classpathPrefix>
|
||||
</manifest>
|
||||
<manifestEntries>
|
||||
<Class-Path>./</Class-Path>
|
||||
</manifestEntries>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<appendAssemblyId>false</appendAssemblyId>
|
||||
<descriptors>
|
||||
<descriptor>src/main/resources/assembly.xml</descriptor>
|
||||
</descriptors>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,14 @@
|
|||
package org.apache.hadoop.seaweed.ftp;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
|
||||
@SpringBootApplication
|
||||
public class ApplicationServer {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ApplicationServer.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package org.apache.hadoop.seaweed.ftp.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import springfox.documentation.builders.ApiInfoBuilder;
|
||||
import springfox.documentation.builders.PathSelectors;
|
||||
import springfox.documentation.builders.RequestHandlerSelectors;
|
||||
import springfox.documentation.spi.DocumentationType;
|
||||
import springfox.documentation.spring.web.plugins.Docket;
|
||||
import springfox.documentation.swagger2.annotations.EnableSwagger2;
|
||||
|
||||
@Configuration
|
||||
@EnableSwagger2
|
||||
public class SwaggerConfig {
|
||||
@Bean
|
||||
public Docket createRestApi() {
|
||||
return new Docket(DocumentationType.SWAGGER_2)
|
||||
.pathMapping("/")
|
||||
.select()
|
||||
.apis(RequestHandlerSelectors.basePackage("org.apache.hadoop.seaweed.ftp"))
|
||||
.paths(PathSelectors.any())
|
||||
.build().apiInfo(new ApiInfoBuilder()
|
||||
.title("FTP API Doc")
|
||||
.version("1.0")
|
||||
.build());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package org.apache.hadoop.seaweed.ftp.controller;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.apache.hadoop.seaweed.ftp.service.HFtpService;
|
||||
import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PutMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/manager")
|
||||
@Api(tags = "FTP操作管理")
|
||||
public class FtpManagerController {
|
||||
|
||||
private static Logger log = Logger.getLogger(FtpManagerController.class);
|
||||
|
||||
@Autowired
|
||||
private HFtpService hdfsOverFtpServer;
|
||||
|
||||
@GetMapping("/status")
|
||||
@ApiOperation("查看FTP服务状态")
|
||||
public Result status() {
|
||||
Map map = new HashMap<>();
|
||||
try {
|
||||
boolean status = hdfsOverFtpServer.statusServer();
|
||||
map.put("is_running", status);
|
||||
return new Result(true, map, "FTP 服务状态获取成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
map.put("is_running", false);
|
||||
return new Result(true, map, "FTP 服务状态获取成功");
|
||||
}
|
||||
}
|
||||
|
||||
@PutMapping("/start")
|
||||
@ApiOperation("启动FTP服务")
|
||||
public Result start() {
|
||||
try {
|
||||
boolean status = hdfsOverFtpServer.statusServer();
|
||||
if(!status) {
|
||||
hdfsOverFtpServer.startServer();
|
||||
}
|
||||
return new Result(true, "FTP 服务启动成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "FTP 服务启动失败");
|
||||
}
|
||||
}
|
||||
|
||||
@PutMapping("/stop")
|
||||
@ApiOperation("停止FTP服务")
|
||||
public Result stop() {
|
||||
try {
|
||||
boolean status = hdfsOverFtpServer.statusServer();
|
||||
if(status) {
|
||||
hdfsOverFtpServer.stopServer();
|
||||
}
|
||||
return new Result(true, "FTP 服务停止成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "FTP 服务停止失败");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
package org.apache.hadoop.seaweed.ftp.controller;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.apache.ftpserver.ftplet.User;
|
||||
import org.apache.ftpserver.usermanager.Md5PasswordEncryptor;
|
||||
import org.apache.ftpserver.usermanager.UserFactory;
|
||||
import org.apache.hadoop.seaweed.ftp.controller.vo.FtpUser;
|
||||
import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
|
||||
import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/user")
|
||||
@Api(tags = "FTP用户管理")
|
||||
public class UserController {
|
||||
|
||||
private static Logger log = Logger.getLogger(UserController.class);
|
||||
|
||||
/***
|
||||
* {
|
||||
* "name": "test",
|
||||
* "password": "test",
|
||||
* "homeDirectory": "/buckets/test/"
|
||||
* }
|
||||
* @param ftpUser
|
||||
* @return
|
||||
*/
|
||||
@PostMapping("/add")
|
||||
@ApiOperation("新增/编辑用户")
|
||||
public Result add(@RequestBody FtpUser ftpUser) {
|
||||
try {
|
||||
HdfsUserManager userManagerFactory = new HdfsUserManager();
|
||||
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
|
||||
userManagerFactory.setPasswordEncryptor(new Md5PasswordEncryptor());
|
||||
|
||||
UserFactory userFactory = new UserFactory();
|
||||
userFactory.setHomeDirectory(ftpUser.getHomeDirectory());
|
||||
userFactory.setName(ftpUser.getName());
|
||||
userFactory.setPassword(ftpUser.getPassword());
|
||||
userFactory.setEnabled(ftpUser.isEnabled());
|
||||
userFactory.setMaxIdleTime(ftpUser.getMaxIdleTime());
|
||||
|
||||
User user = userFactory.createUser();
|
||||
userManagerFactory.save(user, ftpUser.isRenamePush());
|
||||
return new Result(true, "新建用户成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "新建用户失败");
|
||||
}
|
||||
}
|
||||
|
||||
@DeleteMapping("/delete/{user}")
|
||||
@ApiOperation("删除用户")
|
||||
public Result delete(@PathVariable(value = "user") String user) {
|
||||
try {
|
||||
HdfsUserManager userManagerFactory = new HdfsUserManager();
|
||||
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
|
||||
userManagerFactory.delete(user);
|
||||
return new Result(true, "删除用户成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "删除用户失败");
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/show/{userName}")
|
||||
@ApiOperation("查看用户")
|
||||
public Result show(@PathVariable(value = "userName") String userName) {
|
||||
try {
|
||||
HdfsUserManager userManagerFactory = new HdfsUserManager();
|
||||
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
|
||||
User user = userManagerFactory.getUserByName(userName);
|
||||
FtpUser ftpUser = new FtpUser(user.getHomeDirectory(), user.getPassword(), user.getEnabled(), user.getName(), user.getMaxIdleTime(), HdfsUserManager.getUserRenamePush(userName));
|
||||
return new Result(true, ftpUser, "获取用户信息成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "获取用户信息失败");
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/list")
|
||||
@ApiOperation("列举用户")
|
||||
public Result list() {
|
||||
try {
|
||||
HdfsUserManager userManagerFactory = new HdfsUserManager();
|
||||
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
|
||||
String[] allUserNames = userManagerFactory.getAllUserNames();
|
||||
return new Result(true, allUserNames, "列举用户成功");
|
||||
}catch (Exception e) {
|
||||
log.error(e);
|
||||
return new Result(false, "列举用户失败");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package org.apache.hadoop.seaweed.ftp.controller.vo;
|
||||
|
||||
public class FtpUser {
|
||||
|
||||
private String homeDirectory;
|
||||
private String password;
|
||||
private boolean enabled;
|
||||
private String name;
|
||||
private int maxIdleTime;
|
||||
private boolean renamePush;
|
||||
|
||||
public FtpUser() {
|
||||
}
|
||||
|
||||
public FtpUser(String homeDirectory, String password, boolean enabled, String name, int maxIdleTime, boolean renamePush) {
|
||||
this.homeDirectory = homeDirectory;
|
||||
this.password = password;
|
||||
this.enabled = enabled;
|
||||
this.name = name;
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
this.renamePush = renamePush;
|
||||
}
|
||||
|
||||
public String getHomeDirectory() {
|
||||
return homeDirectory;
|
||||
}
|
||||
|
||||
public void setHomeDirectory(String homeDirectory) {
|
||||
this.homeDirectory = homeDirectory;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public int getMaxIdleTime() {
|
||||
return maxIdleTime;
|
||||
}
|
||||
|
||||
public void setMaxIdleTime(int maxIdleTime) {
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
}
|
||||
|
||||
public boolean isRenamePush() {
|
||||
return renamePush;
|
||||
}
|
||||
|
||||
public void setRenamePush(boolean renamePush) {
|
||||
this.renamePush = renamePush;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package org.apache.hadoop.seaweed.ftp.controller.vo;
|
||||
|
||||
public class Result {
|
||||
|
||||
private boolean status;
|
||||
private Object data;
|
||||
private String message;
|
||||
|
||||
public Result(boolean status, String message) {
|
||||
this.status = status;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public Result(boolean status, Object data, String message) {
|
||||
this.status = status;
|
||||
this.message = message;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public boolean isStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(boolean status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public Object getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(Object data) {
|
||||
this.data = data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package org.apache.hadoop.seaweed.ftp.service;
|
||||
|
||||
import org.apache.ftpserver.DataConnectionConfiguration;
|
||||
import org.apache.ftpserver.DataConnectionConfigurationFactory;
|
||||
import org.apache.ftpserver.FtpServer;
|
||||
import org.apache.ftpserver.FtpServerFactory;
|
||||
import org.apache.ftpserver.command.CommandFactoryFactory;
|
||||
import org.apache.ftpserver.listener.ListenerFactory;
|
||||
import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsFileSystemManager;
|
||||
import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsOverFtpSystem;
|
||||
import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* reference: https://github.com/iponweb/hdfs-over-ftp
|
||||
*/
|
||||
@Component
|
||||
public class HFtpService {
|
||||
|
||||
private static Logger log = Logger.getLogger(HFtpService.class);
|
||||
|
||||
@Value("${ftp.port}")
|
||||
private int port = 0;
|
||||
|
||||
@Value("${ftp.passive-address}")
|
||||
private String passiveAddress;
|
||||
|
||||
@Value("${ftp.passive-ports}")
|
||||
private String passivePorts;
|
||||
|
||||
@Value("${hdfs.uri}")
|
||||
private String hdfsUri;
|
||||
|
||||
@Value("${seaweedFs.enable}")
|
||||
private boolean seaweedFsEnable;
|
||||
|
||||
@Value("${seaweedFs.access}")
|
||||
private String seaweedFsAccess;
|
||||
|
||||
@Value("${seaweedFs.replication}")
|
||||
private String seaweedFsReplication;
|
||||
|
||||
private FtpServer ftpServer = null;
|
||||
|
||||
public void startServer() throws Exception {
|
||||
log.info("Starting HDFS-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
|
||||
|
||||
HdfsOverFtpSystem.setHdfsUri(hdfsUri);
|
||||
HdfsOverFtpSystem.setSeaweedFsEnable(seaweedFsEnable);
|
||||
HdfsOverFtpSystem.setSeaweedFsAccess(seaweedFsAccess);
|
||||
HdfsOverFtpSystem.setSeaweedFsReplication(seaweedFsReplication);
|
||||
|
||||
FtpServerFactory server = new FtpServerFactory();
|
||||
server.setFileSystem(new HdfsFileSystemManager());
|
||||
|
||||
ListenerFactory factory = new ListenerFactory();
|
||||
factory.setPort(port);
|
||||
|
||||
DataConnectionConfigurationFactory dccFactory = new DataConnectionConfigurationFactory();
|
||||
dccFactory.setPassiveAddress("0.0.0.0");
|
||||
dccFactory.setPassivePorts(passivePorts);
|
||||
dccFactory.setPassiveExternalAddress(passiveAddress);
|
||||
DataConnectionConfiguration dcc = dccFactory.createDataConnectionConfiguration();
|
||||
factory.setDataConnectionConfiguration(dcc);
|
||||
|
||||
server.addListener("default", factory.createListener());
|
||||
|
||||
HdfsUserManager userManager = new HdfsUserManager();
|
||||
final File file = loadResource("/users.properties");
|
||||
userManager.setFile(file);
|
||||
server.setUserManager(userManager);
|
||||
|
||||
CommandFactoryFactory cmFact = new CommandFactoryFactory();
|
||||
cmFact.setUseDefaultCommands(true);
|
||||
server.setCommandFactory(cmFact.createCommandFactory());
|
||||
|
||||
// start the server
|
||||
ftpServer = server.createServer();
|
||||
ftpServer.start();
|
||||
}
|
||||
|
||||
public void stopServer() {
|
||||
log.info("Stopping Hdfs-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
|
||||
ftpServer.stop();
|
||||
}
|
||||
|
||||
public boolean statusServer() {
|
||||
try {
|
||||
return !ftpServer.isStopped();
|
||||
}catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static File loadResource(String resourceName) {
|
||||
return new File(System.getProperty("user.dir") + resourceName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,333 @@
|
|||
package org.apache.hadoop.seaweed.ftp.service.filesystem;
|
||||
|
||||
import org.apache.ftpserver.ftplet.FtpFile;
|
||||
import org.apache.ftpserver.ftplet.User;
|
||||
import org.apache.hadoop.fs.*;
|
||||
import org.apache.hadoop.seaweed.ftp.users.HdfsUser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This class implements all actions to HDFS
|
||||
*/
|
||||
public class HdfsFileObject implements FtpFile {
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(HdfsFileObject.class);
|
||||
|
||||
private Path homePath;
|
||||
private Path path;
|
||||
private Path fullPath;
|
||||
private HdfsUser user;
|
||||
|
||||
/**
|
||||
* Constructs HdfsFileObject from path
|
||||
*
|
||||
* @param path path to represent object
|
||||
* @param user accessor of the object
|
||||
*/
|
||||
public HdfsFileObject(String homePath, String path, User user) {
|
||||
this.homePath = new Path(homePath);
|
||||
this.path = new Path(path);
|
||||
this.fullPath = new Path(homePath + path);
|
||||
this.user = (HdfsUser) user;
|
||||
}
|
||||
|
||||
public String getAbsolutePath() {
|
||||
// strip the last '/' if necessary
|
||||
String fullName = path.toString();
|
||||
int filelen = fullName.length();
|
||||
if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
|
||||
fullName = fullName.substring(0, filelen - 1);
|
||||
}
|
||||
|
||||
return fullName;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return path.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* HDFS has no hidden objects
|
||||
*
|
||||
* @return always false
|
||||
*/
|
||||
public boolean isHidden() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the object is a directory
|
||||
*
|
||||
* @return true if the object is a directory
|
||||
*/
|
||||
public boolean isDirectory() {
|
||||
try {
|
||||
log.debug("is directory? : " + fullPath);
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fs = dfs.getFileStatus(fullPath);
|
||||
return fs.isDir();
|
||||
} catch (IOException e) {
|
||||
log.debug(fullPath + " is not dir", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the object is a file
|
||||
*
|
||||
* @return true if the object is a file
|
||||
*/
|
||||
public boolean isFile() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
return dfs.isFile(fullPath);
|
||||
} catch (IOException e) {
|
||||
log.debug(fullPath + " is not file", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the object does exist
|
||||
*
|
||||
* @return true if the object does exist
|
||||
*/
|
||||
public boolean doesExist() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
dfs.getFileStatus(fullPath);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
// log.debug(path + " does not exist", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isReadable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isWritable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isRemovable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get owner of the object
|
||||
*
|
||||
* @return owner of the object
|
||||
*/
|
||||
public String getOwnerName() {
|
||||
return "root";
|
||||
/*
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fs = dfs.getFileStatus(fullPath);
|
||||
String owner = fs.getOwner();
|
||||
if(owner.length() == 0) {
|
||||
return "root";
|
||||
}
|
||||
return owner;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/**
|
||||
* Get group of the object
|
||||
*
|
||||
* @return group of the object
|
||||
*/
|
||||
public String getGroupName() {
|
||||
return "root";
|
||||
/*
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fs = dfs.getFileStatus(fullPath);
|
||||
String group = fs.getGroup();
|
||||
if(group.length() == 0) {
|
||||
return "root";
|
||||
}
|
||||
return group;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/**
|
||||
* Get link count
|
||||
*
|
||||
* @return 3 is for a directory and 1 is for a file
|
||||
*/
|
||||
public int getLinkCount() {
|
||||
return isDirectory() ? 3 : 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get last modification date
|
||||
*
|
||||
* @return last modification date as a long
|
||||
*/
|
||||
public long getLastModified() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fs = dfs.getFileStatus(fullPath);
|
||||
return fs.getModificationTime();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean setLastModified(long l) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a size of the object
|
||||
*
|
||||
* @return size of the object in bytes
|
||||
*/
|
||||
public long getSize() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fs = dfs.getFileStatus(fullPath);
|
||||
log.debug("getSize(): " + fullPath + " : " + fs.getLen());
|
||||
return fs.getLen();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public Object getPhysicalFile() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new dir from the object
|
||||
*
|
||||
* @return true if dir is created
|
||||
*/
|
||||
public boolean mkdir() {
|
||||
try {
|
||||
FileSystem fs = HdfsOverFtpSystem.getDfs();
|
||||
fs.mkdirs(fullPath);
|
||||
// fs.setOwner(path, user.getName(), user.getMainGroup());
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete object from the HDFS filesystem
|
||||
*
|
||||
* @return true if the object is deleted
|
||||
*/
|
||||
public boolean delete() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
dfs.delete(fullPath, true);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean move(FtpFile ftpFile) {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
dfs.rename(fullPath, new Path(fullPath.getParent() + File.separator + ftpFile.getName()));
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* List files of the directory
|
||||
*
|
||||
* @return List of files in the directory
|
||||
*/
|
||||
public List<FtpFile> listFiles() {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FileStatus fileStats[] = dfs.listStatus(fullPath);
|
||||
|
||||
// get the virtual name of the base directory
|
||||
String virtualFileStr = getAbsolutePath();
|
||||
if (virtualFileStr.charAt(virtualFileStr.length() - 1) != '/') {
|
||||
virtualFileStr += '/';
|
||||
}
|
||||
|
||||
FtpFile[] virtualFiles = new FtpFile[fileStats.length];
|
||||
for (int i = 0; i < fileStats.length; i++) {
|
||||
File fileObj = new File(fileStats[i].getPath().toString());
|
||||
String fileName = virtualFileStr + fileObj.getName();
|
||||
virtualFiles[i] = new HdfsFileObject(homePath.toString(), fileName, user);
|
||||
}
|
||||
return Collections.unmodifiableList(Arrays.asList(virtualFiles));
|
||||
} catch (IOException e) {
|
||||
log.debug("", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates output stream to write to the object
|
||||
*
|
||||
* @param l is not used here
|
||||
* @return OutputStream
|
||||
* @throws IOException
|
||||
*/
|
||||
public OutputStream createOutputStream(long l) {
|
||||
try {
|
||||
FileSystem fs = HdfsOverFtpSystem.getDfs();
|
||||
FSDataOutputStream out = fs.create(fullPath);
|
||||
// fs.setOwner(fullPath, user.getName(), user.getMainGroup());
|
||||
return out;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates input stream to read from the object
|
||||
*
|
||||
* @param l is not used here
|
||||
* @return OutputStream
|
||||
* @throws IOException
|
||||
*/
|
||||
public InputStream createInputStream(long l) {
|
||||
try {
|
||||
FileSystem dfs = HdfsOverFtpSystem.getDfs();
|
||||
FSDataInputStream in = dfs.open(fullPath);
|
||||
return in;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package org.apache.hadoop.seaweed.ftp.service.filesystem;
|
||||
|
||||
import org.apache.ftpserver.ftplet.FileSystemFactory;
|
||||
import org.apache.ftpserver.ftplet.FileSystemView;
|
||||
import org.apache.ftpserver.ftplet.User;
|
||||
|
||||
/**
|
||||
* Impelented FileSystemManager to use HdfsFileSystemView
|
||||
*/
|
||||
public class HdfsFileSystemManager implements FileSystemFactory {
|
||||
public FileSystemView createFileSystemView(User user) {
|
||||
return new HdfsFileSystemView(user);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package org.apache.hadoop.seaweed.ftp.service.filesystem;
|
||||
|
||||
import org.apache.ftpserver.ftplet.FileSystemView;
|
||||
import org.apache.ftpserver.ftplet.FtpFile;
|
||||
import org.apache.ftpserver.ftplet.User;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* Implemented FileSystemView to use HdfsFileObject
|
||||
*/
|
||||
public class HdfsFileSystemView implements FileSystemView {
|
||||
|
||||
private String homePath;
|
||||
private String currPath = File.separator;
|
||||
private User user;
|
||||
|
||||
/**
|
||||
* Constructor - set the user object.
|
||||
*/
|
||||
protected HdfsFileSystemView(User user) {
|
||||
if (user == null) {
|
||||
throw new IllegalArgumentException("user can not be null");
|
||||
}
|
||||
if (user.getHomeDirectory() == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"User home directory can not be null");
|
||||
}
|
||||
|
||||
this.homePath = user.getHomeDirectory();
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
public FtpFile getHomeDirectory() {
|
||||
return new HdfsFileObject(homePath, File.separator, user);
|
||||
}
|
||||
|
||||
public FtpFile getWorkingDirectory() {
|
||||
FtpFile fileObj;
|
||||
if (currPath.equals(File.separator)) {
|
||||
fileObj = new HdfsFileObject(homePath, File.separator, user);
|
||||
} else {
|
||||
fileObj = new HdfsFileObject(homePath, currPath, user);
|
||||
|
||||
}
|
||||
return fileObj;
|
||||
}
|
||||
|
||||
public boolean changeWorkingDirectory(String dir) {
|
||||
|
||||
Path path;
|
||||
if (dir.startsWith(File.separator) || new Path(currPath).equals(new Path(dir))) {
|
||||
path = new Path(dir);
|
||||
} else if (currPath.length() > 1) {
|
||||
path = new Path(currPath + File.separator + dir);
|
||||
} else {
|
||||
if(dir.startsWith("/")) {
|
||||
path = new Path(dir);
|
||||
}else {
|
||||
path = new Path(File.separator + dir);
|
||||
}
|
||||
}
|
||||
|
||||
// 防止退回根目录
|
||||
if (path.getName().equals("..")) {
|
||||
path = new Path(File.separator);
|
||||
}
|
||||
|
||||
HdfsFileObject file = new HdfsFileObject(homePath, path.toString(), user);
|
||||
if (file.isDirectory()) {
|
||||
currPath = path.toString();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public FtpFile getFile(String file) {
|
||||
String path;
|
||||
if (file.startsWith(File.separator)) {
|
||||
path = file;
|
||||
} else if (currPath.length() > 1) {
|
||||
path = currPath + File.separator + file;
|
||||
} else {
|
||||
path = File.separator + file;
|
||||
}
|
||||
return new HdfsFileObject(homePath, path, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the file content random accessible?
|
||||
*/
|
||||
public boolean isRandomAccessible() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose file system view - does nothing.
|
||||
*/
|
||||
public void dispose() {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package org.apache.hadoop.seaweed.ftp.service.filesystem;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Class to store DFS connection
|
||||
*/
|
||||
public class HdfsOverFtpSystem {
|
||||
|
||||
private static FileSystem fs = null;
|
||||
|
||||
private static String hdfsUri;
|
||||
|
||||
private static boolean seaweedFsEnable;
|
||||
|
||||
private static String seaweedFsAccess;
|
||||
|
||||
private static String seaweedFsReplication;
|
||||
|
||||
private final static Logger log = LoggerFactory.getLogger(HdfsOverFtpSystem.class);
|
||||
|
||||
private static void hdfsInit() throws IOException {
|
||||
Configuration configuration = new Configuration();
|
||||
|
||||
configuration.set("fs.defaultFS", hdfsUri);
|
||||
if(seaweedFsEnable) {
|
||||
configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
|
||||
configuration.set("fs.seaweed.volume.server.access", seaweedFsAccess);
|
||||
configuration.set("fs.seaweed.replication", seaweedFsReplication);
|
||||
}
|
||||
fs = FileSystem.get(configuration);
|
||||
log.info("HDFS load success");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get dfs
|
||||
*
|
||||
* @return dfs
|
||||
* @throws IOException
|
||||
*/
|
||||
public static FileSystem getDfs() throws IOException {
|
||||
if (fs == null) {
|
||||
hdfsInit();
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
public static void setHdfsUri(String hdfsUri) {
|
||||
HdfsOverFtpSystem.hdfsUri = hdfsUri;
|
||||
}
|
||||
|
||||
public static String getHdfsUri() {
|
||||
return hdfsUri;
|
||||
}
|
||||
|
||||
public static void setSeaweedFsEnable(boolean seaweedFsEnable) {
|
||||
HdfsOverFtpSystem.seaweedFsEnable = seaweedFsEnable;
|
||||
}
|
||||
|
||||
public static void setSeaweedFsAccess(String seaweedFsAccess) {
|
||||
HdfsOverFtpSystem.seaweedFsAccess = seaweedFsAccess;
|
||||
}
|
||||
|
||||
public static void setSeaweedFsReplication(String seaweedFsReplication) {
|
||||
HdfsOverFtpSystem.seaweedFsReplication = seaweedFsReplication;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,239 @@
|
|||
package org.apache.hadoop.seaweed.ftp.users;
|
||||
|
||||
import org.apache.ftpserver.ftplet.Authority;
|
||||
import org.apache.ftpserver.ftplet.AuthorizationRequest;
|
||||
import org.apache.ftpserver.ftplet.User;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class HdfsUser implements User, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -47371353779731294L;
|
||||
|
||||
private String name = null;
|
||||
|
||||
private String password = null;
|
||||
|
||||
private int maxIdleTimeSec = 0; // no limit
|
||||
|
||||
private String homeDir = null;
|
||||
|
||||
private boolean isEnabled = true;
|
||||
|
||||
private List<? extends Authority> authorities = new ArrayList<Authority>();
|
||||
|
||||
private ArrayList<String> groups = new ArrayList<String>();
|
||||
|
||||
private Logger log = Logger.getLogger(HdfsUser.class);
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public HdfsUser() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor.
|
||||
*/
|
||||
public HdfsUser(User user) {
|
||||
name = user.getName();
|
||||
password = user.getPassword();
|
||||
authorities = user.getAuthorities();
|
||||
maxIdleTimeSec = user.getMaxIdleTime();
|
||||
homeDir = user.getHomeDirectory();
|
||||
isEnabled = user.getEnabled();
|
||||
}
|
||||
|
||||
public ArrayList<String> getGroups() {
|
||||
return groups;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the main group of the user
|
||||
*
|
||||
* @return main group of the user
|
||||
*/
|
||||
public String getMainGroup() {
|
||||
if (groups.size() > 0) {
|
||||
return groups.get(0);
|
||||
} else {
|
||||
log.error("User " + name + " is not a memer of any group");
|
||||
return "error";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if user is a member of the group
|
||||
*
|
||||
* @param group to check
|
||||
* @return true if the user id a member of the group
|
||||
*/
|
||||
public boolean isGroupMember(String group) {
|
||||
for (String userGroup : groups) {
|
||||
if (userGroup.equals(group)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set users' groups
|
||||
*
|
||||
* @param groups to set
|
||||
*/
|
||||
public void setGroups(ArrayList<String> groups) {
|
||||
if (groups.size() < 1) {
|
||||
log.error("User " + name + " is not a memer of any group");
|
||||
}
|
||||
this.groups = groups;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the user name.
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set user name.
|
||||
*/
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the user password.
|
||||
*/
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set user password.
|
||||
*/
|
||||
public void setPassword(String pass) {
|
||||
password = pass;
|
||||
}
|
||||
|
||||
public List<Authority> getAuthorities() {
|
||||
if (authorities != null) {
|
||||
return Collections.unmodifiableList(authorities);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setAuthorities(List<Authority> authorities) {
|
||||
if (authorities != null) {
|
||||
this.authorities = Collections.unmodifiableList(authorities);
|
||||
} else {
|
||||
this.authorities = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum idle time in second.
|
||||
*/
|
||||
public int getMaxIdleTime() {
|
||||
return maxIdleTimeSec;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the maximum idle time in second.
|
||||
*/
|
||||
public void setMaxIdleTime(int idleSec) {
|
||||
maxIdleTimeSec = idleSec;
|
||||
if (maxIdleTimeSec < 0) {
|
||||
maxIdleTimeSec = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the user enable status.
|
||||
*/
|
||||
public boolean getEnabled() {
|
||||
return isEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the user enable status.
|
||||
*/
|
||||
public void setEnabled(boolean enb) {
|
||||
isEnabled = enb;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the user home directory.
|
||||
*/
|
||||
public String getHomeDirectory() {
|
||||
return homeDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the user home directory.
|
||||
*/
|
||||
public void setHomeDirectory(String home) {
|
||||
homeDir = home;
|
||||
}
|
||||
|
||||
/**
|
||||
* String representation.
|
||||
*/
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public AuthorizationRequest authorize(AuthorizationRequest request) {
|
||||
List<Authority> authorities = getAuthorities();
|
||||
|
||||
// check for no authorities at all
|
||||
if (authorities == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
boolean someoneCouldAuthorize = false;
|
||||
for (Authority authority : authorities) {
|
||||
if (authority.canAuthorize(request)) {
|
||||
someoneCouldAuthorize = true;
|
||||
|
||||
request = authority.authorize(request);
|
||||
|
||||
// authorization failed, return null
|
||||
if (request == null) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (someoneCouldAuthorize) {
|
||||
return request;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public List<Authority> getAuthorities(Class<? extends Authority> clazz) {
|
||||
List<Authority> selected = new ArrayList<Authority>();
|
||||
|
||||
for (Authority authority : authorities) {
|
||||
if (authority.getClass().equals(clazz)) {
|
||||
selected.add(authority);
|
||||
}
|
||||
}
|
||||
|
||||
return selected;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,453 @@
|
|||
package org.apache.hadoop.seaweed.ftp.users;
|
||||
|
||||
import org.apache.ftpserver.FtpServerConfigurationException;
|
||||
import org.apache.ftpserver.ftplet.*;
|
||||
import org.apache.ftpserver.usermanager.*;
|
||||
import org.apache.ftpserver.usermanager.impl.*;
|
||||
import org.apache.ftpserver.util.BaseProperties;
|
||||
import org.apache.ftpserver.util.IoUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
public class HdfsUserManager extends AbstractUserManager {
|
||||
|
||||
private final Logger LOG = LoggerFactory
|
||||
.getLogger(HdfsUserManager.class);
|
||||
|
||||
private final static String DEPRECATED_PREFIX = "FtpServer.user.";
|
||||
|
||||
private final static String PREFIX = "ftpserver.user.";
|
||||
|
||||
private static BaseProperties userDataProp;
|
||||
|
||||
private File userDataFile = new File("users.conf");
|
||||
|
||||
private boolean isConfigured = false;
|
||||
|
||||
private PasswordEncryptor passwordEncryptor = new Md5PasswordEncryptor();
|
||||
|
||||
|
||||
/**
|
||||
* Retrieve the file used to load and store users
|
||||
*
|
||||
* @return The file
|
||||
*/
|
||||
public File getFile() {
|
||||
return userDataFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the file used to store and read users. Must be set before
|
||||
* {@link #configure()} is called.
|
||||
*
|
||||
* @param propFile A file containing users
|
||||
*/
|
||||
public void setFile(File propFile) {
|
||||
if (isConfigured) {
|
||||
throw new IllegalStateException("Must be called before configure()");
|
||||
}
|
||||
|
||||
this.userDataFile = propFile;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Retrieve the password encryptor used for this user manager
|
||||
*
|
||||
* @return The password encryptor. Default to {@link Md5PasswordEncryptor}
|
||||
* if no other has been provided
|
||||
*/
|
||||
public PasswordEncryptor getPasswordEncryptor() {
|
||||
return passwordEncryptor;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the password encryptor to use for this user manager
|
||||
*
|
||||
* @param passwordEncryptor The password encryptor
|
||||
*/
|
||||
public void setPasswordEncryptor(PasswordEncryptor passwordEncryptor) {
|
||||
this.passwordEncryptor = passwordEncryptor;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Lazy init the user manager
|
||||
*/
|
||||
private void lazyInit() {
|
||||
if (!isConfigured) {
|
||||
configure();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure user manager.
|
||||
*/
|
||||
public void configure() {
|
||||
isConfigured = true;
|
||||
try {
|
||||
userDataProp = new BaseProperties();
|
||||
|
||||
if (userDataFile != null && userDataFile.exists()) {
|
||||
FileInputStream fis = null;
|
||||
try {
|
||||
fis = new FileInputStream(userDataFile);
|
||||
userDataProp.load(fis);
|
||||
} finally {
|
||||
IoUtils.close(fis);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new FtpServerConfigurationException(
|
||||
"Error loading user data file : "
|
||||
+ userDataFile.getAbsolutePath(), e);
|
||||
}
|
||||
|
||||
convertDeprecatedPropertyNames();
|
||||
}
|
||||
|
||||
private void convertDeprecatedPropertyNames() {
|
||||
Enumeration<?> keys = userDataProp.propertyNames();
|
||||
|
||||
boolean doSave = false;
|
||||
|
||||
while (keys.hasMoreElements()) {
|
||||
String key = (String) keys.nextElement();
|
||||
|
||||
if (key.startsWith(DEPRECATED_PREFIX)) {
|
||||
String newKey = PREFIX
|
||||
+ key.substring(DEPRECATED_PREFIX.length());
|
||||
userDataProp.setProperty(newKey, userDataProp.getProperty(key));
|
||||
userDataProp.remove(key);
|
||||
|
||||
doSave = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (doSave) {
|
||||
try {
|
||||
saveUserData();
|
||||
} catch (FtpException e) {
|
||||
throw new FtpServerConfigurationException(
|
||||
"Failed to save updated user data", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void save(User usr, boolean renamePush) throws FtpException {
|
||||
lazyInit();
|
||||
userDataProp.setProperty(PREFIX + usr.getName() + ".rename.push", renamePush);
|
||||
save(usr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Save user data. Store the properties.
|
||||
*/
|
||||
public synchronized void save(User usr) throws FtpException {
|
||||
lazyInit();
|
||||
|
||||
// null value check
|
||||
if (usr.getName() == null) {
|
||||
throw new NullPointerException("User name is null.");
|
||||
}
|
||||
String thisPrefix = PREFIX + usr.getName() + '.';
|
||||
|
||||
// set other properties
|
||||
userDataProp.setProperty(thisPrefix + ATTR_PASSWORD, getPassword(usr));
|
||||
|
||||
String home = usr.getHomeDirectory();
|
||||
if (home == null) {
|
||||
home = "/";
|
||||
}
|
||||
userDataProp.setProperty(thisPrefix + ATTR_HOME, home);
|
||||
userDataProp.setProperty(thisPrefix + ATTR_ENABLE, usr.getEnabled());
|
||||
userDataProp.setProperty(thisPrefix + ATTR_WRITE_PERM, usr
|
||||
.authorize(new WriteRequest()) != null);
|
||||
userDataProp.setProperty(thisPrefix + ATTR_MAX_IDLE_TIME, usr
|
||||
.getMaxIdleTime());
|
||||
|
||||
TransferRateRequest transferRateRequest = new TransferRateRequest();
|
||||
transferRateRequest = (TransferRateRequest) usr
|
||||
.authorize(transferRateRequest);
|
||||
|
||||
if (transferRateRequest != null) {
|
||||
userDataProp.setProperty(thisPrefix + ATTR_MAX_UPLOAD_RATE,
|
||||
transferRateRequest.getMaxUploadRate());
|
||||
userDataProp.setProperty(thisPrefix + ATTR_MAX_DOWNLOAD_RATE,
|
||||
transferRateRequest.getMaxDownloadRate());
|
||||
} else {
|
||||
userDataProp.remove(thisPrefix + ATTR_MAX_UPLOAD_RATE);
|
||||
userDataProp.remove(thisPrefix + ATTR_MAX_DOWNLOAD_RATE);
|
||||
}
|
||||
|
||||
// request that always will succeed
|
||||
ConcurrentLoginRequest concurrentLoginRequest = new ConcurrentLoginRequest(
|
||||
0, 0);
|
||||
concurrentLoginRequest = (ConcurrentLoginRequest) usr
|
||||
.authorize(concurrentLoginRequest);
|
||||
|
||||
if (concurrentLoginRequest != null) {
|
||||
userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_NUMBER,
|
||||
concurrentLoginRequest.getMaxConcurrentLogins());
|
||||
userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_PER_IP,
|
||||
concurrentLoginRequest.getMaxConcurrentLoginsPerIP());
|
||||
} else {
|
||||
userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_NUMBER);
|
||||
userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_PER_IP);
|
||||
}
|
||||
|
||||
saveUserData();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws FtpException
|
||||
*/
|
||||
private void saveUserData() throws FtpException {
|
||||
File dir = userDataFile.getAbsoluteFile().getParentFile();
|
||||
if (dir != null && !dir.exists() && !dir.mkdirs()) {
|
||||
String dirName = dir.getAbsolutePath();
|
||||
throw new FtpServerConfigurationException(
|
||||
"Cannot create directory for user data file : " + dirName);
|
||||
}
|
||||
|
||||
// save user data
|
||||
FileOutputStream fos = null;
|
||||
try {
|
||||
fos = new FileOutputStream(userDataFile);
|
||||
userDataProp.store(fos, "Generated file - don't edit (please)");
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Failed saving user data", ex);
|
||||
throw new FtpException("Failed saving user data", ex);
|
||||
} finally {
|
||||
IoUtils.close(fos);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized void list() throws FtpException {
|
||||
lazyInit();
|
||||
|
||||
Map dataMap = new HashMap();
|
||||
Enumeration<String> propNames = (Enumeration<String>) userDataProp.propertyNames();
|
||||
ArrayList<String> a = Collections.list(propNames);
|
||||
a.remove("i18nMap");//去除i18nMap
|
||||
for(String attrName : a){
|
||||
// dataMap.put(attrName, propNames.);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete an user. Removes all this user entries from the properties. After
|
||||
* removing the corresponding from the properties, save the data.
|
||||
*/
|
||||
public synchronized void delete(String usrName) throws FtpException {
|
||||
lazyInit();
|
||||
|
||||
// remove entries from properties
|
||||
String thisPrefix = PREFIX + usrName + '.';
|
||||
Enumeration<?> propNames = userDataProp.propertyNames();
|
||||
ArrayList<String> remKeys = new ArrayList<String>();
|
||||
while (propNames.hasMoreElements()) {
|
||||
String thisKey = propNames.nextElement().toString();
|
||||
if (thisKey.startsWith(thisPrefix)) {
|
||||
remKeys.add(thisKey);
|
||||
}
|
||||
}
|
||||
Iterator<String> remKeysIt = remKeys.iterator();
|
||||
while (remKeysIt.hasNext()) {
|
||||
userDataProp.remove(remKeysIt.next());
|
||||
}
|
||||
|
||||
saveUserData();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user password. Returns the encrypted value.
|
||||
* <p/>
|
||||
* <pre>
|
||||
* If the password value is not null
|
||||
* password = new password
|
||||
* else
|
||||
* if user does exist
|
||||
* password = old password
|
||||
* else
|
||||
* password = ""
|
||||
* </pre>
|
||||
*/
|
||||
private String getPassword(User usr) {
|
||||
String name = usr.getName();
|
||||
String password = usr.getPassword();
|
||||
|
||||
if (password != null) {
|
||||
password = passwordEncryptor.encrypt(password);
|
||||
} else {
|
||||
String blankPassword = passwordEncryptor.encrypt("");
|
||||
|
||||
if (doesExist(name)) {
|
||||
String key = PREFIX + name + '.' + ATTR_PASSWORD;
|
||||
password = userDataProp.getProperty(key, blankPassword);
|
||||
} else {
|
||||
password = blankPassword;
|
||||
}
|
||||
}
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all user names.
|
||||
*/
|
||||
public synchronized String[] getAllUserNames() {
|
||||
lazyInit();
|
||||
|
||||
// get all user names
|
||||
String suffix = '.' + ATTR_HOME;
|
||||
ArrayList<String> ulst = new ArrayList<String>();
|
||||
Enumeration<?> allKeys = userDataProp.propertyNames();
|
||||
int prefixlen = PREFIX.length();
|
||||
int suffixlen = suffix.length();
|
||||
while (allKeys.hasMoreElements()) {
|
||||
String key = (String) allKeys.nextElement();
|
||||
if (key.endsWith(suffix)) {
|
||||
String name = key.substring(prefixlen);
|
||||
int endIndex = name.length() - suffixlen;
|
||||
name = name.substring(0, endIndex);
|
||||
ulst.add(name);
|
||||
}
|
||||
}
|
||||
|
||||
Collections.sort(ulst);
|
||||
return ulst.toArray(new String[0]);
|
||||
}
|
||||
|
||||
private ArrayList<String> parseGroups(String groupsLine) {
|
||||
String groupsArray[] = groupsLine.split(",");
|
||||
return new ArrayList(Arrays.asList(groupsArray));
|
||||
}
|
||||
|
||||
public static synchronized boolean getUserRenamePush(String userName) {
|
||||
return userDataProp.getBoolean(PREFIX + userName + ".rename.push", false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load user data.
|
||||
*/
|
||||
public synchronized User getUserByName(String userName) {
|
||||
lazyInit();
|
||||
|
||||
if (!doesExist(userName)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String baseKey = PREFIX + userName + '.';
|
||||
HdfsUser user = new HdfsUser();
|
||||
user.setName(userName);
|
||||
user.setEnabled(userDataProp.getBoolean(baseKey + ATTR_ENABLE, true));
|
||||
user.setHomeDirectory(userDataProp
|
||||
.getProperty(baseKey + ATTR_HOME, "/"));
|
||||
|
||||
// user.setGroups(parseGroups(userDataProp
|
||||
// .getProperty(baseKey + "groups")));
|
||||
|
||||
List<Authority> authorities = new ArrayList<Authority>();
|
||||
|
||||
if (userDataProp.getBoolean(baseKey + ATTR_WRITE_PERM, false)) {
|
||||
authorities.add(new WritePermission());
|
||||
}
|
||||
|
||||
int maxLogin = userDataProp.getInteger(baseKey + ATTR_MAX_LOGIN_NUMBER,
|
||||
0);
|
||||
int maxLoginPerIP = userDataProp.getInteger(baseKey
|
||||
+ ATTR_MAX_LOGIN_PER_IP, 0);
|
||||
|
||||
authorities.add(new ConcurrentLoginPermission(maxLogin, maxLoginPerIP));
|
||||
|
||||
int uploadRate = userDataProp.getInteger(
|
||||
baseKey + ATTR_MAX_UPLOAD_RATE, 0);
|
||||
int downloadRate = userDataProp.getInteger(baseKey
|
||||
+ ATTR_MAX_DOWNLOAD_RATE, 0);
|
||||
|
||||
authorities.add(new TransferRatePermission(downloadRate, uploadRate));
|
||||
|
||||
user.setAuthorities(authorities);
|
||||
|
||||
user.setMaxIdleTime(userDataProp.getInteger(baseKey
|
||||
+ ATTR_MAX_IDLE_TIME, 0));
|
||||
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
* User existance check
|
||||
*/
|
||||
public synchronized boolean doesExist(String name) {
|
||||
lazyInit();
|
||||
|
||||
String key = PREFIX + name + '.' + ATTR_HOME;
|
||||
return userDataProp.containsKey(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* User authenticate method
|
||||
*/
|
||||
public synchronized User authenticate(Authentication authentication)
|
||||
throws AuthenticationFailedException {
|
||||
lazyInit();
|
||||
|
||||
if (authentication instanceof UsernamePasswordAuthentication) {
|
||||
UsernamePasswordAuthentication upauth = (UsernamePasswordAuthentication) authentication;
|
||||
|
||||
String user = upauth.getUsername();
|
||||
String password = upauth.getPassword();
|
||||
|
||||
if (user == null) {
|
||||
throw new AuthenticationFailedException("Authentication failed");
|
||||
}
|
||||
|
||||
if (password == null) {
|
||||
password = "";
|
||||
}
|
||||
|
||||
String storedPassword = userDataProp.getProperty(PREFIX + user + '.'
|
||||
+ ATTR_PASSWORD);
|
||||
|
||||
if (storedPassword == null) {
|
||||
// user does not exist
|
||||
throw new AuthenticationFailedException("Authentication failed");
|
||||
}
|
||||
|
||||
if (passwordEncryptor.matches(password, storedPassword)) {
|
||||
return getUserByName(user);
|
||||
} else {
|
||||
throw new AuthenticationFailedException("Authentication failed");
|
||||
}
|
||||
|
||||
} else if (authentication instanceof AnonymousAuthentication) {
|
||||
if (doesExist("anonymous")) {
|
||||
return getUserByName("anonymous");
|
||||
} else {
|
||||
throw new AuthenticationFailedException("Authentication failed");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Authentication not supported by this user manager");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the user manager - remove existing entries.
|
||||
*/
|
||||
public synchronized void dispose() {
|
||||
if (userDataProp != null) {
|
||||
userDataProp.clear();
|
||||
userDataProp = null;
|
||||
}
|
||||
}
|
||||
}
|
15
other/java/hdfs-over-ftp/src/main/resources/application.yml
Normal file
15
other/java/hdfs-over-ftp/src/main/resources/application.yml
Normal file
|
@ -0,0 +1,15 @@
|
|||
server:
|
||||
port: 8080
|
||||
|
||||
ftp:
|
||||
port: 2222
|
||||
passive-address: localhost
|
||||
passive-ports: 30000-30999
|
||||
|
||||
hdfs:
|
||||
uri: seaweedfs://localhost:8888
|
||||
|
||||
seaweedFs:
|
||||
enable: true
|
||||
access: direct # direct/filerProxy/publicUrl
|
||||
replication: "000"
|
39
other/java/hdfs-over-ftp/src/main/resources/assembly.xml
Normal file
39
other/java/hdfs-over-ftp/src/main/resources/assembly.xml
Normal file
|
@ -0,0 +1,39 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
|
||||
|
||||
<id>package</id>
|
||||
<formats>
|
||||
<!-- 指定打包格式,支持的打包格式有zip、tar、tar.gz (or tgz)、tar.bz2 (or tbz2)、jar、dir、war,可以同时指定多个打包格式 -->
|
||||
<format>tar.gz</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<directory>src/main/resources</directory>
|
||||
<outputDirectory>/</outputDirectory>
|
||||
<includes>
|
||||
<include>application.yml</include>
|
||||
<include>logback-spring.xml</include>
|
||||
<include>users.properties</include>
|
||||
<include>kafka-producer.properties</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${project.build.directory}</directory>
|
||||
<outputDirectory>/</outputDirectory>
|
||||
<includes>
|
||||
<include>*.jar</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<useProjectArtifact>false</useProjectArtifact>
|
||||
<outputDirectory>lib</outputDirectory>
|
||||
<scope>runtime</scope>
|
||||
<unpack>false</unpack>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</assembly>
|
|
@ -0,0 +1,40 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<configuration>
|
||||
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
|
||||
<property name="LOG_HOME" value="${user.dir}/logs/" />
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<!-- 日志输出编码 -->
|
||||
<layout class="ch.qos.logback.classic.PatternLayout">
|
||||
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
|
||||
</pattern>
|
||||
</layout>
|
||||
</appender>
|
||||
|
||||
<!-- 按照每天生成日志文件 -->
|
||||
<appender name="RollingFile"
|
||||
class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<File>${LOG_HOME}/fileLog.log</File>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/fileLog.log.%d.%i</fileNamePattern>
|
||||
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
||||
<maxFileSize>100 MB</maxFileSize>
|
||||
</timeBasedFileNamingAndTriggeringPolicy>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>
|
||||
%d %p (%file:%line\)- %m%n
|
||||
</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 日志输出级别 -->
|
||||
<root level="info">
|
||||
<appender-ref ref="Stdout" />
|
||||
<appender-ref ref="RollingFile" />
|
||||
</root>
|
||||
|
||||
</configuration>
|
12
other/java/hdfs-over-ftp/users.properties
Normal file
12
other/java/hdfs-over-ftp/users.properties
Normal file
|
@ -0,0 +1,12 @@
|
|||
#Generated file - don't edit (please)
|
||||
#Thu Mar 11 19:11:12 CST 2021
|
||||
ftpserver.user.test.idletime=0
|
||||
ftpserver.user.test.maxloginperip=0
|
||||
ftpserver.user.test.userpassword=44664D4D827C740293D2AA244FB60445
|
||||
ftpserver.user.test.enableflag=true
|
||||
ftpserver.user.test.maxloginnumber=0
|
||||
ftpserver.user.test.rename.push=true
|
||||
ftpserver.user.test.homedirectory=/buckets/test/
|
||||
ftpserver.user.test.downloadrate=0
|
||||
ftpserver.user.test.writepermission=true
|
||||
ftpserver.user.test.uploadrate=0
|
Loading…
Reference in a new issue