diff --git a/.github/workflows/cleanup.yml b/.github/workflows/cleanup.yml new file mode 100644 index 000000000..d8719c36f --- /dev/null +++ b/.github/workflows/cleanup.yml @@ -0,0 +1,22 @@ +name: Cleanup + +on: + push: + branches: [ master ] + +jobs: + + build: + name: Build + runs-on: ubuntu-latest + + steps: + + - name: Delete old release assets + uses: mknejp/delete-release-assets@v1 + with: + token: ${{ github.token }} + tag: dev + fail-if-no-assets: false + assets: | + weed-* diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 39108c30a..55457ba6a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - goos: [linux, windows, darwin ] + goos: [linux, windows, darwin, freebsd ] goarch: [amd64, arm] exclude: - goarch: arm @@ -24,14 +24,10 @@ jobs: - name: Check out code into the Go module directory uses: actions/checkout@v2 - - name: Delete old release assets - uses: mknejp/delete-release-assets@v1 + - name: Wait for the deletion + uses: jakejarvis/wait-action@master with: - token: ${{ github.token }} - tag: dev - fail-if-no-assets: false - assets: | - weed-* + time: '30s' - name: Set BUILD_TIME env run: echo BUILD_TIME=$(date -u +%Y-%m-%d-%H-%M) >> ${GITHUB_ENV} diff --git a/README.md b/README.md index 70e2ba0b2..01be6d791 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,8 @@ SeaweedFS started by implementing [Facebook's Haystack design paper](http://www. On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc. +For any distributed key value stores, the large values can be offloaded to SeaweedFS. With the fast access speed and linearly scalable capacity, SeaweedFS can work as a distributed [Key-Large-Value store][KeyLargeValueStore]. + [Back to TOC](#table-of-contents) ## Additional Features ## @@ -117,7 +119,10 @@ On top of the object store, optional [Filer] can support directories and POSIX a * [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices. * [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data. * [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB. + +## Kubernetes ## * [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/) +* [SeaweedFS Operator](https://github.com/seaweedfs/seaweedfs-operator) [Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files [SuperLargeFiles]: https://github.com/chrislusf/seaweedfs/wiki/Data-Structure-for-Large-Files @@ -134,6 +139,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a [SeaweedFsCsiDriver]: https://github.com/seaweedfs/seaweedfs-csi-driver [ActiveActiveAsyncReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Active-Active-cross-cluster-continuous-synchronization [FilerStoreReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Store-Replication +[KeyLargeValueStore]: https://github.com/chrislusf/seaweedfs/wiki/Filer-as-a-Key-Large-Value-Store [Back to TOC](#table-of-contents) @@ -335,7 +341,7 @@ Usually hot data are fresh and warm data are old. SeaweedFS puts the newly creat With the O(1) access time, the network latency cost is kept at minimum. -If the hot~warm data is split as 20~80, with 20 servers, you can achieve storage capacity of 100 servers. That's a cost saving of 80%! Or you can repurpose the 80 servers to store new data also, and get 5X storage throughput. +If the hot/warm data is split as 20/80, with 20 servers, you can achieve storage capacity of 100 servers. That's a cost saving of 80%! Or you can repurpose the 80 servers to store new data also, and get 5X storage throughput. [Back to TOC](#table-of-contents) @@ -403,7 +409,7 @@ SeaweedFS has a centralized master group to look up free volumes, while Ceph use Same as SeaweedFS, Ceph is also based on the object store RADOS. Ceph is rather complicated with mixed reviews. -Ceph uses CRUSH hashing to automatically manage the data placement. SeaweedFS places data by assigned volumes. +Ceph uses CRUSH hashing to automatically manage the data placement, which is efficient to locate the data. But the data has to be placed according to the CRUSH algorithm. Any wrong configuration would cause data loss. SeaweedFS places data by assigning them to any writable volumes. If writes to one volume failed, just pick another volume to write. Adding more volumes are also as simple as it can be. SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read. @@ -436,9 +442,7 @@ MinIO has specific requirements on storage layout. It is not flexible to adjust ## Dev Plan ## -* More tools and documentation, on how to manage and scale the system. For example, how to move volumes, automatically balancing data, how to grow volumes, how to check system status, etc. -* Integrate with Kubernetes. build [SeaweedFS Operator](https://github.com/seaweedfs/seaweedfs-operator). -* Add ftp server. +* More tools and documentation, on how to manage and scale the system. * Read and write stream data. * Support structured data. diff --git a/docker/Dockerfile.go_build b/docker/Dockerfile.go_build index 726046b56..36d9fd07a 100644 --- a/docker/Dockerfile.go_build +++ b/docker/Dockerfile.go_build @@ -1,5 +1,5 @@ FROM frolvlad/alpine-glibc as builder -RUN apk add git go g++ +RUN apk add git go g++ fuse RUN mkdir -p /go/src/github.com/chrislusf/ RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs ARG BRANCH=${BRANCH:-master} @@ -14,6 +14,7 @@ COPY --from=builder /root/go/bin/weed /usr/bin/ RUN mkdir -p /etc/seaweedfs COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh +RUN apk add fuse # for weed mount # volume server gprc port EXPOSE 18080 diff --git a/docker/Dockerfile.go_build_large b/docker/Dockerfile.go_build_large index 8fc85e868..08cfbd547 100644 --- a/docker/Dockerfile.go_build_large +++ b/docker/Dockerfile.go_build_large @@ -1,5 +1,5 @@ FROM frolvlad/alpine-glibc as builder -RUN apk add git go g++ +RUN apk add git go g++ fuse RUN mkdir -p /go/src/github.com/chrislusf/ RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs ARG BRANCH=${BRANCH:-master} @@ -14,6 +14,7 @@ COPY --from=builder /root/go/bin/weed /usr/bin/ RUN mkdir -p /etc/seaweedfs COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh +RUN apk add fuse # for weed mount # volume server gprc port EXPOSE 18080 diff --git a/docker/Dockerfile.local b/docker/Dockerfile.local index 693d8a952..199c21e81 100644 --- a/docker/Dockerfile.local +++ b/docker/Dockerfile.local @@ -4,6 +4,7 @@ COPY ./weed /usr/bin/ RUN mkdir -p /etc/seaweedfs COPY ./filer.toml /etc/seaweedfs/filer.toml COPY ./entrypoint.sh /entrypoint.sh +RUN apk add fuse # for weed mount # volume server grpc port EXPOSE 18080 diff --git a/docker/local-dev-compose.yml b/docker/local-dev-compose.yml index f6fd0f4ce..752086acd 100644 --- a/docker/local-dev-compose.yml +++ b/docker/local-dev-compose.yml @@ -12,7 +12,7 @@ services: ports: - 8080:8080 - 18080:18080 - command: "volume -mserver=master:9333 -port=8080 -ip=volume" + command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1" depends_on: - master filer: @@ -33,3 +33,14 @@ services: - master - volume - filer + mount: + image: chrislusf/seaweedfs:local + privileged: true + cap_add: + - SYS_ADMIN + mem_limit: 4096m + command: '-v=4 mount -filer="filer:8888" -dirAutoCreate -dir=/mnt/seaweedfs -cacheCapacityMB=0 ' + depends_on: + - master + - volume + - filer diff --git a/go.mod b/go.mod index d93a44379..467b3b1fc 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/aws/aws-sdk-go v1.33.5 github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 github.com/cespare/xxhash v1.1.0 - github.com/chrislusf/raft v1.0.3 + github.com/chrislusf/raft v1.0.4 github.com/coreos/go-semver v0.3.0 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/disintegration/imaging v1.6.2 @@ -66,6 +66,7 @@ require ( github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.6.1 github.com/syndtr/goleveldb v1.0.0 + github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 diff --git a/go.sum b/go.sum index 66fb72a6b..6f4519aef 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chrislusf/raft v1.0.3 h1:11YrnzJtVa5z7m9lhY2p8VcPHoUlC1UswyoAo+U1m1k= github.com/chrislusf/raft v1.0.3/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= +github.com/chrislusf/raft v1.0.4 h1:THhbsVik2hxdE0/VXX834f64Wn9RzgVPp+E+XCWZdKM= +github.com/chrislusf/raft v1.0.4/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -225,6 +227,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -542,6 +545,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -551,8 +555,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/seaweedfs/fuse v1.0.7 h1:tESMXhI3gXzN+dlWsCUrkIZDiWA4dZX18rQMoqmvazw= -github.com/seaweedfs/fuse v1.0.7/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8= github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s= github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8= github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E= @@ -618,6 +620,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok= +github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= github.com/tidwall/gjson v1.3.2 h1:+7p3qQFaH3fOMXAJSrdZwGKcOO/lYdGS0HqGhPqDdTI= github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= @@ -941,7 +945,9 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o= modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg= +modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE= modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc= modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 0e8ce3a35..9d621a496 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "2.17" + imageTag: "2.19" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/filer.go b/weed/command/filer.go index a3008eb29..146840e00 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -74,6 +74,7 @@ func init() { filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") + filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") } var cmdFiler = &Command{ diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 83cb352ff..7e75b082d 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { return true } + util.LoadConfiguration("security", false) // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool @@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { dir := util.ResolvePath(*option.dir) chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB - util.LoadConfiguration("security", false) - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 6cfd46427..600a4a8d4 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool { content = SECURITY_TOML_EXAMPLE case "master": content = MASTER_TOML_EXAMPLE + case "shell": + content = SHELL_TOML_EXAMPLE } if content == "" { println("need a valid -config option") @@ -85,7 +87,13 @@ buckets_folder = "/buckets" # local on disk, mostly for simple single-machine setup, fairly scalable # faster than previous leveldb, recommended. enabled = true -dir = "." # directory to store level db files +dir = "./filerldb2" # directory to store level db files + +[rocksdb] +# local on disk, similar to leveldb +# since it is using a C wrapper, you need to install rocksdb and build it by yourself +enabled = false +dir = "./filerrdb" # directory to store rocksdb files [mysql] # or tidb # CREATE TABLE IF NOT EXISTS filemeta ( @@ -459,5 +467,19 @@ copy_other = 1 # create n x 1 = n actual volumes # if you are doing your own replication or periodic sync of volumes. treat_replication_as_minimums = false +` + SHELL_TOML_EXAMPLE = ` + +[cluster] +default = "c1" + +[cluster.c1] +master = "localhost:9333" # comma-separated master servers +filer = "localhost:8888" # filer host and port + +[cluster.c2] +master = "" +filer = "" + ` ) diff --git a/weed/command/server.go b/weed/command/server.go index 7e63f8e8a..bd25f94b1 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -113,6 +113,7 @@ func init() { s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") + s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") diff --git a/weed/command/shell.go b/weed/command/shell.go index 6dd768f47..c9976e809 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -11,12 +11,14 @@ import ( var ( shellOptions shell.ShellOptions shellInitialFiler *string + shellCluster *string ) func init() { cmdShell.Run = runShell // break init cycle - shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") - shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port") + shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333") + shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888") + shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml") } var cmdShell = &Command{ @@ -24,6 +26,8 @@ var cmdShell = &Command{ Short: "run interactive administrative commands", Long: `run interactive administrative commands. + Generate shell.toml via "weed scaffold -config=shell" + `, } @@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + if *shellOptions.Masters == "" && *shellInitialFiler == "" { + util.LoadConfiguration("shell", false) + v := util.GetViper() + cluster := v.GetString("cluster.default") + if *shellCluster != "" { + cluster = *shellCluster + } + if cluster == "" { + *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888" + } else { + *shellOptions.Masters = v.GetString("cluster." + cluster + ".master") + *shellInitialFiler = v.GetString("cluster." + cluster + ".filer") + } + } + + fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) + var err error shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) if err != nil { diff --git a/weed/command/upload.go b/weed/command/upload.go index 45b15535b..7115da587 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -1,8 +1,12 @@ package command import ( + "context" "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" "os" "path/filepath" @@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master) + if err != nil { + fmt.Printf("upload: %v", err) + return false + } + if *upload.replication == "" { + *upload.replication = defaultCollection + } + if len(args) == 0 { if *upload.dir == "" { return false @@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool { } return true } + +func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) { + err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", masterAddress, err) + } + replication = resp.DefaultReplication + return nil + }) + return +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 13dedea1e..920d79da5 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } - dirParts := strings.Split(string(entry.FullPath), "/") - - // fmt.Printf("directory parts: %+v\n", dirParts) - - var lastDirectoryEntry *Entry - - for i := 1; i < len(dirParts); i++ { - dirPath := "/" + util.Join(dirParts[:i]...) - // fmt.Printf("%d directory: %+v\n", i, dirPath) - - // check the store directly - glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) - - // no such existing directory - if dirEntry == nil { - - // create the directory - now := time.Now() - - dirEntry = &Entry{ - FullPath: util.FullPath(dirPath), - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0110, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, - }, - } - - glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.Store.InsertEntry(ctx, dirEntry) - if mkdirErr != nil { - if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { - glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) - return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) - } - } else { - f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) - } - - } else if !dirEntry.IsDirectory() { - glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) - return fmt.Errorf("%s is a file", dirPath) - } - - // remember the direct parent directory entry - if i == len(dirParts)-1 { - lastDirectoryEntry = dirEntry - } - - } - - if lastDirectoryEntry == nil { - glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) - return fmt.Errorf("parent folder not found: %v", entry.FullPath) - } + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) /* if !hasWritePermission(lastDirectoryEntry, entry) { @@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } */ - oldEntry, _ := f.FindEntry(ctx, entry.FullPath) - if oldEntry == nil { + + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } + glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) if err := f.Store.InsertEntry(ctx, entry); err != nil { glog.Errorf("insert entry %s: %v", entry.FullPath, err) @@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } +func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) { + + if level == 0 { + return nil + } + + dirPath := "/" + util.Join(dirParts[:level]...) + // fmt.Printf("%d directory: %+v\n", i, dirPath) + + // check the store directly + glog.V(4).Infof("find uncached directory: %s", dirPath) + dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) + + // no such existing directory + if dirEntry == nil { + + // ensure parent directory + if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil { + return err + } + + // create the directory + now := time.Now() + + dirEntry = &Entry{ + FullPath: util.FullPath(dirPath), + Attr: Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, + }, + } + + glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) + mkdirErr := f.Store.InsertEntry(ctx, dirEntry) + if mkdirErr != nil { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { + glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) + return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) + } + } else { + f.maybeAddBucket(dirEntry) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) + } + + } else if !dirEntry.IsDirectory() { + glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) + return fmt.Errorf("%s is a file", dirPath) + } + + return nil +} + func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { if oldEntry != nil { entry.Attr.Crtime = oldEntry.Attr.Crtime diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go index 4d4f4abc3..87ad9e452 100644 --- a/weed/filer/filer_buckets.go +++ b/weed/filer/filer_buckets.go @@ -29,7 +29,7 @@ func (f *Filer) LoadBuckets() { limit := math.MaxInt32 - entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "") + entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "") if err != nil { glog.V(1).Infof("no buckets found: %v", err) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index da92c4f4b..0d1fd7e47 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -68,7 +68,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry lastFileName := "" includeLastFile := false for { - entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "") + entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "") if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 40755e6a7..563e0eb51 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -113,13 +113,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() - dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "") + dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "") if listDayErr != nil { return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) } for _, dayEntry := range dayEntries { // println("checking day", dayEntry.FullPath) - hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "") + hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "") if listHourMinuteErr != nil { return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 2fe83b49d..7489d8e34 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -19,12 +19,16 @@ func splitPattern(pattern string) (prefix string, restPattern string) { return "", restPattern } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) { +// For now, prefix and namePattern are mutually exclusive +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string, namePattern string) (entries []*Entry, err error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - prefix, restNamePattern := splitPattern(namePattern) + prefixInNamePattern, restNamePattern := splitPattern(namePattern) + if prefixInNamePattern != "" { + prefix = prefixInNamePattern + } var missedCount int var lastFileName string diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index b879f3a6e..1aefbf5d4 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -170,8 +170,12 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName) + } - iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil) + iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) for iter.Next() { key := iter.Key() if !bytes.HasPrefix(key, directoryPrefix) { diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index c5bfb8474..edf629814 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -2,9 +2,11 @@ package leveldb import ( "context" + "fmt" "io/ioutil" "os" "testing" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/util" @@ -49,14 +51,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "") + entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") + entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") + entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") if err != nil { t.Errorf("list entries: %v", err) return @@ -86,3 +88,28 @@ func TestEmptyRoot(t *testing.T) { } } + +func BenchmarkInsertEntry(b *testing.B) { + testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench") + defer os.RemoveAll(dir) + store := &LevelDBStore{} + store.initialize(dir) + testFiler.SetStore(store) + + ctx := context.Background() + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + entry := &filer.Entry{ + FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)), + Attr: filer.Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + }, + } + store.InsertEntry(ctx, entry) + } +} diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 4b41554b9..ceda64153 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -179,7 +179,10 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount) - lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart, _ = genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount) + } iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) for iter.Next() { diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index 22c0d6052..a7f5ae95b 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "") + entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") + entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") + entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer/rocksdb/README.md b/weed/filer/rocksdb/README.md new file mode 100644 index 000000000..6bae6d34e --- /dev/null +++ b/weed/filer/rocksdb/README.md @@ -0,0 +1,41 @@ +# Prepare the compilation environment on linux +- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test +- sudo apt-get update -qq +- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq +- export CXX="g++-6" CC="gcc-6" + +- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb +- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb +- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb +- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb + +# Prepare the compilation environment on mac os +``` +brew install snappy +``` + +# install rocksdb: +``` + export ROCKSDB_HOME=/Users/chris/dev/rocksdb + + git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME + pushd $ROCKSDB_HOME + make clean + make install-static + popd +``` + +# install gorocksdb + +``` +export CGO_CFLAGS="-I$ROCKSDB_HOME/include" +export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd" + +go get github.com/tecbot/gorocksdb +``` +# compile with rocksdb + +``` +cd ~/go/src/github.com/chrislusf/seaweedfs/weed +go install -tags rocksdb +``` diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go new file mode 100644 index 000000000..a8992cf03 --- /dev/null +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -0,0 +1,293 @@ +// +build rocksdb + +package rocksdb + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tecbot/gorocksdb" + "io" +) + +func init() { + filer.Stores = append(filer.Stores, &RocksDBStore{}) +} + +type RocksDBStore struct { + path string + db *gorocksdb.DB +} + +func (store *RocksDBStore) GetName() string { + return "rocksdb" +} + +func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") + return store.initialize(dir) +} + +func (store *RocksDBStore) initialize(dir string) (err error) { + glog.Infof("filer store rocksdb dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + + options := gorocksdb.NewDefaultOptions() + options.SetCreateIfMissing(true) + store.db, err = gorocksdb.OpenDb(options, dir) + + return +} + +func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *RocksDBStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.DirAndName() + key := genKey(dir, name) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + wo := gorocksdb.NewDefaultWriteOptions() + err = store.db.Put(wo, key, value) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + key := genKey(dir, name) + + ro := gorocksdb.NewDefaultReadOptions() + data, err := store.db.GetBytes(ro, key) + + if data == nil { + return nil, filer_pb.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) + + return entry, nil +} + +func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + key := genKey(dir, name) + + wo := gorocksdb.NewDefaultWriteOptions() + err = store.db.Delete(wo, key) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + batch := new(gorocksdb.WriteBatch) + + ro := gorocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + iter := store.db.NewIterator(ro) + defer iter.Close() + err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool { + batch.Delete(key) + return true + }) + if err != nil { + return fmt.Errorf("delete list %s : %v", fullpath, err) + } + + wo := gorocksdb.NewDefaultWriteOptions() + err = store.db.Write(wo, batch) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error { + + if len(lastKey) == 0 { + iter.Seek(prefix) + } else { + iter.Seek(lastKey) + + if !includeLastKey { + k := iter.Key() + v := iter.Value() + key := k.Data() + defer k.Free() + defer v.Free() + + if !bytes.HasPrefix(key, prefix) { + return nil + } + + if bytes.Equal(key, lastKey) { + iter.Next() + } + + } + } + + i := 0 + for ; iter.Valid(); iter.Next() { + + if limit > 0 { + i++ + if i > limit { + break + } + } + + k := iter.Key() + v := iter.Value() + key := k.Data() + value := v.Data() + + if !bytes.HasPrefix(key, prefix) { + k.Free() + v.Free() + break + } + + ret := fn(key, value) + + k.Free() + v.Free() + + if !ret { + break + } + + } + + if err := iter.Err(); err != nil { + return fmt.Errorf("prefix scan iterator: %v", err) + } + return nil +} + +func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer.Entry, err error) { + return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "") +} + +func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + + directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName) + } + + ro := gorocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + iter := store.db.NewIterator(ro) + defer iter.Close() + err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool { + fileName := getNameFromKey(key) + if fileName == "" { + return true + } + limit-- + if limit < 0 { + return false + } + entry := &filer.Entry{ + FullPath: weed_util.NewFullPath(string(fullpath), fileName), + } + + // println("list", entry.FullPath, "chunks", len(entry.Chunks)) + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + return false + } + entries = append(entries, entry) + return true + }) + if err != nil { + return entries, fmt.Errorf("prefix list %s : %v", fullpath, err) + } + + return entries, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func getNameFromKey(key []byte) string { + + return string(key[md5.Size:]) + +} + +// hash directory, and use last byte for partitioning +func hashToBytes(dir string) []byte { + h := md5.New() + io.WriteString(h, dir) + + b := h.Sum(nil) + + return b +} + +func (store *RocksDBStore) Shutdown() { + store.db.Close() +} diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go new file mode 100644 index 000000000..093a905e8 --- /dev/null +++ b/weed/filer/rocksdb/rocksdb_store_kv.go @@ -0,0 +1,51 @@ +// +build rocksdb + +package rocksdb + +import ( + "context" + "fmt" + "github.com/tecbot/gorocksdb" + + "github.com/chrislusf/seaweedfs/weed/filer" +) + +func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + wo := gorocksdb.NewDefaultWriteOptions() + err = store.db.Put(wo, key, value) + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + ro := gorocksdb.NewDefaultReadOptions() + value, err = store.db.GetBytes(ro, key) + + if value == nil { + return nil, filer.ErrKvNotFound + } + + if err != nil { + return nil, fmt.Errorf("kv get: %v", err) + } + + return +} + +func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) { + + wo := gorocksdb.NewDefaultWriteOptions() + err = store.db.Delete(wo, key) + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go new file mode 100644 index 000000000..5c48e9bd9 --- /dev/null +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -0,0 +1,117 @@ +// +build rocksdb + +package rocksdb + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func TestCreateAndFind(t *testing.T) { + testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") + defer os.RemoveAll(dir) + store := &RocksDBStore{} + store.initialize(dir) + testFiler.SetStore(store) + + fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") + + ctx := context.Background() + + entry1 := &filer.Entry{ + FullPath: fullpath, + Attr: filer.Attr{ + Mode: 0440, + Uid: 1234, + Gid: 5678, + }, + } + + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + t.Errorf("create entry %v: %v", entry1.FullPath, err) + return + } + + entry, err := testFiler.FindEntry(ctx, fullpath) + + if err != nil { + t.Errorf("find entry: %v", err) + return + } + + if entry.FullPath != entry1.FullPath { + t.Errorf("find wrong entry: %v", entry.FullPath) + return + } + + // checking one upper directory + entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + + // checking one upper directory + entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +} + +func TestEmptyRoot(t *testing.T) { + testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") + defer os.RemoveAll(dir) + store := &RocksDBStore{} + store.initialize(dir) + testFiler.SetStore(store) + + ctx := context.Background() + + // checking one upper directory + entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + if err != nil { + t.Errorf("list entries: %v", err) + return + } + if len(entries) != 0 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +} + +func BenchmarkInsertEntry(b *testing.B) { + testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench") + defer os.RemoveAll(dir) + store := &RocksDBStore{} + store.initialize(dir) + testFiler.SetStore(store) + + ctx := context.Background() + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + entry := &filer.Entry{ + FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)), + Attr: filer.Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + }, + } + store.InsertEntry(ctx, entry) + } +} diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 25843c892..e785b68a9 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -32,7 +32,7 @@ type FilePart struct { type SubmitResult struct { FileName string `json:"fileName,omitempty"` - FileUrl string `json:"fileUrl,omitempty"` + FileUrl string `json:"url,omitempty"` Fid string `json:"fid,omitempty"` Size uint32 `json:"size,omitempty"` Error string `json:"error,omitempty"` @@ -69,6 +69,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart file.Replication = replication file.Collection = collection file.DataCenter = dataCenter + file.Ttl = ttl results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index c305fee6f..b8af6381a 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -156,7 +156,36 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt // check whether the request has valid access keys func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) { - identity, s3Err := iam.authUser(r) + var identity *Identity + var s3Err s3err.ErrorCode + var found bool + switch getRequestAuthType(r) { + case authTypeStreamingSigned: + return identity, s3err.ErrNone + case authTypeUnknown: + glog.V(3).Infof("unknown auth type") + return identity, s3err.ErrAccessDenied + case authTypePresignedV2, authTypeSignedV2: + glog.V(3).Infof("v2 auth type") + identity, s3Err = iam.isReqAuthenticatedV2(r) + case authTypeSigned, authTypePresigned: + glog.V(3).Infof("v4 auth type") + identity, s3Err = iam.reqSignatureV4Verify(r) + case authTypePostPolicy: + glog.V(3).Infof("post policy auth type") + return identity, s3err.ErrNone + case authTypeJWT: + glog.V(3).Infof("jwt auth type") + return identity, s3err.ErrNotImplemented + case authTypeAnonymous: + identity, found = iam.lookupAnonymous() + if !found { + return identity, s3err.ErrAccessDenied + } + default: + return identity, s3err.ErrNotImplemented + } + if s3Err != s3err.ErrNone { return identity, s3Err } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5f1b2d819..efc06f0d6 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -61,7 +61,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix, "") if err != nil { return err @@ -326,7 +326,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} - if err != nil { + if err != nil && err != filer_pb.ErrNotFound { resp.Error = err.Error() } return resp, nil diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index fa86737ac..a0c9aedb9 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. includeLastFile := false for { - entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "") + entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "") if err != nil { return err } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index f303ba1d4..ba01ce4f7 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") namePattern := r.FormValue("namePattern") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern) + entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "", namePattern) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index eee39152b..fe4e68140 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -111,7 +111,10 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" - contentType := "" + contentType := r.Header.Get("Content-Type") + if contentType == "application/octet-stream" { + contentType = "" + } fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so) if err != nil { diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go new file mode 100644 index 000000000..5fcc7e88f --- /dev/null +++ b/weed/server/filer_server_rocksdb.go @@ -0,0 +1,7 @@ +// +build rocksdb + +package weed_server + +import ( + _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb" +) diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index 8f457be1d..7201503f1 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -193,9 +193,9 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error mtype = contentType } - pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip" - // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd" } + pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip" + // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd" return } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 0a4df63d0..eaed51654 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -151,7 +151,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro } func (dn *DataNode) GetDataCenter() *DataCenter { - return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) + rack := dn.Parent() + dcNode := rack.Parent() + dcValue := dcNode.GetValue() + return dcValue.(*DataCenter) } func (dn *DataNode) GetRack() *Rack { diff --git a/weed/util/constants.go b/weed/util/constants.go index 95370746b..cfb33516b 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 17) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 19) COMMIT = "" )