diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
index 65e7c3ec6..a23a682d1 100644
--- a/.github/workflows/codeql.yml
+++ b/.github/workflows/codeql.yml
@@ -3,6 +3,10 @@ name: "Code Scanning - Action"
on:
pull_request:
+concurrency:
+ group: ${{ github.head_ref }}/codeql
+ cancel-in-progress: true
+
jobs:
CodeQL-Build:
# CodeQL runs on ubuntu-latest, windows-latest, and macos-latest
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
new file mode 100644
index 000000000..6bca28820
--- /dev/null
+++ b/.github/workflows/e2e.yml
@@ -0,0 +1,89 @@
+name: "End to End"
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+concurrency:
+ group: ${{ github.head_ref }}/e2e
+ cancel-in-progress: true
+
+permissions:
+ contents: read
+
+defaults:
+ run:
+ working-directory: docker
+
+jobs:
+ e2e:
+ name: FUSE Mount
+ runs-on: ubuntu-22.04
+ timeout-minutes: 15
+ steps:
+ - name: Set up Go 1.x
+ uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f # v2
+ with:
+ go-version: ^1.13
+ id: go
+
+ - name: Check out code into the Go module directory
+ uses: actions/checkout@2541b1294d2704b0964813337f33b291d3f8596b # v2
+
+ - name: Install dependencies
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y fuse
+
+ - name: Start SeaweedFS
+ timeout-minutes: 5
+ run: make build_e2e && docker compose -f ./compose/e2e-mount.yml up --wait
+
+ - name: Run FIO
+ timeout-minutes: 5
+ run: |
+ echo "Starting FIO at: $(date)"
+ # Concurrent r/w
+ echo 'Run randrw with size=16M bs=4k'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
+
+ echo 'Run randrw with size=16M bs=128k'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=128k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
+
+ echo 'Run randrw with size=16M bs=1m'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=1m --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
+
+ # Verified write
+ echo 'Run randwrite with size=16M bs=4k'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
+
+ echo 'Run randwrite with size=16M bs=128k'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=128k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
+
+ echo 'Run randwrite with size=16M bs=1m'
+ docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=1m --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
+
+ - name: Save logs
+ if: always()
+ run: |
+ docker compose -f ./compose/e2e-mount.yml logs > output.log
+ echo 'Showing last 500 log lines of mount service:'
+ docker compose -f ./compose/e2e-mount.yml logs --tail 500 mount
+
+ - name: Check for data races
+ if: always()
+ continue-on-error: true # TODO: remove this comment to enable build failure on data races (after all are fixed)
+ run: grep -A50 'DATA RACE' output.log && exit 1 || exit 0
+
+ - name: Archive logs
+ if: always()
+ uses: actions/upload-artifact@v3
+ with:
+ name: output-logs
+ path: docker/output.log
+
+ - name: Cleanup
+ if: always()
+ run: docker compose -f ./compose/e2e-mount.yml down --volumes --remove-orphans --rmi all
diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 5aca3a9e6..efe15d4af 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -21,7 +21,7 @@ jobs:
steps:
- name: Set up Go 1.x
- uses: actions/setup-go@84cbf8094393cdc5fe1fe1671ff2647332956b1a # v2
+ uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f # v2
with:
go-version: ^1.13
id: go
diff --git a/docker/Dockerfile.e2e b/docker/Dockerfile.e2e
new file mode 100644
index 000000000..70f173128
--- /dev/null
+++ b/docker/Dockerfile.e2e
@@ -0,0 +1,30 @@
+FROM ubuntu:22.04
+
+LABEL author="Chris Lu"
+
+RUN apt-get update && apt-get install -y curl fio fuse
+RUN mkdir -p /etc/seaweedfs /data/filerldb2
+
+COPY ./weed /usr/bin/
+COPY ./filer.toml /etc/seaweedfs/filer.toml
+COPY ./entrypoint.sh /entrypoint.sh
+
+# volume server grpc port
+EXPOSE 18080
+# volume server http port
+EXPOSE 8080
+# filer server grpc port
+EXPOSE 18888
+# filer server http port
+EXPOSE 8888
+# master server shared grpc port
+EXPOSE 19333
+# master server shared http port
+EXPOSE 9333
+
+VOLUME /data
+WORKDIR /data
+
+RUN chmod +x /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
diff --git a/docker/Dockerfile.gccgo_build b/docker/Dockerfile.gccgo_build
index 01f51a07d..7408450fb 100644
--- a/docker/Dockerfile.gccgo_build
+++ b/docker/Dockerfile.gccgo_build
@@ -1,5 +1,5 @@
FROM gcc:11 as builder
-RUN mkdir -p /go/src/github.com/chrislusf/
+RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/seaweedfs/seaweedfs && git checkout $BRANCH
diff --git a/docker/Dockerfile.go_build b/docker/Dockerfile.go_build
index 0237f4b3b..8e38dd54e 100644
--- a/docker/Dockerfile.go_build
+++ b/docker/Dockerfile.go_build
@@ -1,6 +1,6 @@
FROM golang:1.19-alpine as builder
RUN apk add git g++ fuse
-RUN mkdir -p /go/src/github.com/chrislusf/
+RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
ARG TAGS
diff --git a/docker/Dockerfile.local b/docker/Dockerfile.local
index 947edffda..53cfd9571 100644
--- a/docker/Dockerfile.local
+++ b/docker/Dockerfile.local
@@ -5,6 +5,7 @@ RUN mkdir -p /etc/seaweedfs
COPY ./filer.toml /etc/seaweedfs/filer.toml
COPY ./entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount
+RUN apk add curl # for health checks
# volume server grpc port
EXPOSE 18080
diff --git a/docker/Dockerfile.rocksdb_dev_env b/docker/Dockerfile.rocksdb_dev_env
new file mode 100644
index 000000000..816dec1ac
--- /dev/null
+++ b/docker/Dockerfile.rocksdb_dev_env
@@ -0,0 +1,16 @@
+FROM golang:1.19-buster as builder
+
+RUN apt-get update
+RUN apt-get install -y build-essential libsnappy-dev zlib1g-dev libbz2-dev libgflags-dev liblz4-dev libzstd-dev
+
+ENV ROCKSDB_VERSION v7.5.3
+
+# build RocksDB
+RUN cd /tmp && \
+ git clone https://github.com/facebook/rocksdb.git /tmp/rocksdb --depth 1 --single-branch --branch $ROCKSDB_VERSION && \
+ cd rocksdb && \
+ PORTABLE=1 make static_lib && \
+ make install-static
+
+ENV CGO_CFLAGS "-I/tmp/rocksdb/include"
+ENV CGO_LDFLAGS "-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
diff --git a/docker/Dockerfile.rocksdb_large b/docker/Dockerfile.rocksdb_large
index 75e61fef2..9bf72905e 100644
--- a/docker/Dockerfile.rocksdb_large
+++ b/docker/Dockerfile.rocksdb_large
@@ -3,7 +3,7 @@ FROM golang:1.19-buster as builder
RUN apt-get update
RUN apt-get install -y build-essential libsnappy-dev zlib1g-dev libbz2-dev libgflags-dev liblz4-dev libzstd-dev
-ENV ROCKSDB_VERSION v7.4.4
+ENV ROCKSDB_VERSION v7.5.3
# build RocksDB
RUN cd /tmp && \
@@ -16,7 +16,7 @@ ENV CGO_CFLAGS "-I/tmp/rocksdb/include"
ENV CGO_LDFLAGS "-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
# build SeaweedFS
-RUN mkdir -p /go/src/github.com/chrislusf/
+RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/seaweedfs/seaweedfs && git checkout $BRANCH
diff --git a/docker/Dockerfile.rocksdb_large_local b/docker/Dockerfile.rocksdb_large_local
new file mode 100644
index 000000000..989bf3a10
--- /dev/null
+++ b/docker/Dockerfile.rocksdb_large_local
@@ -0,0 +1,45 @@
+FROM chrislusf/rocksdb_dev_env as builder
+
+# build SeaweedFS
+RUN mkdir -p /go/src/github.com/seaweedfs/
+ADD . /go/src/github.com/seaweedfs/seaweedfs
+RUN ls -al /go/src/github.com/seaweedfs/ && \
+ cd /go/src/github.com/seaweedfs/seaweedfs/weed \
+ && export LDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \
+ && go install -tags "5BytesOffset rocksdb" -ldflags "-extldflags -static ${LDFLAGS}"
+
+
+FROM alpine AS final
+LABEL author="Chris Lu"
+COPY --from=builder /go/bin/weed /usr/bin/
+RUN mkdir -p /etc/seaweedfs
+COPY --from=builder /go/src/github.com/seaweedfs/seaweedfs/docker/filer_rocksdb.toml /etc/seaweedfs/filer.toml
+COPY --from=builder /go/src/github.com/seaweedfs/seaweedfs/docker/entrypoint.sh /entrypoint.sh
+RUN apk add fuse snappy gflags tmux
+
+# volume server gprc port
+EXPOSE 18080
+# volume server http port
+EXPOSE 8080
+# filer server gprc port
+EXPOSE 18888
+# filer server http port
+EXPOSE 8888
+# master server shared gprc port
+EXPOSE 19333
+# master server shared http port
+EXPOSE 9333
+# s3 server http port
+EXPOSE 8333
+# webdav server http port
+EXPOSE 7333
+
+RUN mkdir -p /data/filer_rocksdb
+
+VOLUME /data
+
+WORKDIR /data
+
+RUN chmod +x /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
diff --git a/docker/Makefile b/docker/Makefile
index 793ef17de..faedb080f 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -8,11 +8,17 @@ cgo ?= 0
binary:
export SWCOMMIT=$(shell git rev-parse --short HEAD)
export SWLDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(SWCOMMIT)"
- cd ../weed; CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-extldflags -static $(SWLDFLAGS)"; mv weed ../docker/
+ cd ../weed && CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-extldflags -static $(SWLDFLAGS)" && mv weed ../docker/
+
+binary_race: options = -race
+binary_race: cgo = 1
+binary_race: binary
build: binary
docker build --no-cache -t chrislusf/seaweedfs:local -f Dockerfile.local .
- rm ./weed
+
+build_e2e: binary_race
+ docker build --no-cache -t chrislusf/seaweedfs:e2e -f Dockerfile.e2e .
go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset
docker build --build-arg TAGS=$(tags) --no-cache -t chrislusf/seaweedfs:go_build -f Dockerfile.go_build .
@@ -20,6 +26,12 @@ go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset
go_build_large_disk:
docker build --build-arg TAGS=large_disk --no-cache -t chrislusf/seaweedfs:large_disk -f Dockerfile.go_build .
+build_rocksdb_dev_env:
+ docker build --no-cache -t chrislusf/rocksdb_dev_env -f Dockerfile.rocksdb_dev_env .
+
+build_rocksdb_local:
+ cd .. ; docker build --no-cache -t chrislusf/seaweedfs:rocksdb_local -f docker/Dockerfile.rocksdb_large_local .
+
build_rocksdb:
docker build --no-cache -t chrislusf/seaweedfs:rocksdb -f Dockerfile.rocksdb_large .
@@ -29,9 +41,7 @@ s3tests_build:
dev: build
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
-dev_race: options = -race
-dev_race: cgo = 1
-dev_race: build
+dev_race: binary_race
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
dev_tls: build certstrap
diff --git a/docker/compose/e2e-mount.yml b/docker/compose/e2e-mount.yml
new file mode 100644
index 000000000..d5da9c221
--- /dev/null
+++ b/docker/compose/e2e-mount.yml
@@ -0,0 +1,53 @@
+version: '3.9'
+
+services:
+ master:
+ image: chrislusf/seaweedfs:e2e
+ command: "-v=4 master -ip=master -ip.bind=0.0.0.0 -raftBootstrap"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+ interval: 1s
+ timeout: 60s
+
+ volume:
+ image: chrislusf/seaweedfs:e2e
+ command: "-v=4 volume -mserver=master:9333 -ip=volume -ip.bind=0.0.0.0 -preStopSeconds=1"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://localhost:8080/healthz" ]
+ interval: 1s
+ timeout: 30s
+ depends_on:
+ master:
+ condition: service_healthy
+
+ filer:
+ image: chrislusf/seaweedfs:e2e
+ command: "-v=4 filer -master=master:9333 -ip=filer -ip.bind=0.0.0.0"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://localhost:8888" ]
+ interval: 1s
+ timeout: 30s
+ depends_on:
+ volume:
+ condition: service_healthy
+
+ mount:
+ image: chrislusf/seaweedfs:e2e
+ command: "-v=4 mount -filer=filer:8888 -filer.path=/ -dirAutoCreate -dir=/mnt/seaweedfs"
+ cap_add:
+ - SYS_ADMIN
+ devices:
+ - /dev/fuse
+ security_opt:
+ - apparmor:unconfined
+ deploy:
+ resources:
+ limits:
+ memory: 4096m
+ healthcheck:
+ test: [ "CMD", "mountpoint", "-q", "--", "/mnt/seaweedfs" ]
+ interval: 1s
+ timeout: 30s
+ depends_on:
+ filer:
+ condition: service_healthy
diff --git a/go.mod b/go.mod
index 02adc8e20..d18389b70 100644
--- a/go.mod
+++ b/go.mod
@@ -3,13 +3,13 @@ module github.com/seaweedfs/seaweedfs
go 1.19
require (
- cloud.google.com/go v0.102.1 // indirect
- cloud.google.com/go/pubsub v1.24.0
- cloud.google.com/go/storage v1.25.0
+ cloud.google.com/go v0.104.0 // indirect
+ cloud.google.com/go/pubsub v1.25.1
+ cloud.google.com/go/storage v1.26.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Shopify/sarama v1.36.0
- github.com/aws/aws-sdk-go v1.44.76
+ github.com/aws/aws-sdk-go v1.44.91
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
@@ -61,7 +61,7 @@ require (
github.com/klauspost/reedsolomon v1.10.0
github.com/kurin/blazer v0.5.3
github.com/lib/pq v1.10.6
- github.com/linxGnu/grocksdb v1.7.5
+ github.com/linxGnu/grocksdb v1.7.7
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-ieproxy v0.0.3 // indirect
@@ -90,7 +90,6 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.12.0
- github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.0
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
@@ -112,31 +111,31 @@ require (
go.opencensus.io v0.23.0 // indirect
gocloud.dev v0.26.0
gocloud.dev/pubsub/natspubsub v0.26.0
- gocloud.dev/pubsub/rabbitpubsub v0.25.0
+ gocloud.dev/pubsub/rabbitpubsub v0.26.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd
golang.org/x/image v0.0.0-20200119044424-58c23975cae1
golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced
- golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect
+ golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 // indirect
golang.org/x/sys v0.0.0-20220818161305-2296e01440c6
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
- google.golang.org/api v0.92.0
+ google.golang.org/api v0.94.0
google.golang.org/appengine v1.6.7 // indirect
- google.golang.org/genproto v0.0.0-20220720214146-176da50484ac // indirect
- google.golang.org/grpc v1.48.0
+ google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect
+ google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
gopkg.in/inf.v0 v0.9.1 // indirect
modernc.org/b v1.0.0 // indirect
modernc.org/cc/v3 v3.36.0 // indirect
modernc.org/ccgo/v3 v3.16.8 // indirect
modernc.org/libc v1.16.19 // indirect
- modernc.org/mathutil v1.4.1 // indirect
+ modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/opt v0.1.1 // indirect
modernc.org/sqlite v1.18.1
- modernc.org/strutil v1.1.2
+ modernc.org/strutil v1.1.3
modernc.org/token v1.0.0 // indirect
)
@@ -144,13 +143,14 @@ require (
github.com/Jille/raft-grpc-transport v1.2.0
github.com/arangodb/go-driver v1.3.3
github.com/fluent/fluent-logger-golang v1.9.0
- github.com/google/flatbuffers v2.0.6+incompatible
+ github.com/google/flatbuffers v2.0.8+incompatible
github.com/hanwen/go-fuse/v2 v2.1.1-0.20220627082937-d01fda7edf17
github.com/hashicorp/raft v1.3.10
- github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
+ github.com/hashicorp/raft-boltdb/v2 v2.2.2
+ github.com/rabbitmq/amqp091-go v1.4.0
github.com/tikv/client-go/v2 v2.0.1
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2
- github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1
+ github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4
google.golang.org/grpc/security/advancedtls v0.0.0-20220622233350-5cdb09fa29c1
)
@@ -218,6 +218,7 @@ require (
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f // indirect
github.com/ydb-platform/ydb-go-yc v0.8.3 // indirect
github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 // indirect
+ go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
go.uber.org/atomic v1.9.0 // indirect
diff --git a/go.sum b/go.sum
index 0382d91ec..a62300b54 100644
--- a/go.sum
+++ b/go.sum
@@ -33,8 +33,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
-cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0=
-cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
+cloud.google.com/go v0.104.0 h1:gSmWO7DY1vOm0MVU6DNXM11BWHHsTUmsC5cv1fuW5X8=
+cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@@ -66,8 +66,8 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.19.0/go.mod h1:/O9kmSe9bb9KRnIAWkzmqhPjHo6LtzGOBYd/kr06XSs=
-cloud.google.com/go/pubsub v1.24.0 h1:aCS6wSMzrc602OeXUMA66KGlyXxpdkHdwN+FSBv/sUg=
-cloud.google.com/go/pubsub v1.24.0/go.mod h1:rWv09Te1SsRpRGPiWOMDKraMQTJyJps4MkUCoMGUgqw=
+cloud.google.com/go/pubsub v1.25.1 h1:l0wCNZKuEp2Q54wAy8283EV9O57+7biWOXnnU2/Tq/A=
+cloud.google.com/go/pubsub v1.25.1/go.mod h1:bY6l7rF8kCcwz6V3RaQ6kK4p5g7qc7PqjRoE9wDOqOU=
cloud.google.com/go/secretmanager v1.3.0/go.mod h1:+oLTkouyiYiabAQNugCeTS3PAArGiMJuBqvJnJsyH+U=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
@@ -77,8 +77,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
-cloud.google.com/go/storage v1.25.0 h1:D2Dn0PslpK7Z3B2AvuUHyIC762bDbGJdlmQlCBR71os=
-cloud.google.com/go/storage v1.25.0/go.mod h1:Qys4JU+jeup3QnuKKAosWuxrD95C4MSqxfVDnSirDsI=
+cloud.google.com/go/storage v1.26.0 h1:lYAGjknyDJirSzfwUlkv4Nsnj7od7foxQNH/fqZqles=
+cloud.google.com/go/storage v1.26.0/go.mod h1:mk/N7YwIKEWyTvXAWQCIeiCTdLoRH6Pd5xmSnolQLTI=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM=
contrib.go.opencensus.io/exporter/aws v0.0.0-20200617204711-c478e41e60e9/go.mod h1:uu1P0UCM/6RbsMrgPa98ll8ZcHM858i/AD06a9aLRCA=
@@ -148,15 +148,14 @@ github.com/arangodb/go-driver v1.3.3/go.mod h1:5GAx3XvK72DJPhJgyjZOtYAGc4SpY7rZD
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g=
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
-github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo=
github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
-github.com/aws/aws-sdk-go v1.44.76 h1:5e8yGO/XeNYKckOjpBKUd5wStf0So3CrQIiOMCVLpOI=
-github.com/aws/aws-sdk-go v1.44.76/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/aws/aws-sdk-go v1.44.91 h1:SRWmuX7PTyhBdLuvSfM7KWrWISJsrRsUPcFDSFduRxY=
+github.com/aws/aws-sdk-go v1.44.91/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
@@ -408,8 +407,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
-github.com/google/flatbuffers v2.0.6+incompatible h1:XHFReMv7nFFusa+CEokzWbzaYocKXI6C7hdU5Kgh9Lw=
-github.com/google/flatbuffers v2.0.6+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
+github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -527,8 +526,10 @@ github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7H
github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.3.10 h1:LR5QZX1VQd0DFWZfeCwWawyeKfpS/Tm1yjnJIY5X4Tw=
github.com/hashicorp/raft v1.3.10/go.mod h1:J8naEwc6XaaCfts7+28whSeRvCqTd6e20BlCU3LtEO4=
-github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0 h1:CO8dBMLH6dvE1jTn/30ZZw3iuPsNfajshWoJTnVc5cc=
-github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
+github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea h1:RxcPJuutPRM8PUOyiweMmkuNO+RJyfy2jds2gfvgNmU=
+github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea/go.mod h1:qRd6nFJYYS6Iqnc/8HcUmko2/2Gw8qTFEmxDLii6W5I=
+github.com/hashicorp/raft-boltdb/v2 v2.2.2 h1:rlkPtOllgIcKLxVT4nutqlTH2NRFn+tO1wwZk/4Dxqw=
+github.com/hashicorp/raft-boltdb/v2 v2.2.2/go.mod h1:N8YgaZgNJLpZC+h+by7vDu5rzsRgONThTEeUS3zWbfY=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@@ -651,8 +652,8 @@ github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
-github.com/linxGnu/grocksdb v1.7.5 h1:nB/Z6OsxvO4VJqn4CYaUubYh0523YBsAPvrW5Gc7cnY=
-github.com/linxGnu/grocksdb v1.7.5/go.mod h1:8RUIOKch4MgftfkmgG4FPVkPUwvbLWlX/FcDkJFXIbo=
+github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg=
+github.com/linxGnu/grocksdb v1.7.7/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
@@ -807,6 +808,9 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
+github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
+github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
+github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
@@ -850,8 +854,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
-github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
-github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
@@ -919,8 +921,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mo
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.25.3/go.mod h1:PFizF/vJsdAgEwjK3DVSBD52kdmRkWfSIS2q2pA+e88=
-github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1 h1:+DTSx1uMFMtNHEpC1ZyM6t2vaBQCnymqHV1+/abZDWM=
-github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
+github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4 h1:wQtx05MHEuYnIt56wos9vaz3N7/Ue04PiSYWk7o7Akw=
+github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
github.com/ydb-platform/ydb-go-yc v0.8.3 h1:92UUUMsfvtMl6mho8eQ9lbkiPrF3a9CT+RrVRAKNRwo=
github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE=
github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 h1:nMtixUijP0Z7iHJNT9fOL+dbmEzZxqU6Xk87ll7hqXg=
@@ -933,6 +935,9 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
+go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
+go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
+go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.2/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc=
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
@@ -983,13 +988,12 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
-gocloud.dev v0.25.0/go.mod h1:7HegHVCYZrMiU3IE1qtnzf/vRrDwLYnRNR3EhWX8x9Y=
gocloud.dev v0.26.0 h1:4rM/SVL0lLs+rhC0Gmc+gt/82DBpb7nbpIZKXXnfMXg=
gocloud.dev v0.26.0/go.mod h1:mkUgejbnbLotorqDyvedJO20XcZNTynmSeVSQS9btVg=
gocloud.dev/pubsub/natspubsub v0.26.0 h1:f1hynU3D37dREFD9JzZLpCMoFJB5MyrHsujf9c6q+lE=
gocloud.dev/pubsub/natspubsub v0.26.0/go.mod h1:eec1REZLqHheRuVb43qcLhyRj/3/hGNRO8xNZYrhzqw=
-gocloud.dev/pubsub/rabbitpubsub v0.25.0 h1:jDAHvIH0h40quEuqusYXfK28sCABAMAnjLqLybu/aeo=
-gocloud.dev/pubsub/rabbitpubsub v0.25.0/go.mod h1:gfOrMlNXnxzIYB3dK1mNenXeBwJjm2ZSRBgNzxan0/Y=
+gocloud.dev/pubsub/rabbitpubsub v0.26.0 h1:oJqftlDNHWq5oB6VahB0QU1v0pecxXJVvIltRlBcehE=
+gocloud.dev/pubsub/rabbitpubsub v0.26.0/go.mod h1:21IRXtS7aNovPySnKZNAwKmWcP9ju32eHdMRuG7RTbs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
@@ -1142,8 +1146,8 @@ golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
-golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 h1:+jnHzr9VPj32ykQVai5DNahi9+NSp7yYuCsl5eAQtL0=
-golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
+golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 h1:2o1E+E8TpNLklK9nHiPiK1uzIYrIHt+cQx3ynCwq9V8=
+golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1204,6 +1208,7 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1406,8 +1411,8 @@ google.golang.org/api v0.75.0/go.mod h1:pU9QmyHLnzlpar1Mjt4IbapUCy8J+6HD6GeELN69
google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6rKmw=
google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg=
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
-google.golang.org/api v0.92.0 h1:8JHk7q/+rJla+iRsWj9FQ9/wjv2M1SKtpKSdmLhxPT0=
-google.golang.org/api v0.92.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
+google.golang.org/api v0.94.0 h1:KtKM9ru3nzQioV1HLlUf1cR7vMYJIpgls5VhAYQXIwA=
+google.golang.org/api v0.94.0/go.mod h1:eADj+UBuxkh5zlrSntJghuNeg8HwQ1w5lTKkuqaETEI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1513,10 +1518,9 @@ google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
-google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
-google.golang.org/genproto v0.0.0-20220720214146-176da50484ac h1:EOa+Yrhx1C0O+4pHeXeWrCwdI0tWI6IfUU56Vebs9wQ=
-google.golang.org/genproto v0.0.0-20220720214146-176da50484ac/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
+google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc h1:Nf+EdcTLHR8qDNN/KfkQL0u0ssxt9OhbaWCl5C0ucEI=
+google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -1550,8 +1554,8 @@ google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
-google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
+google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b h1:NuxyvVZoDfHZwYW9LD4GJiF5/nhiSyP4/InTrvw9Ibk=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b/go.mod h1:IBqQ7wSUJ2Ep09a8rMWFsg4fmI2r38zwsq8a0GgxXpM=
@@ -1633,8 +1637,9 @@ modernc.org/libc v1.16.19 h1:S8flPn5ZeXx6iw/8yNa986hwTQDrY8RXU7tObZuAozo=
modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
-modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
+modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU=
modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw=
modernc.org/opt v0.1.1 h1:/0RX92k9vwVeDXj+Xn23DKp2VJubL7k8qNffND6qn3A=
@@ -1643,8 +1648,8 @@ modernc.org/sqlite v1.18.1 h1:ko32eKt3jf7eqIkCgPAeHMBXw3riNSLhl2f3loEF7o8=
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
-modernc.org/strutil v1.1.2 h1:iFBDH6j1Z0bN/Q9udJnnFoFpENA4252qe/7/5woE5MI=
-modernc.org/strutil v1.1.2/go.mod h1:OYajnUAcI/MX+XD/Wx7v1bbdvcQSvxgtb0gC+u3d3eg=
+modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
+modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/tcl v1.13.1 h1:npxzTwFTZYM8ghWicVIX1cRWzj7Nd8i6AqqX2p+IYao=
modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk=
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
diff --git a/k8s/helm_charts2/Chart.yaml b/k8s/helm_charts2/Chart.yaml
index d57f1d9f6..4fa84ac52 100644
--- a/k8s/helm_charts2/Chart.yaml
+++ b/k8s/helm_charts2/Chart.yaml
@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-appVersion: "3.24"
-version: "3.24"
+appVersion: "3.25"
+version: "3.25"
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 6e1dee356..1e26cf96d 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -6,7 +6,7 @@
3.13
- 2.10.1
+ 3.2.4
com.github.chrislusf
diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go
index e8368d124..ba135ff25 100644
--- a/unmaintained/s3/presigned_put/presigned_put.go
+++ b/unmaintained/s3/presigned_put/presigned_put.go
@@ -1,15 +1,16 @@
package main
import (
+ "crypto/md5"
+ "encoding/base64"
+ "fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
- "encoding/base64"
- "fmt"
- "crypto/md5"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "net/http"
"strings"
"time"
- "net/http"
)
// Downloads an item from an S3 Bucket in the region configured in the shared config
@@ -63,6 +64,7 @@ func main() {
fmt.Printf("error put request: %v\n", err)
return
}
+ defer util.CloseResponse(resp)
fmt.Printf("response: %+v\n", resp)
}
@@ -70,4 +72,4 @@ var stringContent = `Generate a Pre-Signed URL for an Amazon S3 PUT Operation wi
You can generate a pre-signed URL for a PUT operation that checks whether users upload the correct content. When the SDK pre-signs a request, it computes the checksum of the request body and generates an MD5 checksum that is included in the pre-signed URL. Users must upload the same content that produces the same MD5 checksum generated by the SDK; otherwise, the operation fails. This is not the Content-MD5, but the signature. To enforce Content-MD5, simply add the header to the request.
The following example adds a Body field to generate a pre-signed PUT operation that requires a specific payload to be uploaded by users.
-`
\ No newline at end of file
+`
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 5c1e653cb..452e76228 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -173,29 +173,29 @@ func runFiler(cmd *Command, args []string) bool {
if *f.dataCenter != "" && *filerS3Options.dataCenter == "" {
filerS3Options.dataCenter = f.dataCenter
}
- go func() {
- time.Sleep(startDelay * time.Second)
+ go func(delay time.Duration) {
+ time.Sleep(delay * time.Second)
filerS3Options.startS3Server()
- }()
+ }(startDelay)
startDelay++
}
if *filerStartWebDav {
filerWebDavOptions.filer = &filerAddress
- go func() {
- time.Sleep(startDelay * time.Second)
+ go func(delay time.Duration) {
+ time.Sleep(delay * time.Second)
filerWebDavOptions.startWebDav()
- }()
+ }(startDelay)
startDelay++
}
if *filerStartIam {
filerIamOptions.filer = &filerAddress
filerIamOptions.masters = f.mastersString
- go func() {
- time.Sleep(startDelay * time.Second)
+ go func(delay time.Duration) {
+ time.Sleep(delay * time.Second)
filerIamOptions.startIamServer()
- }()
+ }(startDelay)
}
f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
@@ -293,17 +293,18 @@ func (fo *FilerOptions) startFiler() {
httpS := &http.Server{Handler: defaultMux}
if runtime.GOOS != "windows" {
- if *fo.localSocket == "" {
- *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
+ localSocket := *fo.localSocket
+ if localSocket == "" {
+ localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
}
- if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
- glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
+ if err := os.Remove(localSocket); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", localSocket, err.Error())
}
go func() {
// start on local unix socket
- filerSocketListener, err := net.Listen("unix", *fo.localSocket)
+ filerSocketListener, err := net.Listen("unix", localSocket)
if err != nil {
- glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
+ glog.Fatalf("Failed to listen on %s: %v", localSocket, err)
}
httpS.Serve(filerSocketListener)
}()
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index d6f1d63d8..7133b0aef 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -44,10 +44,16 @@ type SyncOptions struct {
aProxyByFiler *bool
bProxyByFiler *bool
metricsHttpPort *int
+ concurrency *int
clientId int32
clientEpoch int32
}
+const (
+ SyncKeyPrefix = "sync."
+ DefaultConcurrcyLimit = 32
+)
+
var (
syncOptions SyncOptions
syncCpuProfile *string
@@ -77,6 +83,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
+ syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrcyLimit, "The maximum number of files that will be synced concurrently.")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
@@ -153,6 +160,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.bProxyByFiler,
*syncOptions.bDiskType,
*syncOptions.bDebug,
+ *syncOptions.concurrency,
aFilerSignature,
bFilerSignature)
if err != nil {
@@ -189,6 +197,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.aProxyByFiler,
*syncOptions.aDiskType,
*syncOptions.aDebug,
+ *syncOptions.concurrency,
bFilerSignature,
aFilerSignature)
if err != nil {
@@ -221,7 +230,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
- replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time
@@ -251,7 +260,12 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
return persistEventFn(resp)
}
- processor := NewMetadataProcessor(processEventFn, 128)
+
+ if concurrency < 0 || concurrency > 1024 {
+ glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrcyLimit)
+ concurrency = DefaultConcurrcyLimit
+ }
+ processor := NewMetadataProcessor(processEventFn, concurrency)
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
@@ -276,10 +290,6 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
-const (
- SyncKeyPrefix = "sync."
-)
-
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath(path string) string {
// compatible historical version
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
index aebb14e61..1d1727519 100644
--- a/weed/command/mount_linux.go
+++ b/weed/command/mount_linux.go
@@ -3,6 +3,7 @@ package command
import (
"bufio"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"io"
"os"
"strings"
@@ -142,6 +143,9 @@ func checkMountPointAvailable(dir string) bool {
}
if mounted, err := mounted(mountPoint); err != nil || mounted {
+ if err != nil {
+ glog.Errorf("check %s: %v", mountPoint, err)
+ }
return false
}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index db5899109..a39d88cce 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -157,7 +157,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
- glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ glog.Fatalf("Target mount point is not available: %s, please check!", dir)
return true
}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 4bcb9527b..d69ac214c 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -194,7 +194,7 @@ func (s3opt *S3Options) startS3Server() bool {
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
- LocalFilerSocket: s3opt.localFilerSocket,
+ LocalFilerSocket: *s3opt.localFilerSocket,
DataCenter: *s3opt.dataCenter,
})
if s3ApiServer_err != nil {
diff --git a/weed/command/update.go b/weed/command/update.go
index 89efae79a..30b15cc99 100644
--- a/weed/command/update.go
+++ b/weed/command/update.go
@@ -199,6 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
if err != nil {
return Release{}, err
}
+ defer util.CloseResponse(res)
if res.StatusCode != http.StatusOK {
content := res.Header.Get("Content-Type")
@@ -211,17 +212,10 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
}
}
- _ = res.Body.Close()
return Release{}, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
}
buf, err := ioutil.ReadAll(res.Body)
- if err != nil {
- _ = res.Body.Close()
- return Release{}, err
- }
-
- err = res.Body.Close()
if err != nil {
return Release{}, err
}
@@ -265,18 +259,13 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) {
if err != nil {
return nil, err
}
+ defer util.CloseResponse(res)
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
}
buf, err := ioutil.ReadAll(res.Body)
- if err != nil {
- _ = res.Body.Close()
- return nil, err
- }
-
- err = res.Body.Close()
if err != nil {
return nil, err
}
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
index 75e65f7e0..25ef60bf0 100644
--- a/weed/filer/arangodb/arangodb_store.go
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -121,7 +121,7 @@ func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Conte
return nil, err
}
- return context.WithValue(ctx, transactionKey, txn), nil
+ return context.WithValue(driver.WithTransactionID(ctx, txn), transactionKey, txn), nil
}
func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {
diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go
index b35a5c9b3..44aeeadea 100644
--- a/weed/filer/arangodb/arangodb_store_bucket.go
+++ b/weed/filer/arangodb/arangodb_store_bucket.go
@@ -34,6 +34,9 @@ func (store *ArangodbStore) OnBucketDeletion(bucket string) {
glog.Errorf("bucket delete %s: %v", bucket, err)
return
}
+ store.mu.Lock()
+ delete(store.buckets, bucket)
+ store.mu.Unlock()
}
func (store *ArangodbStore) CanDropWholeBucket() bool {
return true
diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go
index 35796a8f8..3f36acb0a 100644
--- a/weed/filer/arangodb/helpers.go
+++ b/weed/filer/arangodb/helpers.go
@@ -86,7 +86,7 @@ func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc
store.mu.RLock()
bc, ok = store.buckets[bucket]
store.mu.RUnlock()
- if ok {
+ if ok && bc != nil {
return bc, nil
}
store.mu.Lock()
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index fe5fe289a..ae5bc69bc 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -32,6 +32,8 @@ var (
)
type Filer struct {
+ UniqueFilerId int32
+ UniqueFilerEpoch int32
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
@@ -45,8 +47,6 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
- UniqueFilerId int32
- UniqueFilerEpoch int32
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index bdc556f24..0c354f889 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -58,7 +58,7 @@ func (f *Filer) loopProcessingDeletion() {
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
}
} else {
- glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
+ glog.V(2).Infof("deleting fileIds len=%d", deletionCount)
}
}
})
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index d384c2c38..fbc163442 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -9,6 +9,7 @@ import (
"io"
"strings"
"sync"
+ "sync/atomic"
"time"
"google.golang.org/grpc"
@@ -194,13 +195,13 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- ma.filer.UniqueFilerEpoch++
+ atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1)
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "filer:" + string(self),
PathPrefix: "/",
SinceNs: lastTsNs,
ClientId: ma.filer.UniqueFilerId,
- ClientEpoch: ma.filer.UniqueFilerEpoch,
+ ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 64de3ec08..45bee62f7 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -123,7 +123,7 @@ func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
key := genKey(dir, name)
data, err := store.db.Get(store.ro, key)
- if data == nil {
+ if data == nil || !data.Exists() {
return nil, filer_pb.ErrNotFound
}
defer data.Free()
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 24ed46be5..ce662b969 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -43,7 +43,7 @@ func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error {
for _, cred := range ident.Credentials {
if userName, found := accessKeySet[cred.AccessKey]; !found {
accessKeySet[cred.AccessKey] = ident.Name
- } else {
+ } else if userName != ident.Name {
return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)
}
}
diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go
index 66944aa44..ab1af4bc1 100644
--- a/weed/filer/s3iam_conf_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -97,6 +97,41 @@ func TestCheckDuplicateAccessKey(t *testing.T) {
},
"",
},
+ {
+ &iam_pb.S3ApiConfiguration{
+ Identities: []*iam_pb.Identity{
+ {
+ Name: "some_name",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key1",
+ SecretKey: "some_secret_key1",
+ },
+ },
+ Actions: []string{
+ ACTION_ADMIN,
+ ACTION_READ,
+ ACTION_WRITE,
+ },
+ },
+ {
+ Name: "some_name",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key1",
+ SecretKey: "some_secret_key1",
+ },
+ },
+ Actions: []string{
+ ACTION_READ,
+ ACTION_TAGGING,
+ ACTION_LIST,
+ },
+ },
+ },
+ },
+ "",
+ },
{
&iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 4595764ee..5d1552ce6 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -77,6 +77,10 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
+ if fh.entry == nil {
+ return
+ }
+
// find the earliest incoming chunk
newChunks := chunks
earliestChunk := newChunks[0]
@@ -86,10 +90,6 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
}
}
- if fh.entry == nil {
- return
- }
-
// pick out-of-order chunks from existing chunks
for _, chunk := range fh.entry.Chunks {
if lessThan(earliestChunk, chunk) {
@@ -110,7 +110,8 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
func (fh *FileHandle) CloseReader() {
if fh.reader != nil {
- fh.reader.Close()
+ _ = fh.reader.Close()
+ fh.reader = nil
}
}
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index 0375bc206..307ad5960 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -39,8 +39,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
+ fh.Lock()
+ defer fh.Unlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index 58ee43767..131345f9c 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -20,13 +20,14 @@ package gocdk_pub_sub
import (
"context"
"fmt"
- "github.com/streadway/amqp"
+ amqp "github.com/rabbitmq/amqp091-go"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"google.golang.org/protobuf/proto"
"net/url"
"path"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -49,33 +50,44 @@ func getPath(rawUrl string) string {
}
type GoCDKPubSub struct {
- topicURL string
- topic *pubsub.Topic
+ topicURL string
+ topic *pubsub.Topic
+ topicLock sync.RWMutex
}
func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
+func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
+ k.topicLock.Lock()
+ k.topic = topic
+ k.topicLock.Unlock()
+ k.doReconnect()
+}
+
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
if k.topic.As(&conn) {
- go func() {
- <-conn.NotifyClose(make(chan *amqp.Error))
- conn.Close()
+ go func(c *amqp.Connection) {
+ <-c.NotifyClose(make(chan *amqp.Error))
+ c.Close()
+ k.topicLock.RLock()
k.topic.Shutdown(context.Background())
+ k.topicLock.RUnlock()
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
- k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
- k.doReconnect()
+ k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
break
}
glog.Error(err)
time.Sleep(time.Second)
}
- }()
+ }(conn)
}
}
@@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
if err != nil {
glog.Fatalf("Failed to open topic: %v", err)
}
- k.topic = topic
- k.doReconnect()
+ k.setTopic(topic)
return nil
}
@@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
if err != nil {
return err
}
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
err = k.topic.Send(context.Background(), &pubsub.Message{
Body: bytes,
Metadata: map[string]string{"key": key},
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 790a8012e..eacf64112 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -106,10 +106,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
if err != nil {
return written, err
}
- defer func() {
- io.Copy(io.Discard, resp.Body)
- resp.Body.Close()
- }()
+ defer util.CloseResponse(resp)
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 12f4ce524..af1edea75 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -313,19 +313,20 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
}
// print("+")
resp, post_err := HttpClient.Do(req)
+ defer util.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerRequestCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
resp, post_err = HttpClient.Do(req)
+ defer util.CloseResponse(resp)
}
}
if post_err != nil {
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
}
// print("-")
- defer util.CloseResponse(resp)
var ret UploadResult
etag := getEtag(resp)
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 9a795d4ec..265f51b55 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -133,6 +133,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Name: name,
IsDirectory: entry.IsDirectory,
Attributes: entry.Attributes,
+ Extended: entry.Extended,
Chunks: replicatedChunks,
Content: entry.Content,
RemoteEntry: entry.RemoteEntry,
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 8167b5d5d..2e7640af4 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -5,10 +5,10 @@ package sub
import (
"context"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index a5e8f128c..0e6c5ab89 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -174,11 +174,12 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
- dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
+ resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
+ defer util.CloseResponse(resp)
defer dataReader.Close()
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 31ee1dc92..76163d724 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -28,7 +28,7 @@ type S3ApiServerOption struct {
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
AllowDeleteBucketNotEmpty bool
- LocalFilerSocket *string
+ LocalFilerSocket string
DataCenter string
}
@@ -59,7 +59,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
cb: NewCircuitBreaker(option),
}
- if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" {
+ if option.LocalFilerSocket == "" {
s3ApiServer.client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
@@ -68,7 +68,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.client = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
- return net.Dial("unix", *option.LocalFilerSocket)
+ return net.Dial("unix", option.LocalFilerSocket)
},
},
}
diff --git a/weed/server/common.go b/weed/server/common.go
index c99ef3640..35924c5c8 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -110,6 +110,7 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
m := make(map[string]interface{})
m["error"] = err.Error()
+ glog.V(1).Infof("error JSON response status %d: %s", httpStatus, m["error"])
writeJsonQuiet(w, r, httpStatus, m)
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 327eaa54d..8c3d4bcd8 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -96,7 +96,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
if err == filer_pb.ErrNotFound {
- glog.V(1).Infof("Not found %s: %v", path, err)
+ glog.V(2).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
w.WriteHeader(http.StatusNotFound)
} else {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index a1dc9c8d6..89941b340 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -46,7 +46,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
if err != nil {
- if strings.HasPrefix(err.Error(), "read input:") {
+ if strings.HasPrefix(err.Error(), "read input:") || err.Error() == io.ErrUnexpectedEOF.Error() {
writeJsonError(w, r, 499, err)
} else if strings.HasSuffix(err.Error(), "is a file") {
writeJsonError(w, r, http.StatusConflict, err)
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 7756b5d59..4dc588055 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -52,6 +52,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var bytesBufferCounter int64
bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
var fileChunksLock sync.Mutex
+ var uploadErrLock sync.Mutex
for {
// need to throttle used byte buffer
@@ -77,7 +78,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
+ uploadErrLock.Lock()
uploadErr = err
+ uploadErrLock.Unlock()
break
}
if chunkOffset == 0 && !isAppend {
@@ -105,14 +108,19 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}()
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
- if uploadErr == nil && toChunkErr != nil {
- uploadErr = toChunkErr
+ if toChunkErr != nil {
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = toChunkErr
+ }
+ uploadErrLock.Unlock()
}
if chunk != nil {
fileChunksLock.Lock()
fileChunks = append(fileChunks, chunk)
+ fileChunksSize := len(fileChunks)
fileChunksLock.Unlock()
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
}
}(chunkOffset)
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index ed8477053..636ba8c37 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -7,7 +7,7 @@ import (
"fmt"
transport "github.com/Jille/raft-grpc-transport"
"github.com/hashicorp/raft"
- boltdb "github.com/hashicorp/raft-boltdb"
+ boltdb "github.com/hashicorp/raft-boltdb/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@@ -75,7 +75,7 @@ func (s *RaftServer) UpdatePeers() {
s.RaftHashicorp.AddVoter(
raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
}
- for peer, _ := range existsPeerName {
+ for peer := range existsPeerName {
if _, found := s.peers[peer]; !found {
glog.V(0).Infof("removing old peer: %s", peer)
s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 1de569788..3bd10c1e9 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -83,6 +83,13 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
+ if *verbose || lastLogTime.Add(time.Second).Before(time.Now()) {
+ if !*verbose {
+ lastLogTime = time.Now()
+ }
+ fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
+ }
+
fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x")
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: fullEntry.Dir,
@@ -91,13 +98,6 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
- if *verbose || lastLogTime.Add(time.Second).Before(time.Now()) {
- if !*verbose {
- lastLogTime = time.Now()
- }
- fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
- }
-
if fullEntry.Entry.IsDirectory {
dirCount++
} else {
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 403a3a841..f1924d32d 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -129,10 +129,10 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
// find and make up the differences
if aHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, nonRepairThreshold); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode, a.location.dataNode, b.info.Id, err)
+ return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
}
if bHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, nonRepairThreshold); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode, b.location.dataNode, a.info.Id, err)
+ return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
}
return
}
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index 978d53751..ae9e83c2a 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -12,7 +12,11 @@ import (
"path/filepath"
"sync"
"time"
+ "context"
+ "errors"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -40,7 +44,7 @@ func (c *commandVolumeTierMove) Name() string {
func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another
- volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4]
+ volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed.
@@ -59,6 +63,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
+ replicationString := tierCommand.String("toReplication", "", "the new target replication setting");
+
if err = tierCommand.Parse(args); err != nil {
return nil
}
@@ -119,7 +125,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src)
if applyChanges {
- if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond); err != nil {
+ if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
}
}
@@ -220,7 +226,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
-func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string ) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -230,8 +236,9 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
- if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
+ newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
+ if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
@@ -240,6 +247,26 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
}
+ // If move is successful and replication is not empty, alter moved volume's replication setting
+ if *replicationString != "" {
+ err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
+ VolumeId: uint32(vid),
+ Replication: *replicationString,
+ })
+ if configureErr != nil {
+ return configureErr
+ }
+ if resp.Error != "" {
+ return errors.New(resp.Error)
+ }
+ return nil
+ })
+ if err != nil {
+ glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
+ }
+ }
+
// remove the remaining replicas
for _, loc := range locations {
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 01fcc1283..0b3db3c67 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -124,8 +124,6 @@ func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- // glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
-
getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
Bucket: &s3backendStorageFile.backendStorage.bucket,
Key: &s3backendStorageFile.key,
@@ -137,13 +135,16 @@ func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n
}
defer getObjectOutput.Body.Close()
- glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
- glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+ // glog.V(3).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+ // glog.V(3).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+ var readCount int
for {
- if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
- p = p[n:]
- } else {
+ p = p[readCount:]
+ readCount, err = getObjectOutput.Body.Read(p)
+ n += readCount
+
+ if err != nil {
break
}
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 3cb35cb85..e12fb2c50 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -549,7 +549,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
maxVolumeCount += int32(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
}
atomic.StoreInt32(&diskLocation.MaxVolumeCount, maxVolumeCount)
- glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
+ glog.V(4).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
hasChanges = hasChanges || currentMaxVolumeCount != atomic.LoadInt32(&diskLocation.MaxVolumeCount)
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 91d2d0d1f..1a9c8bd24 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -36,6 +36,7 @@ type Volume struct {
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
+ superBlockAccessLock sync.Mutex
asyncRequestsChan chan *needle.AsyncRequest
lastModifiedTsSeconds uint64 // unix time in seconds
lastAppendAtNs uint64 // unix time in nanoseconds
@@ -97,6 +98,8 @@ func (v *Volume) FileName(ext string) (fileName string) {
}
func (v *Volume) Version() needle.Version {
+ v.superBlockAccessLock.Lock()
+ defer v.superBlockAccessLock.Unlock()
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
}
@@ -281,7 +284,7 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
- glog.V(3).Infof("collectStatus volume %d", v.Id)
+ glog.V(4).Infof("collectStatus volume %d", v.Id)
if v.nm == nil || v.DataBackend == nil {
return
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index b38ec32d3..d4d795fee 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -107,7 +107,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
readOption.IsOutOfRange = false
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
}
- buf := mem.Allocate(1024 * 1024)
+ buf := mem.Allocate(min(1024*1024, int(size)))
defer mem.Free(buf)
actualOffset := nv.Offset.ToActualOffset()
if readOption.IsOutOfRange {
@@ -117,6 +117,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size)
}
+func min(x, y int) int {
+ if x < y {
+ return x
+ }
+ return y
+}
+
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
v.dataFileAccessLock.RLock()
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 8d28eff13..377122cae 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"math/rand"
"sync"
+ "sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -103,6 +104,8 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
+ growRequestCount int32
+ growRequestTime time.Time
rp *super_block.ReplicaPlacement
ttl *needle.TTL
diskType types.DiskType
@@ -114,8 +117,6 @@ type VolumeLayout struct {
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
- growRequestCount int
- growRequestTime time.Time
}
type VolumeLayoutStats struct {
@@ -319,18 +320,19 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
}
func (vl *VolumeLayout) HasGrowRequest() bool {
- if vl.growRequestCount > 0 && vl.growRequestTime.Add(time.Minute).After(time.Now()) {
+ if atomic.LoadInt32(&vl.growRequestCount) > 0 &&
+ vl.growRequestTime.Add(time.Minute).After(time.Now()) {
return true
}
return false
}
func (vl *VolumeLayout) AddGrowRequest() {
vl.growRequestTime = time.Now()
- vl.growRequestCount++
+ atomic.AddInt32(&vl.growRequestCount, 1)
}
func (vl *VolumeLayout) DoneGrowRequest() {
vl.growRequestTime = time.Unix(0, 0)
- vl.growRequestCount = 0
+ atomic.StoreInt32(&vl.growRequestCount, 0)
}
func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
diff --git a/weed/util/constants.go b/weed/util/constants.go
index d242f37f9..ea387b0f0 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION_NUMBER = fmt.Sprintf("%.02f", 3.24)
+ VERSION_NUMBER = fmt.Sprintf("%.02f", 3.25)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)
diff --git a/weed/util/grace/signal_handling.go b/weed/util/grace/signal_handling.go
index fc7afcad9..8146668be 100644
--- a/weed/util/grace/signal_handling.go
+++ b/weed/util/grace/signal_handling.go
@@ -12,7 +12,7 @@ import (
var signalChan chan os.Signal
var hooks = make([]func(), 0)
-var hookLock sync.Mutex
+var hookLock sync.RWMutex
func init() {
signalChan = make(chan os.Signal, 1)
@@ -27,10 +27,12 @@ func init() {
// syscall.SIGQUIT,
)
go func() {
- for _ = range signalChan {
+ for range signalChan {
+ hookLock.RLock()
for _, hook := range hooks {
hook()
}
+ hookLock.RUnlock()
os.Exit(0)
}
}()
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 783ff2379..bb1a32ede 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -60,7 +60,7 @@ func Get(url string) ([]byte, bool, error) {
if err != nil {
return nil, true, err
}
- defer response.Body.Close()
+ defer CloseResponse(response)
var reader io.ReadCloser
switch response.Header.Get("Content-Encoding") {
@@ -242,8 +242,8 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if err != nil {
return 0, err
}
+ defer CloseResponse(r)
- defer r.Body.Close()
if r.StatusCode >= 400 {
return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
@@ -370,11 +370,11 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
return false, nil
}
-func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.ReadCloser, error) {
+func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return nil, err
+ return nil, nil, err
}
if rangeHeader != "" {
req.Header.Add("Range", rangeHeader)
@@ -388,10 +388,11 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.R
r, err := client.Do(req)
if err != nil {
- return nil, err
+ return nil, nil, err
}
if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ CloseResponse(r)
+ return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@@ -399,15 +400,17 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.R
switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
- defer reader.Close()
default:
reader = r.Body
}
- return reader, nil
+ return r, reader, nil
}
func CloseResponse(resp *http.Response) {
+ if resp == nil || resp.Body == nil {
+ return
+ }
reader := &CountingReader{reader: resp.Body}
io.Copy(io.Discard, reader)
resp.Body.Close()
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index d330e9ec6..6d88e56e9 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -18,17 +18,17 @@ import (
)
type MasterClient struct {
- FilerGroup string
- clientType string
- clientHost pb.ServerAddress
- rack string
- currentMaster pb.ServerAddress
- masters map[string]pb.ServerAddress
- grpcDialOption grpc.DialOption
+ FilerGroup string
+ clientType string
+ clientHost pb.ServerAddress
+ rack string
+ currentMaster pb.ServerAddress
+ currentMasterLock sync.RWMutex
+ masters map[string]pb.ServerAddress
+ grpcDialOption grpc.DialOption
vidMap
- vidMapCacheSize int
-
+ vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
}
@@ -61,12 +61,12 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
if err != nil {
- return fmt.Errorf("LookupVolume failed: %v", err)
+ return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
}
for vid, vidLocation := range resp.VolumeIdLocations {
for _, vidLoc := range vidLocation.Locations {
@@ -91,9 +91,21 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
return
}
+func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
+ mc.currentMasterLock.RLock()
+ defer mc.currentMasterLock.RUnlock()
+ return mc.currentMaster
+}
+
+func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
+ mc.currentMasterLock.Lock()
+ mc.currentMaster = master
+ mc.currentMasterLock.Unlock()
+}
+
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
- return mc.currentMaster
+ return mc.getCurrentMaster()
}
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
@@ -103,7 +115,7 @@ func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
func (mc *MasterClient) WaitUntilConnected() {
for {
- if mc.currentMaster != "" {
+ if mc.getCurrentMaster() != "" {
return
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@@ -151,8 +163,7 @@ func (mc *MasterClient) tryAllMasters() {
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
}
-
- mc.currentMaster = ""
+ mc.setCurrentMaster("")
}
}
@@ -204,7 +215,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
} else {
mc.resetVidMap()
}
- mc.currentMaster = master
+ mc.setCurrentMaster(master)
for {
resp, err := stream.Recv()
@@ -216,8 +227,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
- glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@@ -279,10 +290,7 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
- for mc.currentMaster == "" {
- time.Sleep(3 * time.Second)
- }
- return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})