mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
commit
6a93e26fc3
12
README.md
12
README.md
|
@ -112,15 +112,15 @@ On top of the object store, optional [Filer] can support directories and POSIX a
|
||||||
[Back to TOC](#table-of-contents)
|
[Back to TOC](#table-of-contents)
|
||||||
|
|
||||||
## Filer Features ##
|
## Filer Features ##
|
||||||
* [Filer server][Filer] provide "normal" directories and files via http.
|
* [Filer server][Filer] provides "normal" directories and files via http.
|
||||||
* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
|
* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
|
||||||
* [Mount filer][Mount] to read and write files directly as a local directory via FUSE.
|
* [Mount filer][Mount] reads and writes files directly as a local directory via FUSE.
|
||||||
* [Amazon S3 compatible API][AmazonS3API] to access files with S3 tooling.
|
* [Amazon S3 compatible API][AmazonS3API] accesses files with S3 tooling.
|
||||||
* [Hadoop Compatible File System][Hadoop] to access files from Hadoop/Spark/Flink/etc jobs.
|
* [Hadoop Compatible File System][Hadoop] accesses files from Hadoop/Spark/Flink/etc jobs.
|
||||||
* [Async Backup To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze.
|
* [Async Backup To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze.
|
||||||
* [WebDAV] access as a mapped drive on Mac and Windows, or from mobile devices.
|
* [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices.
|
||||||
* [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data.
|
* [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data.
|
||||||
* [File TTL][FilerTTL] automatically purge file metadata and actual file data.
|
* [File TTL][FilerTTL] automatically purges file metadata and actual file data.
|
||||||
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
|
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
|
||||||
|
|
||||||
[Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
|
[Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -6,7 +6,6 @@ require (
|
||||||
cloud.google.com/go v0.44.3
|
cloud.google.com/go v0.44.3
|
||||||
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
|
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
|
||||||
github.com/Azure/azure-storage-blob-go v0.8.0
|
github.com/Azure/azure-storage-blob-go v0.8.0
|
||||||
github.com/DataDog/zstd v1.4.1 // indirect
|
|
||||||
github.com/OneOfOne/xxhash v1.2.2
|
github.com/OneOfOne/xxhash v1.2.2
|
||||||
github.com/Shopify/sarama v1.23.1
|
github.com/Shopify/sarama v1.23.1
|
||||||
github.com/aws/aws-sdk-go v1.23.13
|
github.com/aws/aws-sdk-go v1.23.13
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -32,8 +32,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
||||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||||
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
|
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
|
||||||
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||||
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
|
|
||||||
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
|
||||||
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190605020000-c4ba1fdf4d36/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
|
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190605020000-c4ba1fdf4d36/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
|
||||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||||
|
@ -234,8 +232,6 @@ github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
|
||||||
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
||||||
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
|
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
|
||||||
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
||||||
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
|
|
||||||
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
|
||||||
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
|
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
|
||||||
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||||
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
description: SeaweedFS
|
description: SeaweedFS
|
||||||
name: seaweedfs
|
name: seaweedfs
|
||||||
version: 1.87
|
version: 1.88
|
|
@ -4,7 +4,7 @@ global:
|
||||||
registry: ""
|
registry: ""
|
||||||
repository: ""
|
repository: ""
|
||||||
imageName: chrislusf/seaweedfs
|
imageName: chrislusf/seaweedfs
|
||||||
imageTag: "1.87"
|
imageTag: "1.88"
|
||||||
imagePullPolicy: IfNotPresent
|
imagePullPolicy: IfNotPresent
|
||||||
imagePullSecrets: imagepullsecret
|
imagePullSecrets: imagepullsecret
|
||||||
restartPolicy: Always
|
restartPolicy: Always
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.4</version>
|
<version>1.4.5</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.4</version>
|
<version>1.4.5</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
<artifactId>seaweedfs-client</artifactId>
|
<artifactId>seaweedfs-client</artifactId>
|
||||||
<version>1.4.4</version>
|
<version>1.4.5</version>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.sonatype.oss</groupId>
|
<groupId>org.sonatype.oss</groupId>
|
||||||
|
|
|
@ -76,8 +76,7 @@ public class FileChunkManifest {
|
||||||
LOG.debug("doFetchFullChunkData:{}", chunkView);
|
LOG.debug("doFetchFullChunkData:{}", chunkView);
|
||||||
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
|
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
|
||||||
}
|
}
|
||||||
if(chunk.getIsChunkManifest()){
|
if (chunk.getIsChunkManifest()){
|
||||||
// only cache manifest chunks
|
|
||||||
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
|
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
|
||||||
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
|
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,180 @@
|
||||||
</plugin>
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-client</artifactId>
|
||||||
|
<version>2.9.2</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-hdfs-client</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-mapreduce-client-app</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-yarn-api</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-annotations</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-common</artifactId>
|
||||||
|
<version>2.9.2</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-cli</artifactId>
|
||||||
|
<groupId>commons-cli</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-math3</artifactId>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>xmlenc</artifactId>
|
||||||
|
<groupId>xmlenc</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-io</artifactId>
|
||||||
|
<groupId>commons-io</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-net</artifactId>
|
||||||
|
<groupId>commons-net</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-collections</artifactId>
|
||||||
|
<groupId>commons-collections</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jetty</artifactId>
|
||||||
|
<groupId>org.mortbay.jetty</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jetty-util</artifactId>
|
||||||
|
<groupId>org.mortbay.jetty</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jetty-sslengine</artifactId>
|
||||||
|
<groupId>org.mortbay.jetty</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jsp-api</artifactId>
|
||||||
|
<groupId>javax.servlet.jsp</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jersey-core</artifactId>
|
||||||
|
<groupId>com.sun.jersey</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jersey-json</artifactId>
|
||||||
|
<groupId>com.sun.jersey</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jersey-server</artifactId>
|
||||||
|
<groupId>com.sun.jersey</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>log4j</artifactId>
|
||||||
|
<groupId>log4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jets3t</artifactId>
|
||||||
|
<groupId>net.java.dev.jets3t</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-lang</artifactId>
|
||||||
|
<groupId>commons-lang</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-configuration</artifactId>
|
||||||
|
<groupId>commons-configuration</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-lang3</artifactId>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jackson-core-asl</artifactId>
|
||||||
|
<groupId>org.codehaus.jackson</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jackson-mapper-asl</artifactId>
|
||||||
|
<groupId>org.codehaus.jackson</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>avro</artifactId>
|
||||||
|
<groupId>org.apache.avro</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-auth</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>jsch</artifactId>
|
||||||
|
<groupId>com.jcraft</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>curator-client</artifactId>
|
||||||
|
<groupId>org.apache.curator</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>curator-recipes</artifactId>
|
||||||
|
<groupId>org.apache.curator</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>htrace-core4</artifactId>
|
||||||
|
<groupId>org.apache.htrace</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>zookeeper</artifactId>
|
||||||
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>commons-compress</artifactId>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>stax2-api</artifactId>
|
||||||
|
<groupId>org.codehaus.woodstox</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>woodstox-core</artifactId>
|
||||||
|
<groupId>com.fasterxml.woodstox</groupId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>hadoop-annotations</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
<distributionManagement>
|
<distributionManagement>
|
||||||
<snapshotRepository>
|
<snapshotRepository>
|
||||||
<id>ossrh</id>
|
<id>ossrh</id>
|
||||||
|
@ -127,7 +301,7 @@
|
||||||
</snapshotRepository>
|
</snapshotRepository>
|
||||||
</distributionManagement>
|
</distributionManagement>
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.5</seaweedfs.client.version>
|
||||||
<hadoop.version>2.9.2</hadoop.version>
|
<hadoop.version>2.9.2</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.5</seaweedfs.client.version>
|
||||||
<hadoop.version>2.9.2</hadoop.version>
|
<hadoop.version>2.9.2</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
@ -147,6 +147,7 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-client</artifactId>
|
<artifactId>hadoop-client</artifactId>
|
||||||
<version>${hadoop.version}</version>
|
<version>${hadoop.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
|
@ -157,6 +158,7 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-common</artifactId>
|
<artifactId>hadoop-common</artifactId>
|
||||||
<version>${hadoop.version}</version>
|
<version>${hadoop.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@
|
||||||
</snapshotRepository>
|
</snapshotRepository>
|
||||||
</distributionManagement>
|
</distributionManagement>
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.5</seaweedfs.client.version>
|
||||||
<hadoop.version>3.1.1</hadoop.version>
|
<hadoop.version>3.1.1</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
|
<seaweedfs.client.version>1.4.5</seaweedfs.client.version>
|
||||||
<hadoop.version>3.1.1</hadoop.version>
|
<hadoop.version>3.1.1</hadoop.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
@ -147,6 +147,7 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-client</artifactId>
|
<artifactId>hadoop-client</artifactId>
|
||||||
<version>${hadoop.version}</version>
|
<version>${hadoop.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.github.chrislusf</groupId>
|
<groupId>com.github.chrislusf</groupId>
|
||||||
|
@ -157,6 +158,7 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-common</artifactId>
|
<artifactId>hadoop-common</artifactId>
|
||||||
<version>${hadoop.version}</version>
|
<version>${hadoop.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
73
unmaintained/s3/presigned_put/presigned_put.go
Normal file
73
unmaintained/s3/presigned_put/presigned_put.go
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
"crypto/md5"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Downloads an item from an S3 Bucket in the region configured in the shared config
|
||||||
|
// or AWS_REGION environment variable.
|
||||||
|
//
|
||||||
|
// Usage:
|
||||||
|
// go run presigned_put.go
|
||||||
|
// For this exampl to work, the domainName is needd
|
||||||
|
// weed s3 -domainName=localhost
|
||||||
|
func main() {
|
||||||
|
h := md5.New()
|
||||||
|
content := strings.NewReader(stringContent)
|
||||||
|
content.WriteTo(h)
|
||||||
|
|
||||||
|
// Initialize a session in us-west-2 that the SDK will use to load
|
||||||
|
// credentials from the shared credentials file ~/.aws/credentials.
|
||||||
|
sess, err := session.NewSession(&aws.Config{
|
||||||
|
Region: aws.String("us-east-1"),
|
||||||
|
Endpoint: aws.String("http://localhost:8333"),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create S3 service client
|
||||||
|
svc := s3.New(sess)
|
||||||
|
|
||||||
|
putRequest, output := svc.PutObjectRequest(&s3.PutObjectInput{
|
||||||
|
Bucket: aws.String("dev"),
|
||||||
|
Key: aws.String("testKey"),
|
||||||
|
})
|
||||||
|
fmt.Printf("output: %+v\n", output)
|
||||||
|
|
||||||
|
md5s := base64.StdEncoding.EncodeToString(h.Sum(nil))
|
||||||
|
putRequest.HTTPRequest.Header.Set("Content-MD5", md5s)
|
||||||
|
|
||||||
|
url, err := putRequest.Presign(15 * time.Minute)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("error presigning request", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(url)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("PUT", url, strings.NewReader(stringContent))
|
||||||
|
req.Header.Set("Content-MD5", md5s)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("error creating request", url)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error put request: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Printf("response: %+v\n", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
var stringContent = `Generate a Pre-Signed URL for an Amazon S3 PUT Operation with a Specific Payload
|
||||||
|
You can generate a pre-signed URL for a PUT operation that checks whether users upload the correct content. When the SDK pre-signs a request, it computes the checksum of the request body and generates an MD5 checksum that is included in the pre-signed URL. Users must upload the same content that produces the same MD5 checksum generated by the SDK; otherwise, the operation fails. This is not the Content-MD5, but the signature. To enforce Content-MD5, simply add the header to the request.
|
||||||
|
|
||||||
|
The following example adds a Body field to generate a pre-signed PUT operation that requires a specific payload to be uploaded by users.
|
||||||
|
`
|
|
@ -2,7 +2,6 @@ package filer2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -15,7 +14,7 @@ import (
|
||||||
|
|
||||||
func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
||||||
|
|
||||||
fmt.Printf("start to stream content for chunks: %+v\n", chunks)
|
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
|
||||||
chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
|
chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
|
||||||
|
|
||||||
fileId2Url := make(map[string]string)
|
fileId2Url := make(map[string]string)
|
||||||
|
|
|
@ -101,6 +101,7 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
||||||
|
return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
|
||||||
return &File{
|
return &File{
|
||||||
Name: name,
|
Name: name,
|
||||||
dir: dir,
|
dir: dir,
|
||||||
|
@ -108,11 +109,14 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
||||||
entry: entry,
|
entry: entry,
|
||||||
entryViewCache: nil,
|
entryViewCache: nil,
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
|
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
|
||||||
|
|
||||||
|
return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
|
||||||
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
|
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,6 +316,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
|
|
||||||
dir.wfs.deleteFileChunks(entry.Chunks)
|
dir.wfs.deleteFileChunks(entry.Chunks)
|
||||||
|
|
||||||
|
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
|
||||||
|
|
||||||
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
|
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
|
||||||
|
|
||||||
glog.V(3).Infof("remove file: %v", req)
|
glog.V(3).Infof("remove file: %v", req)
|
||||||
|
@ -328,13 +334,17 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
|
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
|
||||||
|
|
||||||
t := util.NewFullPath(dir.FullPath(), req.Name)
|
t := util.NewFullPath(dir.FullPath(), req.Name)
|
||||||
|
dir.wfs.fsNodeCache.DeleteFsNode(t)
|
||||||
|
|
||||||
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
|
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
|
||||||
|
|
||||||
glog.V(3).Infof("remove directory entry: %v", req)
|
glog.V(3).Infof("remove directory entry: %v", req)
|
||||||
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
|
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err)
|
glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
|
||||||
|
if strings.Contains(err.Error(), "non-empty"){
|
||||||
|
return fuse.EEXIST
|
||||||
|
}
|
||||||
return fuse.ENOENT
|
return fuse.ENOENT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,6 +430,8 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
|
||||||
|
|
||||||
func (dir *Dir) Forget() {
|
func (dir *Dir) Forget() {
|
||||||
glog.V(3).Infof("Forget dir %s", dir.FullPath())
|
glog.V(3).Infof("Forget dir %s", dir.FullPath())
|
||||||
|
|
||||||
|
dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) maybeLoadEntry() error {
|
func (dir *Dir) maybeLoadEntry() error {
|
||||||
|
|
|
@ -62,6 +62,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
|
||||||
}
|
}
|
||||||
|
|
||||||
// fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
|
// fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
|
||||||
|
dir.wfs.fsNodeCache.Move(oldPath, newPath)
|
||||||
delete(dir.wfs.handles, oldPath.AsInode())
|
delete(dir.wfs.handles, oldPath.AsInode())
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -213,6 +213,7 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||||
func (file *File) Forget() {
|
func (file *File) Forget() {
|
||||||
t := util.NewFullPath(file.dir.FullPath(), file.Name)
|
t := util.NewFullPath(file.dir.FullPath(), file.Name)
|
||||||
glog.V(3).Infof("Forget file %s", t)
|
glog.V(3).Infof("Forget file %s", t)
|
||||||
|
file.wfs.fsNodeCache.DeleteFsNode(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
func (file *File) maybeLoadEntry(ctx context.Context) error {
|
||||||
|
|
|
@ -9,11 +9,12 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/fuse"
|
||||||
|
"github.com/seaweedfs/fuse/fs"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/seaweedfs/fuse"
|
|
||||||
"github.com/seaweedfs/fuse/fs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileHandle struct {
|
type FileHandle struct {
|
||||||
|
@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
fh.f.entry.Chunks = chunks
|
fh.f.entry.Chunks = chunks
|
||||||
// fh.f.entryViewCache = nil
|
// fh.f.entryViewCache = nil
|
||||||
|
|
||||||
|
// special handling of one chunk md5
|
||||||
|
if len(chunks) == 1 {
|
||||||
|
}
|
||||||
|
|
||||||
if err := filer_pb.CreateEntry(client, request); err != nil {
|
if err := filer_pb.CreateEntry(client, request); err != nil {
|
||||||
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
||||||
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
|
||||||
|
|
207
weed/filesys/fscache.go
Normal file
207
weed/filesys/fscache.go
Normal file
|
@ -0,0 +1,207 @@
|
||||||
|
package filesys
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"github.com/seaweedfs/fuse/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FsCache struct {
|
||||||
|
root *FsNode
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
type FsNode struct {
|
||||||
|
parent *FsNode
|
||||||
|
node fs.Node
|
||||||
|
name string
|
||||||
|
childrenLock sync.RWMutex
|
||||||
|
children map[string]*FsNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFsCache(root fs.Node) *FsCache {
|
||||||
|
return &FsCache{
|
||||||
|
root: &FsNode{
|
||||||
|
node: root,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
|
||||||
|
|
||||||
|
c.RLock()
|
||||||
|
defer c.RUnlock()
|
||||||
|
|
||||||
|
return c.doGetFsNode(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
|
||||||
|
t := c.root
|
||||||
|
for _, p := range path.Split() {
|
||||||
|
t = t.findChild(p)
|
||||||
|
if t == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return t.node
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
c.doSetFsNode(path, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
|
||||||
|
t := c.root
|
||||||
|
for _, p := range path.Split() {
|
||||||
|
t = t.ensureChild(p)
|
||||||
|
}
|
||||||
|
t.node = node
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
t := c.doGetFsNode(path)
|
||||||
|
if t != nil {
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
t = genNodeFn()
|
||||||
|
c.doSetFsNode(path, t)
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FsCache) DeleteFsNode(path util.FullPath) {
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
t := c.root
|
||||||
|
for _, p := range path.Split() {
|
||||||
|
t = t.findChild(p)
|
||||||
|
if t == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if t.parent != nil {
|
||||||
|
t.parent.disconnectChild(t)
|
||||||
|
}
|
||||||
|
t.deleteSelf()
|
||||||
|
}
|
||||||
|
|
||||||
|
// oldPath and newPath are full path including the new name
|
||||||
|
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
// find old node
|
||||||
|
src := c.root
|
||||||
|
for _, p := range oldPath.Split() {
|
||||||
|
src = src.findChild(p)
|
||||||
|
if src == nil {
|
||||||
|
return src
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if src.parent != nil {
|
||||||
|
src.parent.disconnectChild(src)
|
||||||
|
}
|
||||||
|
|
||||||
|
// find new node
|
||||||
|
target := c.root
|
||||||
|
for _, p := range newPath.Split() {
|
||||||
|
target = target.ensureChild(p)
|
||||||
|
}
|
||||||
|
parent := target.parent
|
||||||
|
src.name = target.name
|
||||||
|
if dir, ok := src.node.(*Dir); ok {
|
||||||
|
dir.name = target.name // target is not Dir, but a shortcut
|
||||||
|
}
|
||||||
|
if f, ok := src.node.(*File); ok {
|
||||||
|
f.Name = target.name
|
||||||
|
if f.entry != nil {
|
||||||
|
f.entry.Name = f.Name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
parent.disconnectChild(target)
|
||||||
|
|
||||||
|
target.deleteSelf()
|
||||||
|
|
||||||
|
src.connectToParent(parent)
|
||||||
|
|
||||||
|
return src
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *FsNode) connectToParent(parent *FsNode) {
|
||||||
|
n.parent = parent
|
||||||
|
oldNode := parent.findChild(n.name)
|
||||||
|
if oldNode != nil {
|
||||||
|
oldNode.deleteSelf()
|
||||||
|
}
|
||||||
|
if dir, ok := n.node.(*Dir); ok {
|
||||||
|
dir.parent = parent.node.(*Dir)
|
||||||
|
}
|
||||||
|
if f, ok := n.node.(*File); ok {
|
||||||
|
f.dir = parent.node.(*Dir)
|
||||||
|
}
|
||||||
|
n.childrenLock.Lock()
|
||||||
|
parent.children[n.name] = n
|
||||||
|
n.childrenLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *FsNode) findChild(name string) *FsNode {
|
||||||
|
n.childrenLock.RLock()
|
||||||
|
defer n.childrenLock.RUnlock()
|
||||||
|
|
||||||
|
child, found := n.children[name]
|
||||||
|
if found {
|
||||||
|
return child
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *FsNode) ensureChild(name string) *FsNode {
|
||||||
|
n.childrenLock.Lock()
|
||||||
|
defer n.childrenLock.Unlock()
|
||||||
|
|
||||||
|
if n.children == nil {
|
||||||
|
n.children = make(map[string]*FsNode)
|
||||||
|
}
|
||||||
|
child, found := n.children[name]
|
||||||
|
if found {
|
||||||
|
return child
|
||||||
|
}
|
||||||
|
t := &FsNode{
|
||||||
|
parent: n,
|
||||||
|
node: nil,
|
||||||
|
name: name,
|
||||||
|
children: nil,
|
||||||
|
}
|
||||||
|
n.children[name] = t
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *FsNode) disconnectChild(child *FsNode) {
|
||||||
|
n.childrenLock.Lock()
|
||||||
|
delete(n.children, child.name)
|
||||||
|
n.childrenLock.Unlock()
|
||||||
|
child.parent = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *FsNode) deleteSelf() {
|
||||||
|
n.childrenLock.Lock()
|
||||||
|
for _, child := range n.children {
|
||||||
|
child.deleteSelf()
|
||||||
|
}
|
||||||
|
n.children = nil
|
||||||
|
n.childrenLock.Unlock()
|
||||||
|
|
||||||
|
n.node = nil
|
||||||
|
n.parent = nil
|
||||||
|
|
||||||
|
}
|
96
weed/filesys/fscache_test.go
Normal file
96
weed/filesys/fscache_test.go
Normal file
|
@ -0,0 +1,96 @@
|
||||||
|
package filesys
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPathSplit(t *testing.T) {
|
||||||
|
parts := util.FullPath("/").Split()
|
||||||
|
if len(parts) != 0 {
|
||||||
|
t.Errorf("expecting an empty list, but getting %d", len(parts))
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = util.FullPath("/readme.md").Split()
|
||||||
|
if len(parts) != 1 {
|
||||||
|
t.Errorf("expecting an empty list, but getting %d", len(parts))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFsCache(t *testing.T) {
|
||||||
|
|
||||||
|
cache := newFsCache(nil)
|
||||||
|
|
||||||
|
x := cache.GetFsNode(util.FullPath("/y/x"))
|
||||||
|
if x != nil {
|
||||||
|
t.Errorf("wrong node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
p := util.FullPath("/a/b/c")
|
||||||
|
cache.SetFsNode(p, &File{Name: "cc"})
|
||||||
|
tNode := cache.GetFsNode(p)
|
||||||
|
tFile := tNode.(*File)
|
||||||
|
if tFile.Name != "cc" {
|
||||||
|
t.Errorf("expecting a FsNode")
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
|
||||||
|
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
|
||||||
|
cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
|
||||||
|
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
|
||||||
|
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
|
||||||
|
|
||||||
|
b := cache.GetFsNode(util.FullPath("/a/b"))
|
||||||
|
if b != nil {
|
||||||
|
t.Errorf("unexpected node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
a := cache.GetFsNode(util.FullPath("/a"))
|
||||||
|
if a == nil {
|
||||||
|
t.Errorf("missing node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.DeleteFsNode(util.FullPath("/a"))
|
||||||
|
if b != nil {
|
||||||
|
t.Errorf("unexpected node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
a = cache.GetFsNode(util.FullPath("/a"))
|
||||||
|
if a != nil {
|
||||||
|
t.Errorf("wrong DeleteFsNode!")
|
||||||
|
}
|
||||||
|
|
||||||
|
z := cache.GetFsNode(util.FullPath("/z"))
|
||||||
|
if z == nil {
|
||||||
|
t.Errorf("missing node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
y := cache.GetFsNode(util.FullPath("/x/y"))
|
||||||
|
if y != nil {
|
||||||
|
t.Errorf("wrong node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFsCacheMove(t *testing.T) {
|
||||||
|
|
||||||
|
cache := newFsCache(nil)
|
||||||
|
|
||||||
|
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
|
||||||
|
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
|
||||||
|
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
|
||||||
|
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
|
||||||
|
|
||||||
|
cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
|
||||||
|
|
||||||
|
d := cache.GetFsNode(util.FullPath("/z/x/d"))
|
||||||
|
if d == nil {
|
||||||
|
t.Errorf("unexpected nil node!")
|
||||||
|
}
|
||||||
|
if d.(*File).Name != "dd" {
|
||||||
|
t.Errorf("unexpected non dd node!")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -63,6 +63,7 @@ type WFS struct {
|
||||||
stats statsCache
|
stats statsCache
|
||||||
|
|
||||||
root fs.Node
|
root fs.Node
|
||||||
|
fsNodeCache *FsCache
|
||||||
|
|
||||||
chunkCache *chunk_cache.ChunkCache
|
chunkCache *chunk_cache.ChunkCache
|
||||||
metaCache *meta_cache.MetaCache
|
metaCache *meta_cache.MetaCache
|
||||||
|
@ -82,7 +83,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
|
cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
|
||||||
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
|
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
|
||||||
if option.CacheSizeMB > 0 {
|
if option.CacheSizeMB > 0 {
|
||||||
os.MkdirAll(cacheDir, 0755)
|
os.MkdirAll(cacheDir, 0755)
|
||||||
|
@ -100,6 +101,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
})
|
})
|
||||||
|
|
||||||
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
||||||
|
wfs.fsNodeCache = newFsCache(wfs.root)
|
||||||
|
|
||||||
return wfs
|
return wfs
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ type MockClient struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
|
func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
|
||||||
n, originalSize, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
|
n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
|
||||||
if m.needleHandling != nil {
|
if m.needleHandling != nil {
|
||||||
m.needleHandling(n, originalSize, err)
|
m.needleHandling(n, originalSize, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package operation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/md5"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -30,7 +29,7 @@ type UploadResult struct {
|
||||||
CipherKey []byte `json:"cipherKey,omitempty"`
|
CipherKey []byte `json:"cipherKey,omitempty"`
|
||||||
Mime string `json:"mime,omitempty"`
|
Mime string `json:"mime,omitempty"`
|
||||||
Gzip uint32 `json:"gzip,omitempty"`
|
Gzip uint32 `json:"gzip,omitempty"`
|
||||||
Md5 string `json:"md5,omitempty"`
|
ContentMd5 string `json:"contentMd5,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
|
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
|
||||||
|
@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
||||||
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
|
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
|
||||||
func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
||||||
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
|
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
|
||||||
if uploadResult != nil {
|
|
||||||
uploadResult.Md5 = util.Md5(data)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload sends a POST request to a volume server to upload the content with fast compression
|
// Upload sends a POST request to a volume server to upload the content with fast compression
|
||||||
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
|
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
|
||||||
hash := md5.New()
|
|
||||||
reader = io.TeeReader(reader, hash)
|
|
||||||
uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
|
uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
|
||||||
if uploadResult != nil {
|
|
||||||
uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
|
||||||
return nil, errors.New(ret.Error)
|
return nil, errors.New(ret.Error)
|
||||||
}
|
}
|
||||||
ret.ETag = etag
|
ret.ETag = etag
|
||||||
|
ret.ContentMd5 = resp.Header.Get("Content-MD5")
|
||||||
return &ret, nil
|
return &ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,6 +107,16 @@ func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identi
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, found bool) {
|
||||||
|
|
||||||
|
for _, ident := range iam.identities {
|
||||||
|
if ident.Name == "anonymous" {
|
||||||
|
return ident, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc {
|
func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc {
|
||||||
|
|
||||||
if !iam.isEnabled() {
|
if !iam.isEnabled() {
|
||||||
|
@ -127,6 +137,7 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
|
||||||
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) ErrorCode {
|
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) ErrorCode {
|
||||||
var identity *Identity
|
var identity *Identity
|
||||||
var s3Err ErrorCode
|
var s3Err ErrorCode
|
||||||
|
var found bool
|
||||||
switch getRequestAuthType(r) {
|
switch getRequestAuthType(r) {
|
||||||
case authTypeStreamingSigned:
|
case authTypeStreamingSigned:
|
||||||
return ErrNone
|
return ErrNone
|
||||||
|
@ -146,7 +157,10 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
|
||||||
glog.V(3).Infof("jwt auth type")
|
glog.V(3).Infof("jwt auth type")
|
||||||
return ErrNotImplemented
|
return ErrNotImplemented
|
||||||
case authTypeAnonymous:
|
case authTypeAnonymous:
|
||||||
|
identity, found = iam.lookupAnonymous()
|
||||||
|
if !found {
|
||||||
return ErrAccessDenied
|
return ErrAccessDenied
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return ErrNotImplemented
|
return ErrNotImplemented
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,8 +40,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rAuthType := getRequestAuthType(r)
|
|
||||||
dataReader := r.Body
|
dataReader := r.Body
|
||||||
|
if s3a.iam.isEnabled() {
|
||||||
|
rAuthType := getRequestAuthType(r)
|
||||||
var s3ErrCode ErrorCode
|
var s3ErrCode ErrorCode
|
||||||
switch rAuthType {
|
switch rAuthType {
|
||||||
case authTypeStreamingSigned:
|
case authTypeStreamingSigned:
|
||||||
|
@ -55,6 +56,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||||
writeErrorResponse(w, s3ErrCode, r.URL)
|
writeErrorResponse(w, s3ErrCode, r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
defer dataReader.Close()
|
defer dataReader.Close()
|
||||||
|
|
||||||
if strings.HasSuffix(object, "/") {
|
if strings.HasSuffix(object, "/") {
|
||||||
|
|
|
@ -179,8 +179,9 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rAuthType := getRequestAuthType(r)
|
|
||||||
dataReader := r.Body
|
dataReader := r.Body
|
||||||
|
if s3a.iam.isEnabled() {
|
||||||
|
rAuthType := getRequestAuthType(r)
|
||||||
var s3ErrCode ErrorCode
|
var s3ErrCode ErrorCode
|
||||||
switch rAuthType {
|
switch rAuthType {
|
||||||
case authTypeStreamingSigned:
|
case authTypeStreamingSigned:
|
||||||
|
@ -194,6 +195,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
|
||||||
writeErrorResponse(w, s3ErrCode, r.URL)
|
writeErrorResponse(w, s3ErrCode, r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
defer dataReader.Close()
|
defer dataReader.Close()
|
||||||
|
|
||||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
|
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
|
||||||
|
|
|
@ -2,6 +2,7 @@ package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -12,10 +13,24 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ListBucketResultV2 struct {
|
||||||
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
|
||||||
|
Name string `xml:"Name"`
|
||||||
|
Prefix string `xml:"Prefix"`
|
||||||
|
MaxKeys int `xml:"MaxKeys"`
|
||||||
|
Delimiter string `xml:"Delimiter,omitempty"`
|
||||||
|
IsTruncated bool `xml:"IsTruncated"`
|
||||||
|
Contents []ListEntry `xml:"Contents,omitempty"`
|
||||||
|
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
|
||||||
|
ContinuationToken string `xml:"ContinuationToken,omitempty"`
|
||||||
|
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
|
||||||
|
KeyCount int `xml:"KeyCount"`
|
||||||
|
StartAfter string `xml:"StartAfter,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
|
||||||
|
@ -23,7 +38,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
|
||||||
// collect parameters
|
// collect parameters
|
||||||
bucket, _ := getBucketAndObject(r)
|
bucket, _ := getBucketAndObject(r)
|
||||||
|
|
||||||
originalPrefix, marker, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
|
originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
|
||||||
|
|
||||||
if maxKeys < 0 {
|
if maxKeys < 0 {
|
||||||
writeErrorResponse(w, ErrInvalidMaxKeys, r.URL)
|
writeErrorResponse(w, ErrInvalidMaxKeys, r.URL)
|
||||||
|
@ -34,7 +49,8 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if marker == "" {
|
marker := continuationToken
|
||||||
|
if continuationToken == "" {
|
||||||
marker = startAfter
|
marker = startAfter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,8 +60,22 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
|
||||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
responseV2 := &ListBucketResultV2{
|
||||||
|
XMLName: response.XMLName,
|
||||||
|
Name: response.Name,
|
||||||
|
CommonPrefixes: response.CommonPrefixes,
|
||||||
|
Contents: response.Contents,
|
||||||
|
ContinuationToken: continuationToken,
|
||||||
|
Delimiter: response.Delimiter,
|
||||||
|
IsTruncated: response.IsTruncated,
|
||||||
|
KeyCount: len(response.Contents),
|
||||||
|
MaxKeys: response.MaxKeys,
|
||||||
|
NextContinuationToken: response.NextMarker,
|
||||||
|
Prefix: response.Prefix,
|
||||||
|
StartAfter: startAfter,
|
||||||
|
}
|
||||||
|
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(responseV2))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -76,70 +106,39 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(response))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
|
func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
|
||||||
// convert full path prefix into directory name and prefix for entry name
|
// convert full path prefix into directory name and prefix for entry name
|
||||||
dir, prefix := filepath.Split(originalPrefix)
|
reqDir, prefix := filepath.Split(originalPrefix)
|
||||||
if strings.HasPrefix(dir, "/") {
|
if strings.HasPrefix(reqDir, "/") {
|
||||||
dir = dir[1:]
|
reqDir = reqDir[1:]
|
||||||
}
|
}
|
||||||
|
if strings.HasSuffix(reqDir, "/") {
|
||||||
|
// remove trailing "/"
|
||||||
|
reqDir = reqDir[:len(reqDir)-1]
|
||||||
|
}
|
||||||
|
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
|
||||||
|
reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
|
||||||
|
|
||||||
|
var contents []ListEntry
|
||||||
|
var commonPrefixes []PrefixEntry
|
||||||
|
var isTruncated bool
|
||||||
|
var doErr error
|
||||||
|
var nextMarker string
|
||||||
|
|
||||||
// check filer
|
// check filer
|
||||||
err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.ListEntriesRequest{
|
_, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
|
||||||
Directory: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, dir),
|
|
||||||
Prefix: prefix,
|
|
||||||
Limit: uint32(maxKeys + 1),
|
|
||||||
StartFromFileName: marker,
|
|
||||||
InclusiveStartFrom: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := client.ListEntries(context.Background(), request)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("list buckets: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var contents []ListEntry
|
|
||||||
var commonPrefixes []PrefixEntry
|
|
||||||
var counter int
|
|
||||||
var lastEntryName string
|
|
||||||
var isTruncated bool
|
|
||||||
|
|
||||||
for {
|
|
||||||
resp, recvErr := stream.Recv()
|
|
||||||
if recvErr != nil {
|
|
||||||
if recvErr == io.EOF {
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
return recvErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := resp.Entry
|
|
||||||
counter++
|
|
||||||
if counter > maxKeys {
|
|
||||||
isTruncated = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
lastEntryName = entry.Name
|
|
||||||
if entry.IsDirectory {
|
if entry.IsDirectory {
|
||||||
if entry.Name != ".uploads" {
|
if delimiter == "/" {
|
||||||
prefix = fmt.Sprintf("%s%s/", dir, entry.Name)
|
prefix = fmt.Sprintf("%s%s/", dir, entry.Name)
|
||||||
|
|
||||||
commonPrefixes = append(commonPrefixes, PrefixEntry{
|
commonPrefixes = append(commonPrefixes, PrefixEntry{
|
||||||
Prefix: prefix,
|
Prefix: prefix[len(bucketPrefix):],
|
||||||
})
|
})
|
||||||
|
|
||||||
if delimiter != "/" {
|
|
||||||
response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter)
|
|
||||||
for _, content := range response.Contents {
|
|
||||||
contents = append(contents, content)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
contents = append(contents, ListEntry{
|
contents = append(contents, ListEntry{
|
||||||
Key: fmt.Sprintf("%s%s", dir, entry.Name),
|
Key: fmt.Sprintf("%s%s", dir[len(bucketPrefix):], entry.Name),
|
||||||
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
|
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
|
||||||
ETag: "\"" + filer2.ETag(entry) + "\"",
|
ETag: "\"" + filer2.ETag(entry) + "\"",
|
||||||
Size: int64(filer2.TotalSize(entry.Chunks)),
|
Size: int64(filer2.TotalSize(entry.Chunks)),
|
||||||
|
@ -150,29 +149,123 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys
|
||||||
StorageClass: "STANDARD",
|
StorageClass: "STANDARD",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
if doErr != nil {
|
||||||
|
return doErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isTruncated {
|
||||||
|
nextMarker = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
response = ListBucketResult{
|
response = ListBucketResult{
|
||||||
Name: bucket,
|
Name: bucket,
|
||||||
Prefix: originalPrefix,
|
Prefix: originalPrefix,
|
||||||
Marker: marker,
|
Marker: marker,
|
||||||
NextMarker: lastEntryName,
|
NextMarker: nextMarker,
|
||||||
MaxKeys: maxKeys,
|
MaxKeys: maxKeys,
|
||||||
Delimiter: "/",
|
Delimiter: delimiter,
|
||||||
IsTruncated: isTruncated,
|
IsTruncated: isTruncated,
|
||||||
Contents: contents,
|
Contents: contents,
|
||||||
CommonPrefixes: commonPrefixes,
|
CommonPrefixes: commonPrefixes,
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("read directory: %v, found: %v, %+v", request, counter, response)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
|
||||||
|
// invariants
|
||||||
|
// prefix and marker should be under dir, marker may contain "/"
|
||||||
|
// maxKeys should be updated for each recursion
|
||||||
|
|
||||||
|
if prefix == "/" && delimiter == "/" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if maxKeys <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(marker, "/") {
|
||||||
|
sepIndex := strings.Index(marker, "/")
|
||||||
|
subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
|
||||||
|
// println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
|
||||||
|
subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
|
||||||
|
if subErr != nil {
|
||||||
|
err = subErr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
maxKeys -= subCounter
|
||||||
|
nextMarker = subDir + "/" + subNextMarker
|
||||||
|
counter += subCounter
|
||||||
|
// finished processing this sub directory
|
||||||
|
marker = subDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// now marker is also a direct child of dir
|
||||||
|
request := &filer_pb.ListEntriesRequest{
|
||||||
|
Directory: dir,
|
||||||
|
Prefix: prefix,
|
||||||
|
Limit: uint32(maxKeys + 1),
|
||||||
|
StartFromFileName: marker,
|
||||||
|
InclusiveStartFrom: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, listErr := client.ListEntries(context.Background(), request)
|
||||||
|
if listErr != nil {
|
||||||
|
err = fmt.Errorf("list entires %+v: %v", request, listErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
resp, recvErr := stream.Recv()
|
||||||
|
if recvErr != nil {
|
||||||
|
if recvErr == io.EOF {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if counter >= maxKeys {
|
||||||
|
isTruncated = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry := resp.Entry
|
||||||
|
nextMarker = entry.Name
|
||||||
|
if entry.IsDirectory {
|
||||||
|
// println("ListEntries", dir, "dir:", entry.Name)
|
||||||
|
if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
|
||||||
|
eachEntryFn(dir, entry)
|
||||||
|
if delimiter != "/" {
|
||||||
|
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
|
||||||
|
subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
|
||||||
|
if subErr != nil {
|
||||||
|
err = fmt.Errorf("doListFilerEntries2: %v", subErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
|
||||||
|
counter += subCounter
|
||||||
|
nextMarker = entry.Name + "/" + subNextMarker
|
||||||
|
if subIsTruncated {
|
||||||
|
isTruncated = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// println("ListEntries", dir, "file:", entry.Name)
|
||||||
|
eachEntryFn(dir, entry)
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
|
func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
|
||||||
prefix = values.Get("prefix")
|
prefix = values.Get("prefix")
|
||||||
token = values.Get("continuation-token")
|
token = values.Get("continuation-token")
|
||||||
|
|
|
@ -2,22 +2,11 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"mime"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
filenamePath "path"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
@ -98,206 +87,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ttlSeconds = int32(ttl.Minutes()) * 60
|
ttlSeconds = int32(ttl.Minutes()) * 60
|
||||||
}
|
}
|
||||||
|
|
||||||
if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
|
fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if fs.option.Cipher {
|
|
||||||
reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
|
|
||||||
if err != nil {
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
|
||||||
} else if reply != nil {
|
|
||||||
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
|
|
||||||
|
|
||||||
if err != nil || fileId == "" || urlLocation == "" {
|
|
||||||
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
|
|
||||||
|
|
||||||
u, _ := url.Parse(urlLocation)
|
|
||||||
ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// send back post result
|
|
||||||
reply := FilerPostResult{
|
|
||||||
Name: ret.Name,
|
|
||||||
Size: int64(ret.Size),
|
|
||||||
Error: ret.Error,
|
|
||||||
Fid: fileId,
|
|
||||||
Url: urlLocation,
|
|
||||||
}
|
|
||||||
setEtag(w, ret.ETag)
|
|
||||||
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
|
||||||
}
|
|
||||||
|
|
||||||
// update metadata in filer store
|
|
||||||
func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
|
|
||||||
collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
|
|
||||||
|
|
||||||
stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
|
|
||||||
start := time.Now()
|
|
||||||
defer func() {
|
|
||||||
stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
|
|
||||||
}()
|
|
||||||
|
|
||||||
modeStr := r.URL.Query().Get("mode")
|
|
||||||
if modeStr == "" {
|
|
||||||
modeStr = "0660"
|
|
||||||
}
|
|
||||||
mode, err := strconv.ParseUint(modeStr, 8, 32)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
|
|
||||||
mode = 0660
|
|
||||||
}
|
|
||||||
|
|
||||||
path := r.URL.Path
|
|
||||||
if strings.HasSuffix(path, "/") {
|
|
||||||
if ret.Name != "" {
|
|
||||||
path += ret.Name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
|
|
||||||
crTime := time.Now()
|
|
||||||
if err == nil && existingEntry != nil {
|
|
||||||
crTime = existingEntry.Crtime
|
|
||||||
}
|
|
||||||
entry := &filer2.Entry{
|
|
||||||
FullPath: util.FullPath(path),
|
|
||||||
Attr: filer2.Attr{
|
|
||||||
Mtime: time.Now(),
|
|
||||||
Crtime: crTime,
|
|
||||||
Mode: os.FileMode(mode),
|
|
||||||
Uid: OS_UID,
|
|
||||||
Gid: OS_GID,
|
|
||||||
Replication: replication,
|
|
||||||
Collection: collection,
|
|
||||||
TtlSec: ttlSeconds,
|
|
||||||
Mime: ret.Mime,
|
|
||||||
Md5: md5value,
|
|
||||||
},
|
|
||||||
Chunks: []*filer_pb.FileChunk{{
|
|
||||||
FileId: fileId,
|
|
||||||
Size: uint64(ret.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: ret.ETag,
|
|
||||||
}},
|
|
||||||
}
|
|
||||||
if entry.Attr.Mime == "" {
|
|
||||||
if ext := filenamePath.Ext(path); ext != "" {
|
|
||||||
entry.Attr.Mime = mime.TypeByExtension(ext)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// glog.V(4).Infof("saving %s => %+v", path, entry)
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
|
|
||||||
fs.filer.DeleteChunks(entry.Chunks)
|
|
||||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
|
|
||||||
err = dbErr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// send request to volume server
|
|
||||||
func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
|
|
||||||
|
|
||||||
stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
|
|
||||||
start := time.Now()
|
|
||||||
defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
|
|
||||||
|
|
||||||
ret = &operation.UploadResult{}
|
|
||||||
|
|
||||||
md5Hash := md5.New()
|
|
||||||
body := r.Body
|
|
||||||
if r.Method == "PUT" {
|
|
||||||
// only PUT or large chunked files has Md5 in attributes
|
|
||||||
body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
|
|
||||||
}
|
|
||||||
|
|
||||||
request := &http.Request{
|
|
||||||
Method: r.Method,
|
|
||||||
URL: u,
|
|
||||||
Proto: r.Proto,
|
|
||||||
ProtoMajor: r.ProtoMajor,
|
|
||||||
ProtoMinor: r.ProtoMinor,
|
|
||||||
Header: r.Header,
|
|
||||||
Body: body,
|
|
||||||
Host: r.Host,
|
|
||||||
ContentLength: r.ContentLength,
|
|
||||||
}
|
|
||||||
|
|
||||||
if auth != "" {
|
|
||||||
request.Header.Set("Authorization", "BEARER "+string(auth))
|
|
||||||
}
|
|
||||||
resp, doErr := util.Do(request)
|
|
||||||
if doErr != nil {
|
|
||||||
glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, doErr)
|
|
||||||
err = doErr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
io.Copy(ioutil.Discard, resp.Body)
|
|
||||||
resp.Body.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
respBody, raErr := ioutil.ReadAll(resp.Body)
|
|
||||||
if raErr != nil {
|
|
||||||
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, raErr)
|
|
||||||
err = raErr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(4).Infoln("post result", string(respBody))
|
|
||||||
unmarshalErr := json.Unmarshal(respBody, &ret)
|
|
||||||
if unmarshalErr != nil {
|
|
||||||
glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
|
|
||||||
err = unmarshalErr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ret.Error != "" {
|
|
||||||
err = errors.New(ret.Error)
|
|
||||||
glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// find correct final path
|
|
||||||
path := r.URL.Path
|
|
||||||
if strings.HasSuffix(path, "/") {
|
|
||||||
if ret.Name != "" {
|
|
||||||
path += ret.Name
|
|
||||||
} else {
|
|
||||||
err = fmt.Errorf("can not to write to folder %s without a file name", path)
|
|
||||||
fs.filer.DeleteFileByFileId(fileId)
|
|
||||||
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// use filer calculated md5 ETag, instead of the volume server crc ETag
|
|
||||||
if r.Method == "PUT" {
|
|
||||||
md5value = md5Hash.Sum(nil)
|
|
||||||
}
|
|
||||||
ret.ETag = getEtag(resp)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// curl -X DELETE http://localhost:8888/path/to
|
// curl -X DELETE http://localhost:8888/path/to
|
||||||
|
|
|
@ -3,9 +3,11 @@ package weed_server
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"hash"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -21,11 +23,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||||
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
|
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) {
|
||||||
if r.Method != "POST" {
|
|
||||||
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
|
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
|
||||||
query := r.URL.Query()
|
query := r.URL.Query()
|
||||||
|
@ -35,54 +33,43 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
|
||||||
if maxMB <= 0 && fs.option.MaxMB > 0 {
|
if maxMB <= 0 && fs.option.MaxMB > 0 {
|
||||||
maxMB = int32(fs.option.MaxMB)
|
maxMB = int32(fs.option.MaxMB)
|
||||||
}
|
}
|
||||||
if maxMB <= 0 {
|
|
||||||
glog.V(4).Infoln("AutoChunking not enabled")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
|
|
||||||
|
|
||||||
chunkSize := 1024 * 1024 * maxMB
|
chunkSize := 1024 * 1024 * maxMB
|
||||||
|
|
||||||
contentLength := int64(0)
|
|
||||||
if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
|
|
||||||
contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
|
|
||||||
if contentLength <= int64(chunkSize) {
|
|
||||||
glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if contentLength <= 0 {
|
|
||||||
glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
|
||||||
if err != nil {
|
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
|
||||||
} else if reply != nil {
|
|
||||||
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|
||||||
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
|
|
||||||
|
|
||||||
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
|
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
|
stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var reply *FilerPostResult
|
||||||
|
var err error
|
||||||
|
var md5bytes []byte
|
||||||
|
if r.Method == "POST" {
|
||||||
|
reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||||
|
} else {
|
||||||
|
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||||
|
} else if reply != nil {
|
||||||
|
if len(md5bytes) > 0 {
|
||||||
|
w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
|
||||||
|
}
|
||||||
|
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
|
||||||
|
|
||||||
multipartReader, multipartReaderErr := r.MultipartReader()
|
multipartReader, multipartReaderErr := r.MultipartReader()
|
||||||
if multipartReaderErr != nil {
|
if multipartReaderErr != nil {
|
||||||
return nil, multipartReaderErr
|
return nil, nil, multipartReaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
part1, part1Err := multipartReader.NextPart()
|
part1, part1Err := multipartReader.NextPart()
|
||||||
if part1Err != nil {
|
if part1Err != nil {
|
||||||
return nil, part1Err
|
return nil, nil, part1Err
|
||||||
}
|
}
|
||||||
|
|
||||||
fileName := part1.FileName()
|
fileName := part1.FileName()
|
||||||
|
@ -90,46 +77,13 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
||||||
fileName = path.Base(fileName)
|
fileName = path.Base(fileName)
|
||||||
}
|
}
|
||||||
contentType := part1.Header.Get("Content-Type")
|
contentType := part1.Header.Get("Content-Type")
|
||||||
|
if contentType == "application/octet-stream" {
|
||||||
var fileChunks []*filer_pb.FileChunk
|
contentType = ""
|
||||||
|
|
||||||
md5Hash := md5.New()
|
|
||||||
var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
|
|
||||||
|
|
||||||
chunkOffset := int64(0)
|
|
||||||
|
|
||||||
for chunkOffset < contentLength {
|
|
||||||
limitedReader := io.LimitReader(partReader, int64(chunkSize))
|
|
||||||
|
|
||||||
// assign one file id for one chunk
|
|
||||||
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
|
|
||||||
if assignErr != nil {
|
|
||||||
return nil, assignErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// upload the chunk to the volume server
|
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||||
uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
|
if err != nil {
|
||||||
if uploadErr != nil {
|
return nil, nil, err
|
||||||
return nil, uploadErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// if last chunk exhausted the reader exactly at the border
|
|
||||||
if uploadResult.Size == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save to chunk manifest structure
|
|
||||||
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
|
|
||||||
|
|
||||||
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
|
|
||||||
|
|
||||||
// reset variables for the next chunk
|
|
||||||
chunkOffset = chunkOffset + int64(uploadResult.Size)
|
|
||||||
|
|
||||||
// if last chunk was not at full chunk size, but already exhausted the reader
|
|
||||||
if int64(uploadResult.Size) < int64(chunkSize) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
|
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
|
||||||
|
@ -138,6 +92,48 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
md5bytes = md5Hash.Sum(nil)
|
||||||
|
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
|
||||||
|
|
||||||
|
fileName := ""
|
||||||
|
contentType := ""
|
||||||
|
|
||||||
|
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
|
||||||
|
if replyerr != nil {
|
||||||
|
glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
md5bytes = md5Hash.Sum(nil)
|
||||||
|
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
|
||||||
|
|
||||||
|
// detect file mode
|
||||||
|
modeStr := r.URL.Query().Get("mode")
|
||||||
|
if modeStr == "" {
|
||||||
|
modeStr = "0660"
|
||||||
|
}
|
||||||
|
mode, err := strconv.ParseUint(modeStr, 8, 32)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
|
||||||
|
mode = 0660
|
||||||
|
}
|
||||||
|
|
||||||
|
// fix the path
|
||||||
path := r.URL.Path
|
path := r.URL.Path
|
||||||
if strings.HasSuffix(path, "/") {
|
if strings.HasSuffix(path, "/") {
|
||||||
if fileName != "" {
|
if fileName != "" {
|
||||||
|
@ -145,20 +141,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fix the crTime
|
||||||
|
existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
|
||||||
|
crTime := time.Now()
|
||||||
|
if err == nil && existingEntry != nil {
|
||||||
|
crTime = existingEntry.Crtime
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
glog.V(4).Infoln("saving", path)
|
glog.V(4).Infoln("saving", path)
|
||||||
entry := &filer2.Entry{
|
entry := &filer2.Entry{
|
||||||
FullPath: util.FullPath(path),
|
FullPath: util.FullPath(path),
|
||||||
Attr: filer2.Attr{
|
Attr: filer2.Attr{
|
||||||
Mtime: time.Now(),
|
Mtime: time.Now(),
|
||||||
Crtime: time.Now(),
|
Crtime: crTime,
|
||||||
Mode: 0660,
|
Mode: os.FileMode(mode),
|
||||||
Uid: OS_UID,
|
Uid: OS_UID,
|
||||||
Gid: OS_GID,
|
Gid: OS_GID,
|
||||||
Replication: replication,
|
Replication: replication,
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
TtlSec: ttlSec,
|
TtlSec: ttlSec,
|
||||||
Mime: contentType,
|
Mime: contentType,
|
||||||
Md5: md5Hash.Sum(nil),
|
Md5: md5bytes,
|
||||||
},
|
},
|
||||||
Chunks: fileChunks,
|
Chunks: fileChunks,
|
||||||
}
|
}
|
||||||
|
@ -173,10 +177,52 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
||||||
replyerr = dbErr
|
replyerr = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
||||||
return
|
}
|
||||||
|
return filerResult, replyerr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
|
||||||
|
var fileChunks []*filer_pb.FileChunk
|
||||||
|
|
||||||
|
md5Hash := md5.New()
|
||||||
|
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
|
||||||
|
|
||||||
|
chunkOffset := int64(0)
|
||||||
|
|
||||||
|
for {
|
||||||
|
limitedReader := io.LimitReader(partReader, int64(chunkSize))
|
||||||
|
|
||||||
|
// assign one file id for one chunk
|
||||||
|
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
|
||||||
|
if assignErr != nil {
|
||||||
|
return nil, nil, 0, assignErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
// upload the chunk to the volume server
|
||||||
|
uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
|
||||||
|
if uploadErr != nil {
|
||||||
|
return nil, nil, 0, uploadErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// if last chunk exhausted the reader exactly at the border
|
||||||
|
if uploadResult.Size == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save to chunk manifest structure
|
||||||
|
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
|
||||||
|
|
||||||
|
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
|
||||||
|
|
||||||
|
// reset variables for the next chunk
|
||||||
|
chunkOffset = chunkOffset + int64(uploadResult.Size)
|
||||||
|
|
||||||
|
// if last chunk was not at full chunk size, but already exhausted the reader
|
||||||
|
if int64(uploadResult.Size) < int64(chunkSize) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fileChunks, md5Hash, chunkOffset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
|
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
|
||||||
|
@ -209,4 +255,3 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
|
||||||
return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
|
return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
TtlSec: ttlSeconds,
|
TtlSec: ttlSeconds,
|
||||||
Mime: pu.MimeType,
|
Mime: pu.MimeType,
|
||||||
|
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
|
||||||
},
|
},
|
||||||
Chunks: fileChunks,
|
Chunks: fileChunks,
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,14 +19,13 @@ import (
|
||||||
|
|
||||||
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
|
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
|
||||||
var dn *topology.DataNode
|
var dn *topology.DataNode
|
||||||
t := ms.Topo
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if dn != nil {
|
if dn != nil {
|
||||||
|
|
||||||
// if the volume server disconnects and reconnects quickly
|
// if the volume server disconnects and reconnects quickly
|
||||||
// the unregister and register can race with each other
|
// the unregister and register can race with each other
|
||||||
t.UnRegisterDataNode(dn)
|
ms.Topo.UnRegisterDataNode(dn)
|
||||||
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
|
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
|
||||||
|
|
||||||
message := &master_pb.VolumeLocation{
|
message := &master_pb.VolumeLocation{
|
||||||
|
@ -62,11 +61,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Sequence.SetMax(heartbeat.MaxFileKey)
|
ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
|
||||||
|
|
||||||
if dn == nil {
|
if dn == nil {
|
||||||
dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
|
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
|
||||||
dc := t.GetOrCreateDataCenter(dcName)
|
dc := ms.Topo.GetOrCreateDataCenter(dcName)
|
||||||
rack := dc.GetOrCreateRack(rackName)
|
rack := dc.GetOrCreateRack(rackName)
|
||||||
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
|
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
|
||||||
int(heartbeat.Port), heartbeat.PublicUrl,
|
int(heartbeat.Port), heartbeat.PublicUrl,
|
||||||
|
@ -88,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
dn.UpAdjustMaxVolumeCountDelta(delta)
|
dn.UpAdjustMaxVolumeCountDelta(delta)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
|
glog.V(5).Infof("master received heartbeat %s", heartbeat.String())
|
||||||
message := &master_pb.VolumeLocation{
|
message := &master_pb.VolumeLocation{
|
||||||
Url: dn.Url(),
|
Url: dn.Url(),
|
||||||
PublicUrl: dn.PublicUrl,
|
PublicUrl: dn.PublicUrl,
|
||||||
|
@ -102,12 +101,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
|
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
|
||||||
}
|
}
|
||||||
// update master internal volume layouts
|
// update master internal volume layouts
|
||||||
t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
|
ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
|
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
|
||||||
// process heartbeat.Volumes
|
// process heartbeat.Volumes
|
||||||
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
||||||
|
|
||||||
for _, v := range newVolumes {
|
for _, v := range newVolumes {
|
||||||
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
|
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
|
||||||
|
@ -122,7 +121,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
|
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
|
||||||
|
|
||||||
// update master internal volume layouts
|
// update master internal volume layouts
|
||||||
t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
|
ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
|
||||||
|
|
||||||
for _, s := range heartbeat.NewEcShards {
|
for _, s := range heartbeat.NewEcShards {
|
||||||
message.NewVids = append(message.NewVids, s.Id)
|
message.NewVids = append(message.NewVids, s.Id)
|
||||||
|
@ -138,7 +137,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
|
|
||||||
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
|
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
|
||||||
glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
|
glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
|
||||||
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
|
newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
|
||||||
|
|
||||||
// broadcast the ec vid changes to master clients
|
// broadcast the ec vid changes to master clients
|
||||||
for _, s := range newShards {
|
for _, s := range newShards {
|
||||||
|
@ -163,7 +162,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
}
|
}
|
||||||
|
|
||||||
// tell the volume servers about the leader
|
// tell the volume servers about the leader
|
||||||
newLeader, err := t.Leader()
|
newLeader, err := ms.Topo.Leader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("SendHeartbeat find leader: %v", err)
|
glog.Warningf("SendHeartbeat find leader: %v", err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,7 +2,6 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
@ -87,12 +86,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
|
||||||
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
|
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
|
||||||
if vs.store.MaybeAdjustVolumeMax() {
|
if vs.store.MaybeAdjustVolumeMax() {
|
||||||
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
||||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
|
if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
|
||||||
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
|
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
|
||||||
newLeader = in.GetLeader()
|
newLeader = in.GetLeader()
|
||||||
doneChan <- nil
|
doneChan <- nil
|
||||||
return
|
return
|
||||||
|
@ -169,13 +168,13 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
case <-volumeTickChan:
|
case <-volumeTickChan:
|
||||||
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
|
glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
|
||||||
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
||||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
case <-ecShardTickChan:
|
case <-ecShardTickChan:
|
||||||
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
|
glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
|
||||||
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
|
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
|
||||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -185,16 +184,3 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isSameIP(ip string, host string) bool {
|
|
||||||
ips, err := net.LookupIP(host)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for _, t := range ips {
|
|
||||||
if ip == t.String() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
|
reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
|
||||||
if ne != nil {
|
if ne != nil {
|
||||||
writeJsonError(w, r, http.StatusBadRequest, ne)
|
writeJsonError(w, r, http.StatusBadRequest, ne)
|
||||||
return
|
return
|
||||||
|
@ -70,6 +70,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ret.ETag = reqNeedle.Etag()
|
ret.ETag = reqNeedle.Etag()
|
||||||
ret.Mime = string(reqNeedle.Mime)
|
ret.Mime = string(reqNeedle.Mime)
|
||||||
setEtag(w, ret.ETag)
|
setEtag(w, ret.ETag)
|
||||||
|
w.Header().Set("Content-MD5", contentMd5)
|
||||||
writeJsonQuiet(w, r, httpStatus, ret)
|
writeJsonQuiet(w, r, httpStatus, ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ func (n *Needle) String() (str string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
|
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
|
||||||
n = new(Needle)
|
n = new(Needle)
|
||||||
pu, e := ParseUpload(r, sizeLimit)
|
pu, e := ParseUpload(r, sizeLimit)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
|
@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
|
||||||
originalSize = pu.OriginalDataSize
|
originalSize = pu.OriginalDataSize
|
||||||
n.LastModified = pu.ModifiedTime
|
n.LastModified = pu.ModifiedTime
|
||||||
n.Ttl = pu.Ttl
|
n.Ttl = pu.Ttl
|
||||||
|
contentMd5 = pu.ContentMd5
|
||||||
|
|
||||||
if len(pu.FileName) < 256 {
|
if len(pu.FileName) < 256 {
|
||||||
n.Name = []byte(pu.FileName)
|
n.Name = []byte(pu.FileName)
|
||||||
|
|
|
@ -29,6 +29,7 @@ type ParsedUpload struct {
|
||||||
Ttl *TTL
|
Ttl *TTL
|
||||||
IsChunkedFile bool
|
IsChunkedFile bool
|
||||||
UncompressedData []byte
|
UncompressedData []byte
|
||||||
|
ContentMd5 string
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
|
func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
|
||||||
|
@ -83,11 +84,13 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
|
// md5
|
||||||
h := md5.New()
|
h := md5.New()
|
||||||
h.Write(pu.UncompressedData)
|
h.Write(pu.UncompressedData)
|
||||||
if receivedChecksum := base64.StdEncoding.EncodeToString(h.Sum(nil)); expectedChecksum != receivedChecksum {
|
pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
|
||||||
e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, receivedChecksum)
|
if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
|
||||||
|
if expectedChecksum != pu.ContentMd5 {
|
||||||
|
e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, ver
|
||||||
func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) {
|
func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) {
|
||||||
n.ParseNeedleHeader(bytes)
|
n.ParseNeedleHeader(bytes)
|
||||||
if n.Size != size {
|
if n.Size != size {
|
||||||
return fmt.Errorf("entry not found: offset %d found id %d size %d, expected size %d", offset, n.Id, n.Size, size)
|
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
|
||||||
}
|
}
|
||||||
switch version {
|
switch version {
|
||||||
case Version1:
|
case Version1:
|
||||||
|
|
|
@ -2,6 +2,7 @@ package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
@ -109,8 +110,28 @@ func HashToInt32(data []byte) (v int32) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func Md5(data []byte) string {
|
func Base64Encode(data []byte) string {
|
||||||
|
return base64.StdEncoding.EncodeToString(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Base64Md5(data []byte) string {
|
||||||
|
return Base64Encode(Md5(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
func Md5(data []byte) []byte {
|
||||||
hash := md5.New()
|
hash := md5.New()
|
||||||
hash.Write(data)
|
hash.Write(data)
|
||||||
return fmt.Sprintf("%x", hash.Sum(nil))
|
return hash.Sum(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Md5String(data []byte) string {
|
||||||
|
return fmt.Sprintf("%x", Md5(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
func Base64Md5ToBytes(contentMd5 string) []byte {
|
||||||
|
data, err := base64.StdEncoding.DecodeString(contentMd5)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return data
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 87)
|
VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 88)
|
||||||
COMMIT = ""
|
COMMIT = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue