mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Hadoop file system: 1.4.3
added buffered fs input stream
This commit is contained in:
parent
703057bff9
commit
6b41c5250b
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.1</version>
|
<version>1.4.3</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.1</version>
|
<version>1.4.3</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.1</version>
|
<version>1.4.3</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -127,7 +127,7 @@
|
||||||
</snapshotRepository>
|
</snapshotRepository>
|
||||||
</distributionManagement>
|
</distributionManagement>
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
|
||||||
<hadoop.version>2.9.2</hadoop.version>
|
<hadoop.version>2.9.2</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
|
||||||
<hadoop.version>2.9.2</hadoop.version>
|
<hadoop.version>2.9.2</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|
|
@ -1,49 +0,0 @@
|
||||||
package seaweed.hdfs;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.PositionedReadable;
|
|
||||||
import org.apache.hadoop.fs.Seekable;
|
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
|
||||||
import java.io.FilterInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable {
|
|
||||||
|
|
||||||
SeaweedInputStream t;
|
|
||||||
|
|
||||||
protected BufferedSeaweedInputStream(InputStream in, int bufferSize) {
|
|
||||||
super(new BufferedInputStream(in, bufferSize));
|
|
||||||
t = (SeaweedInputStream)in;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
|
|
||||||
return this.t.read(position,buffer,offset,length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
|
|
||||||
this.t.readFully(position,buffer,offset,length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFully(long position, byte[] buffer) throws IOException {
|
|
||||||
this.t.readFully(position,buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void seek(long pos) throws IOException {
|
|
||||||
this.t.seek(pos);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getPos() throws IOException {
|
|
||||||
return this.t.getPos();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
||||||
return this.t.seekToNewSource(targetPos);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -75,8 +75,8 @@ public class SeaweedFileSystem extends FileSystem {
|
||||||
path = qualify(path);
|
path = qualify(path);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
||||||
return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024));
|
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package seaweed.hdfs;
|
package seaweed.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -207,8 +208,8 @@ public class SeaweedFileSystemStore {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
|
public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
|
||||||
int bufferSize) throws IOException {
|
int bufferSize) throws IOException {
|
||||||
|
|
||||||
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
|
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@
|
||||||
</snapshotRepository>
|
</snapshotRepository>
|
||||||
</distributionManagement>
|
</distributionManagement>
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
|
||||||
<hadoop.version>3.1.1</hadoop.version>
|
<hadoop.version>3.1.1</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
|
||||||
<hadoop.version>3.1.1</hadoop.version>
|
<hadoop.version>3.1.1</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|
|
@ -1,49 +0,0 @@
|
||||||
package seaweed.hdfs;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.PositionedReadable;
|
|
||||||
import org.apache.hadoop.fs.Seekable;
|
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
|
||||||
import java.io.FilterInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable {
|
|
||||||
|
|
||||||
SeaweedInputStream t;
|
|
||||||
|
|
||||||
protected BufferedSeaweedInputStream(InputStream in, int bufferSize) {
|
|
||||||
super(new BufferedInputStream(in, bufferSize));
|
|
||||||
t = (SeaweedInputStream)in;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
|
|
||||||
return this.t.read(position,buffer,offset,length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
|
|
||||||
this.t.readFully(position,buffer,offset,length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFully(long position, byte[] buffer) throws IOException {
|
|
||||||
this.t.readFully(position,buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void seek(long pos) throws IOException {
|
|
||||||
this.t.seek(pos);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getPos() throws IOException {
|
|
||||||
return this.t.getPos();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
||||||
return this.t.seekToNewSource(targetPos);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -75,8 +75,8 @@ public class SeaweedFileSystem extends FileSystem {
|
||||||
path = qualify(path);
|
path = qualify(path);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
|
||||||
return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024));
|
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package seaweed.hdfs;
|
package seaweed.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -207,8 +208,8 @@ public class SeaweedFileSystemStore {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
|
public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
|
||||||
int bufferSize) throws IOException {
|
int bufferSize) throws IOException {
|
||||||
|
|
||||||
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
|
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue