Merge pull request #2 from chrislusf/master

This commit is contained in:
bingoohuang 2021-02-18 13:57:34 +08:00 committed by GitHub
commit c8f56f5712
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
147 changed files with 4359 additions and 2928 deletions

View file

@ -1,22 +0,0 @@
name: Cleanup
on:
push:
branches: [ master ]
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Delete old release assets
uses: mknejp/delete-release-assets@v1
with:
token: ${{ github.token }}
tag: dev
fail-if-no-assets: false
assets: |
weed-*

View file

@ -24,10 +24,14 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Wait for the deletion
uses: jakejarvis/wait-action@master
- name: Delete old release assets
uses: mknejp/delete-release-assets@v1
with:
time: '30s'
token: ${{ github.token }}
tag: dev
fail-if-no-assets: false
assets: |
weed-*
- name: Set BUILD_TIME env
run: echo BUILD_TIME=$(date -u +%Y-%m-%d-%H-%M) >> ${GITHUB_ENV}

View file

@ -1,8 +1,8 @@
sudo: false
language: go
go:
- 1.14.x
- 1.15.x
- 1.16.x
before_install:
- export PATH=/home/travis/gopath/bin:$PATH
@ -44,4 +44,4 @@ deploy:
on:
tags: true
repo: chrislusf/seaweedfs
go: 1.15.x
go: 1.16.x

31
docker/Dockerfile.s3tests Normal file
View file

@ -0,0 +1,31 @@
FROM ubuntu:20.04
RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get upgrade -y && \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
git \
sudo \
debianutils \
python3-pip \
python3-virtualenv \
python3-dev \
libevent-dev \
libffi-dev \
libxml2-dev \
libxslt-dev \
zlib1g-dev && \
DEBIAN_FRONTEND=noninteractive apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/ceph/s3-tests.git /opt/s3-tests
WORKDIR /opt/s3-tests
RUN ./bootstrap
ENV \
NOSETESTS_EXCLUDE="" \
NOSETESTS_ATTR="" \
NOSETESTS_OPTIONS="" \
S3TEST_CONF="/s3test.conf"
ENTRYPOINT ["/bin/bash", "-c"]
CMD ["exec ./virtualenv/bin/nosetests ${NOSETESTS_OPTIONS-} ${NOSETESTS_ATTR:+-a $NOSETESTS_ATTR} ${NOSETESTS_EXCLUDE:+-e $NOSETESTS_EXCLUDE}"]

View file

@ -9,6 +9,9 @@ build:
docker build --no-cache -t chrislusf/seaweedfs:local -f Dockerfile.local .
rm ./weed
s3tests_build:
docker build --no-cache -t chrislusf/ceph-s3-tests:local -f Dockerfile.s3tests .
dev: build
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
@ -30,6 +33,9 @@ cluster: build
2clusters: build
docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up
s3tests: build s3tests_build
docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up
filer_etcd: build
docker stack deploy -c compose/swarm-etcd.yml fs

View file

@ -24,7 +24,7 @@ services:
ports:
- 8080:8080
- 18080:18080
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8080 -ip=volume1 -publicUrl=localhost:8080'
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8080 -ip=volume1 -publicUrl=localhost:8080 -preStopSeconds=1 -disk=ssd1'
depends_on:
- master0
- master1
@ -34,7 +34,7 @@ services:
ports:
- 8082:8082
- 18082:18082
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8082 -ip=volume2 -publicUrl=localhost:8082'
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8082 -ip=volume2 -publicUrl=localhost:8082 -preStopSeconds=1 -disk=ssd1'
depends_on:
- master0
- master1
@ -44,7 +44,7 @@ services:
ports:
- 8083:8083
- 18083:18083
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8083 -ip=volume3 -publicUrl=localhost:8083'
command: 'volume -mserver="master0:9333,master1:9334,master2:9335" -port=8083 -ip=volume3 -publicUrl=localhost:8083 -preStopSeconds=1'
depends_on:
- master0
- master1

View file

@ -0,0 +1,45 @@
version: '2'
services:
master:
image: chrislusf/seaweedfs:local
ports:
- 9333:9333
- 19333:19333
command: "master -ip=master -volumeSizeLimitMB=16"
environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1
volume:
image: chrislusf/seaweedfs:local
ports:
- 8080:8080
- 18080:18080
command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
depends_on:
- master
s3:
image: chrislusf/seaweedfs:local
ports:
- 8888:8888
- 18888:18888
- 8000:8000
command: 'filer -master="master:9333" -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000'
volumes:
- ./s3.json:/etc/seaweedfs/s3.json
depends_on:
- master
- volume
s3tests:
image: chrislusf/ceph-s3-tests:local
volumes:
- ./s3tests.conf:/opt/s3-tests/s3tests.conf
environment:
S3TEST_CONF: "s3tests.conf"
NOSETESTS_OPTIONS: "--verbose --logging-level=ERROR --with-xunit --failure-detail s3tests_boto3.functional.test_s3"
NOSETESTS_ATTR: "!tagging,!fails_on_aws,!encryption,!bucket-policy,!versioning,!fails_on_rgw,!bucket-policy,!fails_with_subdomain,!policy_status,!object-lock,!lifecycle,!cors,!user-policy"
NOSETESTS_EXCLUDE: "(bucket_list_delimiter_basic|bucket_listv2_delimiter_basic|bucket_listv2_encoding_basic|bucket_list_encoding_basic|bucket_list_delimiter_prefix|bucket_listv2_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_alt|bucket_listv2_delimiter_alt|bucket_list_delimiter_prefix_underscore|bucket_list_delimiter_percentage|bucket_listv2_delimiter_percentage|bucket_list_delimiter_whitespace|bucket_listv2_delimiter_whitespace|bucket_list_delimiter_dot|bucket_listv2_delimiter_dot|bucket_list_delimiter_unreadable|bucket_listv2_delimiter_unreadable|bucket_listv2_fetchowner_defaultempty|bucket_listv2_fetchowner_empty|bucket_list_delimiter_not_skip_special|bucket_list_prefix_delimiter_alt|bucket_listv2_prefix_delimiter_alt|bucket_list_prefix_delimiter_prefix_not_exist|bucket_listv2_prefix_delimiter_prefix_not_exist|bucket_list_prefix_delimiter_delimiter_not_exist|bucket_listv2_prefix_delimiter_delimiter_not_exist|bucket_list_prefix_delimiter_prefix_delimiter_not_exist|bucket_listv2_prefix_delimiter_prefix_delimiter_not_exist|bucket_list_maxkeys_none|bucket_listv2_maxkeys_none|bucket_list_maxkeys_invalid|bucket_listv2_continuationtoken_empty|bucket_list_return_data|bucket_list_objects_anonymous|bucket_listv2_objects_anonymous|bucket_notexist|bucketv2_notexist|bucket_delete_nonempty|bucket_concurrent_set_canned_acl|object_write_to_nonexist_bucket|object_requestid_matches_header_on_error|object_head_zero_bytes|object_write_cache_control|object_write_expires|object_set_get_metadata_none_to_good|object_set_get_metadata_none_to_empty|object_set_get_metadata_overwrite_to_empty|post_object_anonymous_request|post_object_authenticated_request|post_object_authenticated_no_content_type|post_object_authenticated_request_bad_access_key|post_object_set_success_code|post_object_set_invalid_success_code|post_object_upload_larger_than_chunk|post_object_set_key_from_filename|post_object_ignored_header|post_object_case_insensitive_condition_fields|post_object_escaped_field_values|post_object_success_redirect_action|post_object_invalid_signature|post_object_invalid_access_key|post_object_missing_policy_condition|post_object_user_specified_header|post_object_request_missing_policy_specified_field|post_object_expired_policy|post_object_invalid_request_field_value|get_object_ifmatch_failed|get_object_ifunmodifiedsince_good|put_object_ifmatch_failed|object_raw_get|object_raw_get_bucket_gone|object_delete_key_bucket_gone|object_raw_get_bucket_acl|object_raw_get_object_acl|object_raw_authenticated|object_raw_response_headers|object_raw_authenticated_bucket_acl|object_raw_authenticated_object_acl|object_raw_authenticated_bucket_gone|object_raw_get_x_amz_expires_not_expired|object_raw_get_x_amz_expires_out_max_range|object_raw_get_x_amz_expires_out_positive_range|object_anon_put_write_access|object_raw_put_authenticated_expired|bucket_create_naming_bad_short_one|bucket_create_naming_bad_short_two|bucket_create_exists|bucket_get_location|bucket_acl_default|bucket_acl_canned|bucket_acl_canned_publicreadwrite|bucket_acl_canned_authenticatedread|object_acl_default|object_acl_canned_during_create|object_acl_canned|object_acl_canned_publicreadwrite|object_acl_canned_authenticatedread|object_acl_canned_bucketownerread|object_acl_canned_bucketownerfullcontrol|object_acl_full_control_verify_attributes|bucket_acl_canned_private_to_private|bucket_acl_grant_nonexist_user|bucket_acl_no_grants|bucket_acl_grant_email_not_exist|bucket_acl_revoke_all|bucket_recreate_not_overriding|bucket_create_special_key_names|object_copy_zero_size|object_copy_verify_contenttype|object_copy_to_itself|object_copy_to_itself_with_metadata|object_copy_not_owned_bucket|object_copy_not_owned_object_bucket|object_copy_retaining_metadata|object_copy_replacing_metadata|multipart_upload_empty|multipart_copy_invalid_range|multipart_copy_special_names|multipart_upload_resend_part|multipart_upload_size_too_small|abort_multipart_upload_not_found|multipart_upload_missing_part|multipart_upload_incorrect_etag|100_continue|ranged_request_invalid_range|ranged_request_empty_object|access_bucket)"
depends_on:
- master
- volume
- s3

105
docker/compose/s3.json Normal file
View file

@ -0,0 +1,105 @@
{
"identities": [
{
"name": "anonymous",
"actions": [
"Read"
]
},
{
"name": "some_admin_user",
"credentials": [
{
"accessKey": "some_access_key1",
"secretKey": "some_secret_key1"
}
],
"actions": [
"Admin",
"Read",
"List",
"Tagging",
"Write"
]
},
{
"name": "s3_tests",
"credentials": [
{
"accessKey": "ABCDEFGHIJKLMNOPQRST",
"secretKey": "abcdefghijklmnopqrstuvwxyzabcdefghijklmn"
},
{
"accessKey": "0555b35654ad1656d804",
"secretKey": "h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=="
}
],
"actions": [
"Admin",
"Read",
"List",
"Tagging",
"Write"
]
},
{
"name": "s3_tests_alt",
"credentials": [
{
"accessKey": "NOPQRSTUVWXYZABCDEFG",
"secretKey": "nopqrstuvwxyzabcdefghijklmnabcdefghijklm"
}
],
"actions": [
"Admin",
"Read",
"List",
"Tagging",
"Write"
]
},
{
"name": "s3_tests_tenant",
"credentials": [
{
"accessKey": "HIJKLMNOPQRSTUVWXYZA",
"secretKey": "opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab"
}
],
"actions": [
"Admin",
"Read",
"List",
"Tagging",
"Write"
]
},
{
"name": "some_read_only_user",
"credentials": [
{
"accessKey": "some_access_key2",
"secretKey": "some_secret_key2"
}
],
"actions": [
"Read"
]
},
{
"name": "some_normal_user",
"credentials": [
{
"accessKey": "some_access_key3",
"secretKey": "some_secret_key3"
}
],
"actions": [
"Read",
"List",
"Tagging",
"Write"
]
}
]
}

View file

@ -0,0 +1,70 @@
[DEFAULT]
## this section is just used for host, port and bucket_prefix
# host set for rgw in vstart.sh
host = s3
# port set for rgw in vstart.sh
port = 8000
## say "False" to disable TLS
is_secure = False
[fixtures]
## all the buckets created will start with this prefix;
## {random} will be filled with random characters to pad
## the prefix to 30 characters long, and avoid collisions
bucket prefix = yournamehere-{random}-
[s3 main]
# main display_name set in vstart.sh
display_name = M. Tester
# main user_idname set in vstart.sh
user_id = testid
# main email set in vstart.sh
email = tester@ceph.com
# zonegroup api_name for bucket location
api_name = default
## main AWS access key
access_key = 0555b35654ad1656d804
## main AWS secret key
secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
## replace with key id obtained when secret is created, or delete if KMS not tested
#kms_keyid = 01234567-89ab-cdef-0123-456789abcdef
[s3 alt]
# alt display_name set in vstart.sh
display_name = john.doe
## alt email set in vstart.sh
email = john.doe@example.com
# alt user_id set in vstart.sh
user_id = 56789abcdef0123456789abcdef0123456789abcdef0123456789abcdef01234
# alt AWS access key set in vstart.sh
access_key = NOPQRSTUVWXYZABCDEFG
# alt AWS secret key set in vstart.sh
secret_key = nopqrstuvwxyzabcdefghijklmnabcdefghijklm
[s3 tenant]
# tenant display_name set in vstart.sh
display_name = testx$tenanteduser
# tenant user_id set in vstart.sh
user_id = 9876543210abcdef0123456789abcdef0123456789abcdef0123456789abcdef
# tenant AWS secret key set in vstart.sh
access_key = HIJKLMNOPQRSTUVWXYZA
# tenant AWS secret key set in vstart.sh
secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
# tenant email set in vstart.sh
email = tenanteduser@example.com

5
go.mod
View file

@ -3,7 +3,7 @@ module github.com/chrislusf/seaweedfs
go 1.12
require (
cloud.google.com/go v0.58.0
cloud.google.com/go v0.58.0 // indirect
cloud.google.com/go/pubsub v1.3.1
cloud.google.com/go/storage v1.9.0
github.com/Azure/azure-amqp-common-go/v2 v2.1.0 // indirect
@ -41,7 +41,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/json-iterator/go v1.1.10
github.com/karlseguin/ccache v2.0.3+incompatible
github.com/karlseguin/ccache v2.0.3+incompatible // indirect
github.com/karlseguin/ccache/v2 v2.0.7
github.com/klauspost/compress v1.10.9 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect
@ -73,6 +73,7 @@ require (
github.com/tidwall/match v1.0.1
github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365
github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/fasthttp v1.20.0
github.com/viant/assertly v0.5.4 // indirect
github.com/viant/ptrie v0.3.0
github.com/viant/toolbox v0.33.2 // indirect

18
go.sum
View file

@ -23,8 +23,10 @@ cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNF
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0 h1:PQcPefKFdaIzjQFbiyOgAqyx8q5djaE7x9Sqe712DPA=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0 h1:/May9ojXjRkPBNVrq+oWLqmWCkr4OU5uRY29bu0mRyQ=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.2.0/go.mod h1:iISCjWnTpnoJT1R287xRdjvQHJrxQOpeah4phb5D3h0=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
@ -68,10 +70,12 @@ github.com/Azure/go-amqp v0.12.7/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG
github.com/Azure/go-autorest v12.0.0+incompatible h1:N+VqClcomLGD/sHb3smbSYYtNMgKpVV3Cd5r5i8z6bQ=
github.com/Azure/go-autorest v12.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.3 h1:OZEIaBbMdUE/Js+BQKlpO81XlISgipr6yDJ+PSwsgi4=
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/adal v0.8.3 h1:O1AGG9Xig71FxdX9HO5pGNyZ7TbSyHaVg+5eJO/jSGw=
github.com/Azure/go-autorest/autorest/adal v0.8.3/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM=
github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw=
@ -105,6 +109,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@ -446,6 +452,7 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@ -455,6 +462,7 @@ github.com/karlseguin/ccache/v2 v2.0.7 h1:y5Pfi4eiyYCOD6LS/Kj+o6Nb4M5Ngpw9qFQs+v
github.com/karlseguin/ccache/v2 v2.0.7/go.mod h1:2BDThcfQMf/c0jnZowt16eW405XIqZPavt+HoYEtcxQ=
github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY=
github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLMY72r5J4sEfkuE7AFbixEP2qRbEcum/wA=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
@ -463,6 +471,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.9 h1:pPRt1Z78crspaHISkpSSHjDlx+Tt9suHe519dsI0vF4=
github.com/klauspost/compress v1.10.9/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
@ -756,6 +765,9 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.20.0 h1:olTmcnLQeZrkBc4TVgE/BatTo1NE/IvW050AuD8SW+U=
github.com/valyala/fasthttp v1.20.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/viant/assertly v0.5.4 h1:5Hh4U3pLZa6uhCFAGpYOxck/8l9TZczEzoHNfJAhHEQ=
github.com/viant/assertly v0.5.4/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/ptrie v0.3.0 h1:SDaRd7Gqr1+ItCNz0GpTxRdK21nOfqjV6YtBm9jGlMY=
@ -874,6 +886,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@ -882,6 +895,7 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -919,6 +933,7 @@ golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -1058,6 +1073,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200601175630-2caf76543d99/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200606014950-c42cb6316fb6/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200608174601-1b747fd94509 h1:MI14dOfl3OG6Zd32w3ugsrvcUO810fDZdWakTq39dH4=
golang.org/x/tools v0.0.0-20200608174601-1b747fd94509/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
@ -1094,6 +1110,7 @@ google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww
google.golang.org/appengine v1.6.2 h1:j8RI1yW0SkI+paT6uGwMlrMI/6zwYA6/CFil8rxOzGI=
google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
@ -1211,6 +1228,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o=
modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg=

View file

@ -9,15 +9,32 @@ and backup/HA memsql can provide.
with ENV.
* cert config exists and can be enabled, but not been tested.
### current instances config (AIO):
1 instance for each type (master/filer/volume/s3)
### prerequisites
kubernetes node have labels which help to define which node(Host) will run which pod.
instances need node labels:
* sw-volume: true (for volume instance, specific tag)
* sw-backend: true (for all others, as they less resource demanding)
s3/filer/master needs the label **sw-backend=true**
volume need the label **sw-volume=true**
to label a node to be able to run all pod types in k8s:
```
kubectl label node YOUR_NODE_NAME sw-volume=true,sw-backend=true
```
on production k8s deployment you will want each pod to have a different host,
especially the volume server & the masters, currently all pods (master/volume/filer)
have anti-affinity rule to disallow running multiple pod type on the same host.
if you still want to run multiple pods of the same type (master/volume/filer) on the same host
please set/update the corresponding affinity rule in values.yaml to an empty one:
```affinity: ""```
### current instances config (AIO):
1 instance for each type (master/filer+s3/volume)
you can update the replicas count for each node type in values.yaml,
need to add more nodes with the corresponding label.
need to add more nodes with the corresponding labels.
most of the configuration are available through values.yaml

View file

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "2.24"
version: 2.24
appVersion: "2.26"
version: 2.26

View file

@ -15,13 +15,13 @@ spec:
backoffLimit: 2
template:
spec:
{{- with .Values.cronjob.nodeSelector }}
{{- if .Values.cronjob.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 12 }}
{{ tpl .Values.cronjob.nodeSelector . | indent 12 | trim }}
{{- end }}
{{- with .Values.cronjob.tolerations }}
{{- if .Values.cronjob.tolerations }}
tolerations:
{{- toYaml . | nindent 12 }}
{{ tpl .Values.cronjob.tolerations . | nindent 12 | trim }}
{{- end }}
restartPolicy: OnFailure
containers:
@ -39,7 +39,10 @@ spec:
volume.balance -force \
{{ if .Values.volume.dataCenter }} -dataCenter {{ .Values.volume.dataCenter }}{{ end }}\
{{ if .Values.cronjob.collection }} -collection {{ .Values.cronjob.collection }}{{ end }}\n\
volume.fix.replication\nunlock\n" | \
{{- if .Values.cronjob.enableFixReplication }}
volume.fix.replication {{ if .Values.cronjob.collectionPattern }} -collectionPattern={{ .Values.cronjob.collectionPattern }} {{ end }} \n\
{{- end }}
unlock\n" | \
/usr/bin/weed shell \
{{- if .Values.cronjob.master }}
-master {{ .Values.cronjob.master }} \

View file

@ -10,7 +10,6 @@ metadata:
monitoring: "true"
{{- end }}
spec:
clusterIP: None
ports:
- name: "swfs-filer"
port: {{ .Values.filer.port }}

View file

@ -133,14 +133,36 @@ spec:
-encryptVolumeData \
{{- end }}
-ip=${POD_IP} \
{{- if .Values.filer.enable_peers }}
{{- if gt (.Values.filer.replicas | int) 1 }}
-peers=$(echo -n "{{ range $index := until (.Values.filer.replicas | int) }}${SEAWEEDFS_FULLNAME}-filer-{{ $index }}.${SEAWEEDFS_FULLNAME}-filer:{{ $.Values.filer.port }}{{ if lt $index (sub ($.Values.filer.replicas | int) 1) }},{{ end }}{{ end }}" | sed "s/$HOSTNAME.${SEAWEEDFS_FULLNAME}-filer:{{ $.Values.filer.port }}//" | sed 's/,$//; 's/^,//'; s/,,/,/;' ) \
{{- end }}
{{- end }}
{{- if .Values.filer.s3.enabled }}
-s3 \
-s3.port={{ .Values.filer.s3.port }} \
{{- if .Values.filer.s3.domainName }}
-s3.domainName={{ .Values.filer.s3.domainName }} \
{{- end }}
{{- if .Values.global.enableSecurity }}
-s3.cert.file=/usr/local/share/ca-certificates/client/tls.crt \
-s3.key.file=/usr/local/share/ca-certificates/client/tls.key \
{{- end }}
{{- if .Values.filer.s3.allowEmptyFolder }}
-s3.allowEmptyFolder={{ .Values.filer.s3.allowEmptyFolder }} \
{{- end }}
{{- if .Values.filer.s3.enableAuth }}
-s3.config=/etc/sw/seaweedfs_s3_config \
{{- end }}
{{- end }}
-master={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
{{- if or (.Values.global.enableSecurity) (.Values.filer.extraVolumeMounts) }}
volumeMounts:
- name: seaweedfs-filer-log-volume
mountPath: "/logs/"
- mountPath: /etc/sw
name: config-users
readOnly: true
{{- if .Values.global.enableSecurity }}
- name: security-config
readOnly: true
@ -198,6 +220,13 @@ spec:
hostPath:
path: /storage/logs/seaweedfs/filer
type: DirectoryOrCreate
- name: db-schema-config-volume
configMap:
name: seaweedfs-db-init-config
- name: config-users
secret:
defaultMode: 420
secretName: seaweedfs-s3-secret
{{- if .Values.global.enableSecurity }}
- name: security-config
configMap:

View file

@ -90,10 +90,16 @@ spec:
{{- if .Values.s3.allowEmptyFolder }}
-allowEmptyFolder={{ .Values.s3.allowEmptyFolder }} \
{{- end }}
{{- if .Values.s3.enableAuth }}
-config=/etc/sw/seaweedfs_s3_config \
{{- end }}
-filer={{ template "seaweedfs.name" . }}-filer-client:{{ .Values.filer.port }}
volumeMounts:
- name: logs
mountPath: "/logs/"
- mountPath: /etc/sw
name: config-users
readOnly: true
{{- if .Values.global.enableSecurity }}
- name: security-config
readOnly: true
@ -144,6 +150,10 @@ spec:
{{ tpl .Values.s3.resources . | nindent 12 | trim }}
{{- end }}
volumes:
- name: config-users
secret:
defaultMode: 420
secretName: seaweedfs-s3-secret
{{- if eq .Values.s3.logs.type "hostPath" }}
- name: logs
hostPath:

View file

@ -9,15 +9,15 @@ metadata:
spec:
ports:
- name: "swfs-s3"
port: {{ .Values.s3.port }}
targetPort: {{ .Values.s3.port }}
port: {{ if .Values.s3.enabled }}{{ .Values.s3.port }}{{ else }}{{ .Values.filer.s3.port }}{{ end }}
targetPort: {{ if .Values.s3.enabled }}{{ .Values.s3.port }}{{ else }}{{ .Values.filer.s3.port }}{{ end }}
protocol: TCP
{{- if .Values.s3.metricsPort }}
- name: "swfs-s3-metrics"
{{- if and .Values.s3.enabled .Values.s3.metricsPort }}
- name: "metrics"
port: {{ .Values.s3.metricsPort }}
targetPort: {{ .Values.s3.metricsPort }}
protocol: TCP
{{- end }}
selector:
app: {{ template "seaweedfs.name" . }}
component: s3
component: {{ if .Values.s3.enabled }}s3{{ else }}filer{{ end }}

View file

@ -0,0 +1,21 @@
{{- if not (or .Values.filer.s3.skipAuthSecretCreation .Values.s3.skipAuthSecretCreation) }}
{{- $access_key_admin := randAlphaNum 16 -}}
{{- $secret_key_admin := randAlphaNum 32 -}}
{{- $access_key_read := randAlphaNum 16 -}}
{{- $secret_key_read := randAlphaNum 32 -}}
apiVersion: v1
kind: Secret
type: Opaque
metadata:
name: seaweedfs-s3-secret
namespace: {{ .Release.Namespace }}
annotations:
"helm.sh/resource-policy": keep
"helm.sh/hook": "pre-install"
stringData:
admin_access_key_id: {{ $access_key_admin }}
admin_secret_access_key: {{ $secret_key_admin }}
read_access_key_id: {{ $access_key_read }}
read_secret_access_key: {{ $secret_key_read }}
seaweedfs_s3_config: '{"identities":[{"name":"anvAdmin","credentials":[{"accessKey":"{{ $access_key_admin }}","secretKey":"{{ $secret_key_admin }}"}],"actions":["Admin","Read","Write"]},{"name":"anvReadOnly","credentials":[{"accessKey":"{{ $access_key_read }}","secretKey":"{{ $secret_key_read }}"}],"actions":["Read"]}]}'
{{- end }}

View file

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
# imageTag: "2.24" - started using {.Chart.appVersion}
# imageTag: "2.26" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
@ -136,7 +136,7 @@ volume:
# limit file size to avoid out of memory, default 256mb
fileSizeLimitMB: null
# minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly
minFreeSpacePercent: 1
minFreeSpacePercent: 7
# limit background compaction or copying speed in mega bytes per second
@ -229,6 +229,8 @@ filer:
maxMB: null
# encrypt data on volume servers
encryptVolumeData: false
# enable peers sync metadata, for leveldb (localdb for filer but with sync across)
enable_peers: false
# Whether proxy or redirect to volume server during file GET request
redirectOnRead: false
@ -313,6 +315,17 @@ filer:
s3:
enabled: true
port: 8333
#allow empty folders
allowEmptyFolder: false
# Suffix of the host name, {bucket}.{domainName}
domainName: ""
# enable user & permission to s3 (need to inject to all services)
enableAuth: false
skipAuthSecretCreation: false
s3:
enabled: false
repository: null
imageName: null
imageTag: null
@ -323,6 +336,9 @@ s3:
loggingOverrideLevel: null
#allow empty folders
allowEmptyFolder: true
# enable user & permission to s3 (need to inject to all services)
enableAuth: false
skipAuthSecretCreation: false
# Suffix of the host name, {bucket}.{domainName}
domainName: ""
@ -359,17 +375,21 @@ s3:
storageClass: ""
cronjob:
enabled: false
enabled: true
master: "seaweedfs-master:9333"
filer: "seaweedfs-filer-client:8888"
tolerations: ""
nodeSelector: |
sw-backend: "true"
replication:
enable: true
collectionPattern: ""
schedule: "*/7 * * * *"
resources: null
# balance all volumes among volume servers
# ALL|EACH_COLLECTION|<collection_name>
collection: ""
master: ""
filer: ""
tolerations: ""
nodeSelector: |
sw-backend: "true"
certificates:
commonName: "SeaweedFS CA"

View file

@ -156,6 +156,7 @@ message FuseAttributes {
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
}
message CreateEntryRequest {
@ -220,6 +221,7 @@ message AssignVolumeRequest {
string data_center = 5;
string path = 6;
string rack = 7;
string disk_type = 8;
}
message AssignVolumeResponse {
@ -270,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -358,12 +358,7 @@ message FilerConf {
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
NONE = 0;
HDD = 1;
SSD = 2;
}
DiskType disk_type = 5;
string disk_type = 5;
bool fsync = 6;
uint32 volume_growth_count = 7;
}

View file

@ -45,7 +45,7 @@ func main() {
defer wg.Done()
client := &http.Client{Transport: &http.Transport{
MaxConnsPerHost: 1024,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))

View file

@ -37,7 +37,7 @@ func main() {
sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
}
err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
err := operation.TailVolume(func()string{return *master}, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
if n.Size == 0 {
println("-", n.String())
return nil

View file

@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.Lookup(*s.master, vid.String())
lookup, err := operation.Lookup(func() string { return *s.master }, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true

View file

@ -35,6 +35,7 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
replication *string
diskType *string
cpuprofile *string
maxCpu *int
grpcDialOption grpc.DialOption
@ -62,6 +63,7 @@ func init() {
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
@ -234,13 +236,14 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Count: 1,
Collection: *b.collection,
Replication: *b.replication,
DiskType: *b.diskType,
}
if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@ -290,7 +293,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
bytes, _, err = util.Get(url)
bytes, _, err = util.FastGet(url)
if err == nil {
break
}

View file

@ -44,15 +44,15 @@ var cmdDownload = &Command{
func runDownload(cmd *Command, args []string) bool {
for _, fid := range args {
if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil {
if e := downloadToFile(func() string { return *d.server }, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
return true
}
func downloadToFile(server, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
if lookupError != nil {
return lookupError
}
@ -83,7 +83,7 @@ func downloadToFile(server, fileId, saveDir string) error {
fids := strings.Split(string(content), "\n")
for _, partId := range fids {
var n int
_, part, err := fetchContent(*d.server, partId)
_, part, err := fetchContent(masterFn, partId)
if err == nil {
n, err = f.Write(part)
}
@ -103,8 +103,8 @@ func downloadToFile(server, fileId, saveDir string) error {
return nil
}
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
func fetchContent(masterFn operation.GetMasterFn, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
if lookupError != nil {
return "", nil, lookupError
}

View file

@ -37,6 +37,7 @@ type CopyOptions struct {
replication *string
collection *string
ttl *string
diskType *string
maxMB *int
masterClient *wdclient.MasterClient
concurrenctFiles *int
@ -54,6 +55,7 @@ func init() {
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
@ -311,6 +313,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath,
}
@ -405,6 +408,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath + fileName,
}
@ -459,7 +463,9 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds)
operation.DeleteFiles(func() string {
return copy.masters[0]
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
}

View file

@ -31,6 +31,8 @@ type SyncOptions struct {
bCollection *string
aTtlSec *int
bTtlSec *int
aDiskType *string
bDiskType *string
aDebug *bool
bDebug *bool
aProxyByFiler *bool
@ -56,6 +58,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] hard drive or solid state drive on filer A")
syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] hard drive or solid state drive on filer B")
syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
@ -90,9 +94,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler,
*syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler,
*syncOptions.bDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@ -103,9 +106,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler,
*syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler,
*syncOptions.aDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@ -120,7 +122,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error {
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
@ -146,7 +148,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler)
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {

View file

@ -12,6 +12,7 @@ type MountOptions struct {
dirAutoCreate *bool
collection *string
replication *string
diskType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
@ -41,6 +42,7 @@ func init() {
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")

View file

@ -5,6 +5,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"os/user"
"path"
@ -168,6 +169,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
diskType := types.ToDiskType(*option.diskType)
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
FilerAddress: filer,
@ -177,6 +180,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: diskType,
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,

View file

@ -124,7 +124,7 @@ interpolateParams = false
[mysql2] # or memsql, tidb
enabled = false
createTable = """
CREATE TABLE IF NOT EXISTS %s (
CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
dirhash BIGINT,
name VARCHAR(1000),
directory TEXT,
@ -160,11 +160,12 @@ schema = ""
sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
connection_max_lifetime_seconds = 0
[postgres2]
enabled = false
createTable = """
CREATE TABLE IF NOT EXISTS %s (
CREATE TABLE IF NOT EXISTS "%s" (
dirhash BIGINT,
name VARCHAR(65535),
directory VARCHAR(65535),
@ -181,6 +182,7 @@ schema = ""
sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
connection_max_lifetime_seconds = 0
[cassandra]
# CREATE TABLE filemeta (

View file

@ -102,6 +102,7 @@ func init() {
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd] hard drive or solid state drive")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")

View file

@ -27,6 +27,7 @@ type UploadOptions struct {
collection *string
dataCenter *string
ttl *string
diskType *string
maxMB *int
usePublicUrl *bool
}
@ -40,6 +41,7 @@ func init() {
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
@ -94,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
results, e := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@ -111,7 +113,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}

View file

@ -2,6 +2,7 @@ package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
httppprof "net/http/pprof"
"os"
@ -49,6 +50,7 @@ type VolumeServerOptions struct {
rack *string
whiteList []string
indexType *string
diskType *string
fixJpgOrientation *bool
readRedirect *bool
cpuProfile *string
@ -76,6 +78,7 @@ func init() {
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
@ -167,6 +170,21 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
glog.Fatalf("%d directories by -dir, but only %d minFreeSpacePercent is set by -minFreeSpacePercent", len(v.folders), len(v.minFreeSpacePercents))
}
// set disk types
var diskTypes []types.DiskType
diskTypeStrings := strings.Split(*v.diskType, ",")
for _, diskTypeString := range diskTypeStrings {
diskTypes = append(diskTypes, types.ToDiskType(diskTypeString))
}
if len(diskTypes) == 1 && len(v.folders) > 1 {
for i := 0; i < len(v.folders)-1; i++ {
diskTypes = append(diskTypes, diskTypes[0])
}
}
if len(v.folders) != len(diskTypes) {
glog.Fatalf("%d directories by -dir, but only %d disk types is set by -disk", len(v.folders), len(diskTypes))
}
// security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
@ -212,7 +230,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.minFreeSpacePercents,
v.folders, v.folderMaxLimits, v.minFreeSpacePercents, diskTypes,
*v.idxFolder,
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,

11
weed/filer.toml Normal file
View file

@ -0,0 +1,11 @@
[elastic7]
enabled = true
servers = [
"http://localhost:9200",
]
username = ""
password = ""
sniff_enabled = false
healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000

View file

@ -107,7 +107,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full
}
if _, found := store.dbs[bucket]; !found {
if err = store.CreateTable(ctx, bucket); err != nil {
if err = store.CreateTable(ctx, bucket); err == nil {
store.dbs[bucket] = true
}
}

View file

@ -18,6 +18,7 @@ type Attr struct {
Replication string // replication
Collection string // collection name
TtlSec int32 // ttl in seconds
DiskType string
UserName string
GroupNames []string
SymlinkTarget string

View file

@ -56,6 +56,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
Collection: entry.Attr.Collection,
Replication: entry.Attr.Replication,
TtlSec: entry.Attr.TtlSec,
DiskType: entry.Attr.DiskType,
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
@ -81,6 +82,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.Collection = attr.Collection
t.Replication = attr.Replication
t.TtlSec = attr.TtlSec
t.DiskType = attr.DiskType
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget

View file

@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {

View file

@ -116,7 +116,7 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.Collection = util.Nvl(b.Collection, a.Collection)
a.Replication = util.Nvl(b.Replication, a.Replication)
a.Ttl = util.Nvl(b.Ttl, a.Ttl)
if b.DiskType != filer_pb.FilerConf_PathConf_NONE {
if b.DiskType != "" {
a.DiskType = b.DiskType
}
a.Fsync = b.Fsync || a.Fsync

View file

@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if notification.Queue != nil {
glog.V(3).Infof("notifying entry update %v", fullpath)
notification.Queue.SendMessage(fullpath, eventNotification)
if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil {
// throw message
glog.Error(err)
}
}
f.logMetaEvent(ctx, fullpath, eventNotification)

View file

@ -56,7 +56,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
WritableVolumeCount: rule.VolumeGrowthCount,
}
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
}

View file

@ -16,31 +16,31 @@ var (
)
func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
}
func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", bucket)
return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
}
func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
}
func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string {

View file

@ -47,12 +47,14 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.SupportBucketTable = false
store.SqlGenerator = &SqlGenMysql{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: "drop table %s",
DropTableSqlTemplate: "drop table `%s`",
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database)
if interpolateParams {
sqlUrl += "&interpolateParams=true"
adaptedSqlUrl += "&interpolateParams=true"
}
var dbErr error
@ -60,7 +62,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)

View file

@ -50,12 +50,14 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin
store.SupportBucketTable = true
store.SqlGenerator = &mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table %s",
DropTableSqlTemplate: "drop table `%s`",
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database)
if interpolateParams {
sqlUrl += "&interpolateParams=true"
adaptedSqlUrl += "&interpolateParams=true"
}
var dbErr error
@ -63,7 +65,7 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)

View file

@ -17,31 +17,31 @@ var (
)
func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket)
return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, bucket)
}
func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket)
return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, bucket)
}
func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
}
func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
}
func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket)
return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, bucket)
}
func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
}
func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
}
func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string {

View file

@ -3,6 +3,7 @@ package postgres
import (
"database/sql"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
@ -37,40 +38,46 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = false
store.SqlGenerator = &SqlGenPostgres{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: "drop table %s",
DropTableSqlTemplate: `drop table "%s"`,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
sqlUrl += " user=" + user
}
adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
adaptedSqlUrl += " dbname=" + database
}
if schema != "" {
sqlUrl += " search_path=" + schema
adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)

View file

@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
@ -40,40 +41,46 @@ func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = true
store.SqlGenerator = &postgres.SqlGenPostgres{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table %s",
DropTableSqlTemplate: `drop table "%s"`,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
sqlUrl += " user=" + user
}
adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
adaptedSqlUrl += " dbname=" + database
}
if schema != "" {
sqlUrl += " search_path=" + schema
adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)

View file

@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
data, _, err := util.Get(target)
data, _, err := util.FastGet(target)
return data, err
}

View file

@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {

View file

@ -192,17 +192,14 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
if fh.f.isOpen == 1 {
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
return err
}
fh.f.isOpen--
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
if closer, ok := fh.f.reader.(io.Closer); ok {
if closer != nil {
closer.Close()
}
}
fh.f.reader = nil
}

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
@ -34,6 +35,7 @@ type Option struct {
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
@ -194,6 +196,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)

View file

@ -26,6 +26,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
Replication: wfs.option.Replication,
Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec,
DiskType: string(wfs.option.DiskType),
DataCenter: wfs.option.DataCenter,
Path: string(fullPath),
}

View file

@ -17,10 +17,14 @@ package gocdk_pub_sub
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"net/url"
"path"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
@ -29,12 +33,18 @@ import (
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
"os"
)
func init() {
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
}
func getPath(rawUrl string) string {
parsedUrl, _ := url.Parse(rawUrl)
return path.Join(parsedUrl.Host, parsedUrl.Path)
}
type GoCDKPubSub struct {
topicURL string
topic *pubsub.Topic
@ -44,6 +54,28 @@ func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
if k.topic.As(&conn) {
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
conn.Close()
k.topic.Shutdown(context.Background())
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
k.doReconnect()
break
}
glog.Error(err)
time.Sleep(time.Second)
}
}()
}
}
func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
k.topicURL = configuration.GetString(prefix + "topic_url")
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
@ -52,6 +84,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
glog.Fatalf("Failed to open topic: %v", err)
}
k.topic = topic
k.doReconnect()
return nil
}

View file

@ -18,6 +18,7 @@ type VolumeAssignRequest struct {
Replication string
Collection string
Ttl string
DiskType string
DataCenter string
Rack string
DataNode string
@ -33,7 +34,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@ -47,13 +48,14 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
DiskType: request.DiskType,
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
@ -105,6 +107,7 @@ func LookupJwt(master string, fileId string) security.EncodedJwt {
type StorageOption struct {
Replication string
DiskType string
Collection string
DataCenter string
Rack string
@ -123,6 +126,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DiskType: so.DiskType,
DataCenter: so.DataCenter,
Rack: so.Rack,
WritableVolumeCount: so.VolumeGrowthCount,
@ -133,6 +137,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DiskType: so.DiskType,
DataCenter: "",
Rack: "",
WritableVolumeCount: so.VolumeGrowthCount,

View file

@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds)
results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@ -174,7 +174,9 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
fileUrl, lookupError := LookupFileId(cf.master, ci.Fid)
fileUrl, lookupError := LookupFileId(func() string {
return cf.master
}, ci.Fid)
if lookupError != nil {
return n, lookupError
}

View file

@ -28,10 +28,10 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
results, err = LookupVolumeIds(master, grpcDialOption, vids)
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
if err == nil && usePublicUrl {
for _, result := range results {
for _, loc := range result.Locations {

View file

@ -33,10 +33,10 @@ var (
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
func Lookup(server string, vid string) (ret *LookupResult, err error) {
func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) {
locations, cache_err := vc.Get(vid)
if cache_err != nil {
if ret, err = do_lookup(server, vid); err == nil {
if ret, err = do_lookup(masterFn, vid); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
} else {
@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
return
}
func do_lookup(server string, vid string) (*LookupResult, error) {
func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid)
server := masterFn()
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
}
lookup, lookupError := Lookup(server, parts[0])
lookup, lookupError := Lookup(masterFn, parts[0])
if lookupError != nil {
return "", lookupError
}
@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
//only query unknown_vids
err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,

View file

@ -25,6 +25,7 @@ type FilePart struct {
Collection string
DataCenter string
Ttl string
DiskType string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
Fsync bool
@ -38,7 +39,9 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
type GetMasterFn func() string
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@ -49,8 +52,9 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
DiskType: diskType,
}
ret, err := Assign(master, grpcDialOption, ar)
ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
for index := range files {
results[index].Error = err.Error()
@ -70,7 +74,8 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Collection = collection
file.DataCenter = dataCenter
file.Ttl = ttl
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
file.DiskType = diskType
results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@ -113,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@ -143,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
DiskType: fi.DiskType,
}
ret, err = Assign(master, grpcDialOption, ar)
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
return
}
@ -156,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
DiskType: fi.DiskType,
}
ret, err = Assign(master, grpcDialOption, ar)
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@ -177,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fileUrl,
masterFn, fileUrl,
ret.Auth)
if e != nil {
// delete all uploaded chunks
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@ -196,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
}
} else {
ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
@ -208,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
return
}
func upload_one_chunk(filename string, reader io.Reader, master,
func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")

View file

@ -11,9 +11,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
// find volume location, replication, ttl info
lookup, err := Lookup(master, vid.String())
lookup, err := Lookup(masterFn, vid.String())
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}

View file

@ -58,6 +58,7 @@ var (
func init() {
HttpClient = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
@ -99,6 +100,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
} else {
glog.Warningf("uploading to %s: %v", uploadUrl, err)
}
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
return
}

View file

@ -156,6 +156,7 @@ message FuseAttributes {
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
}
message CreateEntryRequest {
@ -220,6 +221,7 @@ message AssignVolumeRequest {
string data_center = 5;
string path = 6;
string rack = 7;
string disk_type = 8;
}
message AssignVolumeResponse {
@ -270,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -358,12 +358,7 @@ message FilerConf {
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
NONE = 0;
HDD = 1;
SSD = 2;
}
DiskType disk_type = 5;
string disk_type = 5;
bool fsync = 6;
uint32 volume_growth_count = 7;
}

File diff suppressed because it is too large Load diff

View file

@ -29,6 +29,7 @@ var (
func init() {
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
}
func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {

View file

@ -44,7 +44,6 @@ message Heartbeat {
string ip = 1;
uint32 port = 2;
string public_url = 3;
uint32 max_volume_count = 4;
uint64 max_file_key = 5;
string data_center = 6;
string rack = 7;
@ -62,6 +61,8 @@ message Heartbeat {
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
bool has_no_ec_shards = 19;
map<string, uint32> max_volume_counts = 4;
}
message HeartbeatResponse {
@ -87,6 +88,7 @@ message VolumeInformationMessage {
int64 modified_at_second = 12;
string remote_storage_name = 13;
string remote_storage_key = 14;
string disk_type = 15;
}
message VolumeShortInformationMessage {
@ -95,12 +97,14 @@ message VolumeShortInformationMessage {
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
string disk_type = 15;
}
message VolumeEcShardInformationMessage {
uint32 id = 1;
string collection = 2;
uint32 ec_index_bits = 3;
string disk_type = 4;
}
message StorageBackend {
@ -163,6 +167,7 @@ message AssignRequest {
string data_node = 7;
uint32 memory_map_max_size_mb = 8;
uint32 Writable_volume_count = 9;
string disk_type = 10;
}
message AssignResponse {
string fid = 1;
@ -177,11 +182,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -210,8 +213,8 @@ message CollectionDeleteResponse {
//
// volume related
//
message DataNodeInfo {
string id = 1;
message DiskInfo {
string type = 1;
uint64 volume_count = 2;
uint64 max_volume_count = 3;
uint64 free_volume_count = 4;
@ -220,32 +223,24 @@ message DataNodeInfo {
repeated VolumeEcShardInformationMessage ec_shard_infos = 7;
uint64 remote_volume_count = 8;
}
message DataNodeInfo {
string id = 1;
map<string, DiskInfo> diskInfos = 2;
}
message RackInfo {
string id = 1;
uint64 volume_count = 2;
uint64 max_volume_count = 3;
uint64 free_volume_count = 4;
uint64 active_volume_count = 5;
repeated DataNodeInfo data_node_infos = 6;
uint64 remote_volume_count = 7;
repeated DataNodeInfo data_node_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message DataCenterInfo {
string id = 1;
uint64 volume_count = 2;
uint64 max_volume_count = 3;
uint64 free_volume_count = 4;
uint64 active_volume_count = 5;
repeated RackInfo rack_infos = 6;
uint64 remote_volume_count = 7;
repeated RackInfo rack_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message TopologyInfo {
string id = 1;
uint64 volume_count = 2;
uint64 max_volume_count = 3;
uint64 free_volume_count = 4;
uint64 active_volume_count = 5;
repeated DataCenterInfo data_center_infos = 6;
uint64 remote_volume_count = 7;
repeated DataCenterInfo data_center_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message VolumeListRequest {
}

File diff suppressed because it is too large Load diff

View file

@ -157,6 +157,7 @@ message AllocateVolumeRequest {
string replication = 4;
string ttl = 5;
uint32 memory_map_max_size_mb = 6;
string disk_type = 7;
}
message AllocateVolumeResponse {
}
@ -233,6 +234,7 @@ message VolumeCopyRequest {
string replication = 3;
string ttl = 4;
string source_data_node = 5;
string disk_type = 6;
}
message VolumeCopyResponse {
uint64 last_append_at_ns = 1;
@ -361,6 +363,7 @@ message ReadVolumeFileStatusResponse {
uint64 file_count = 6;
uint32 compaction_revision = 7;
string collection = 8;
string disk_type = 9;
}
message DiskStatus {
@ -370,6 +373,7 @@ message DiskStatus {
uint64 free = 4;
float percent_free = 5;
float percent_used = 6;
string disk_type = 7;
}
message MemStatus {

File diff suppressed because it is too large Load diff

View file

@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {

View file

@ -78,6 +78,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
DiskType: fs.diskType,
Path: path,
}

View file

@ -25,6 +25,7 @@ type FilerSink struct {
replication string
collection string
ttlSec int32
diskType string
dataCenter string
grpcDialOption grpc.DialOption
address string
@ -51,6 +52,7 @@ func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string)
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
configuration.GetString(prefix+"disk"),
security.LoadClientTLS(util.GetViper(), "grpc.client"),
false)
}
@ -60,7 +62,7 @@ func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
}
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
fs.address = address
if fs.address == "" {
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
@ -70,6 +72,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.diskType = diskType
fs.grpcDialOption = grpcDialOption
fs.writeChunkByFiler = writeChunkByFiler
return nil

View file

@ -9,9 +9,12 @@ import (
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"net/url"
"os"
"path"
"strings"
"time"
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
@ -74,6 +77,7 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
type GoCDKPubSubInput struct {
sub *pubsub.Subscription
subURL string
}
func (k *GoCDKPubSubInput) GetName() string {
@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url")
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
k.subURL = configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil {
return err
}
@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return err
}
defer ch.Close()
_, err = ch.QueueInspect(getPath(subURL))
_, err = ch.QueueInspect(getPath(k.subURL))
if err != nil {
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
return err
}
} else {
@ -111,10 +115,25 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
}
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
ctx := context.Background()
msg, err := k.sub.Receive(ctx)
if err != nil {
var conn *amqp.Connection
if k.sub.As(&conn) && conn.IsClosed() {
conn.Close()
k.sub.Shutdown(ctx)
conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err != nil {
glog.Error(err)
time.Sleep(time.Second)
return
}
k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
return
}
// This is permanent cached sub err
glog.Fatal(err)
}
onFailureFn = func() {
if msg.Nackable() {
isRedelivered := false

View file

@ -27,6 +27,7 @@ var (
func init() {
client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
@ -184,7 +185,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
for _, object := range deleteObjects.Objects {
lastSeparator := strings.LastIndex(object.ObjectName, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "/", object.ObjectName, true, false
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
entryName = object.ObjectName[lastSeparator+1:]
parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
@ -207,7 +208,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
// purge empty folders, only checking folders with deletions
for len(directoriesWithDeletion) > 0 {
directoriesWithDeletion = doDeleteEmptyDirectories(client, directoriesWithDeletion)
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
}
return nil
@ -223,7 +224,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
func doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int){
func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
var allDirs []string
for dir, _ := range directoriesWithDeletion {
allDirs = append(allDirs, dir)
@ -234,6 +235,9 @@ func doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWit
newDirectoriesWithDeletion = make(map[string]int)
for _, dir := range allDirs {
parentDir, dirName := util.FullPath(dir).DirAndName()
if parentDir == s3a.option.BucketsPath {
continue
}
if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
} else {

View file

@ -100,7 +100,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@ -131,8 +131,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
}
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return

View file

@ -263,6 +263,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
newEntry.Attributes.DiskType,
"",
"",
)
@ -306,7 +307,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@ -332,11 +333,11 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@ -402,6 +403,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
DiskType: req.DiskType,
})
if grpcErr != nil {
return grpcErr

View file

@ -14,6 +14,7 @@ var (
func init() {
client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}

View file

@ -37,7 +37,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
ar, altRequest := so.ToAssignRequests(1)
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
err = ae
@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
query.Get("disk"),
query.Get("dataCenter"),
query.Get("rack"),
)
@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, fs.option.Rack),
TtlSeconds: ttlSeconds,
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}
}
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
}

View file

@ -104,7 +104,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
fileName := path.Base(r.URL.Path)
contentType := r.Header.Get("Content-Type")
if contentType == "application/octet-stream" {
contentType = ""
@ -186,6 +186,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: contentType,
Md5: md5bytes,
FileSize: uint64(chunkOffset),

View file

@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: pu.MimeType,
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},

View file

@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int64(heartbeat.MaxVolumeCount))
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@ -79,10 +77,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
if heartbeat.MaxVolumeCount != 0 && dn.GetMaxVolumeCount() != int64(heartbeat.MaxVolumeCount) {
delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
dn.UpAdjustMaxVolumeCountDelta(delta)
}
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@ -60,11 +61,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
diskType := types.ToDiskType(req.DiskType)
option := &topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: diskType,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
@ -73,7 +76,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("No free volumes left!")
}
ms.vgLock.Lock()
@ -117,10 +120,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err
}
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),

View file

@ -112,7 +112,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
return
}
@ -136,6 +136,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) {
if fileId == "" {
return
}
var encodedJwt security.EncodedJwt
if isWrite {
encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)

View file

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"net/http"
"strconv"
@ -75,8 +76,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
@ -124,19 +125,19 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption)
submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption)
}
}
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
return vl.GetActiveVolumeCount(option) > 0
}
@ -157,6 +158,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
diskType := types.ToDiskType(r.FormValue("disk"))
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
@ -169,6 +171,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: diskType,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),

View file

@ -41,6 +41,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Ttl,
req.Preallocate,
req.MemoryMapMaxSizeMb,
types.ToDiskType(req.DiskType),
)
if err != nil {

View file

@ -222,7 +222,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
Ip: vs.store.Ip,
Port: uint32(vs.store.Port),
PublicUrl: vs.store.PublicUrl,
MaxVolumeCount: uint32(0),
MaxFileKey: uint64(0),
DataCenter: vs.store.GetDataCenter(),
Rack: vs.store.GetRack(),

View file

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"io/ioutil"
"math"
@ -36,11 +37,6 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
location := vs.store.FindFreeLocation()
if location == nil {
return nil, fmt.Errorf("no space left")
}
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
// copy .dat and .idx files
// read .idx .dat file size and timestamp
@ -59,6 +55,15 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
diskType := volFileInfoResp.DiskType
if req.DiskType != "" {
diskType = req.DiskType
}
location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
if location == nil {
return fmt.Errorf("no space left")
}
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
@ -206,6 +211,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
resp.DiskType = string(v.DiskType())
return resp, nil
}

View file

@ -105,7 +105,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
location := vs.store.FindFreeLocation()
location := vs.store.FindFreeLocation(types.HardDriveType)
if location == nil {
return nil, fmt.Errorf("no space left")
}

View file

@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
"google.golang.org/grpc"
@ -37,7 +38,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int, minFreeSpacePercents []float32,
folders []string, maxCounts []int, minFreeSpacePercents []float32, diskTypes []types.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
masterNodes []string, pulseSeconds int,
@ -76,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)

View file

@ -16,7 +16,9 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.DiskType.String()
ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds
@ -31,7 +33,9 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.DiskType.String()
ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds

View file

@ -63,7 +63,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound)
return
}
lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))

View file

@ -19,7 +19,9 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.DiskType.String()
ds = append(ds, newDiskStatus)
}
}
volumeInfos := vs.store.VolumeInfos()

View file

@ -50,7 +50,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, reqNeedle, r)
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r)
// http 204 status code does not allow body
if writeError == nil && isUnchanged {
@ -128,7 +128,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
// make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil {
if e := chunkManifest.DeleteChunks(vs.GetMaster, false, vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
@ -143,7 +143,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
}
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
_, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r)
writeDeleteResult(err, count, w, r)

View file

@ -69,6 +69,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<thead>
<tr>
<th>Path</th>
<th>Disk</th>
<th>Total</th>
<th>Free</th>
<th>Usage</th>
@ -78,6 +79,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
{{ range .DiskStatuses }}
<tr>
<td>{{ .Dir }}</td>
<td>{{ .DiskType }}</td>
<td>{{ bytesToHumanReadable .All }}</td>
<td>{{ bytesToHumanReadable .Free }}</td>
<td>{{ percentFrom .All .Used}}%</td>
@ -127,6 +129,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<th>Id</th>
<th>Collection</th>
<th>Disk</th>
<th>Data Size</th>
<th>Files</th>
<th>Trash</th>
@ -139,6 +142,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<td><code>{{ .Id }}</code></td>
<td>{{ .Collection }}</td>
<td>{{ .DiskType }}</td>
<td>{{ bytesToHumanReadable .Size }}</td>
<td>{{ .FileCount }}</td>
<td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>

View file

@ -33,6 +33,7 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
DiskType string
Uid uint32
Gid uint32
Cipher bool
@ -382,6 +383,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
DiskType: f.fs.option.DiskType,
Path: name,
}

View file

@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"sort"
@ -325,8 +326,10 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
@ -386,11 +389,15 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
rackEcNodes = append(rackEcNodes, node)
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) {
for _, ecShardInfo := range node.info.EcShardInfos {
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if !found {
return
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
}
return node.info.Id, count
return ecNode.info.Id, count
})
var totalShardCount int
@ -411,10 +418,13 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
for _, shards := range emptyNode.info.EcShardInfos {
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shards := range emptyDiskInfo.EcShardInfos {
emptyNodeIds[shards.Id] = true
}
for _, shards := range fullNode.info.EcShardInfos {
}
if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
@ -435,6 +445,7 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
}
}
}
}
return nil
}
@ -511,7 +522,11 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range allEcNodes {
for _, shardInfo := range ecNode.info.EcShardInfos {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if !found {
continue
}
for _, shardInfo := range diskInfo.EcShardInfos {
vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
}
}

View file

@ -3,6 +3,7 @@ package shell
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math"
"sort"
@ -159,8 +160,15 @@ func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (cou
return
}
func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos)
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
if dn.DiskInfos == nil {
return 0
}
diskInfo := dn.DiskInfos[string(diskType)]
if diskInfo == nil {
return 0
}
return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}
type RackId string
@ -174,12 +182,14 @@ type EcNode struct {
}
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
for _, ecShardInfo := range ecNode.info.EcShardInfos {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if vid == ecShardInfo.Id {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
return shardBits.ShardIdCount()
}
}
}
return 0
}
@ -214,7 +224,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
return
}
freeEcSlots := countFreeShardSlots(dn)
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
ecNodes = append(ecNodes, &EcNode{
info: dn,
dc: dc,
@ -278,11 +288,13 @@ func ceilDivide(total, n int) int {
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
for _, shardInfo := range ecNode.info.EcShardInfos {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
}
}
}
return 0
}
@ -290,7 +302,9 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
foundVolume := false
for _, shardInfo := range ecNode.info.EcShardInfos {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
@ -303,16 +317,23 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
break
}
}
} else {
diskInfo = &master_pb.DiskInfo{
Type: string(types.HardDriveType),
}
ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
}
if !foundVolume {
var newShardBits erasure_coding.ShardBits
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
}
ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
DiskType: string(types.HardDriveType),
})
ecNode.freeEcSlot -= len(shardIds)
}
@ -322,7 +343,8 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
for _, shardInfo := range ecNode.info.EcShardInfos {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
@ -333,6 +355,7 @@ func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint3
ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
}
}
return ecNode
}

View file

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"google.golang.org/grpc"
@ -225,11 +226,13 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, v := range dn.EcShardInfos {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Collection == selectedCollection && v.Id == uint32(vid) {
ecShardInfos = append(ecShardInfos, v)
}
}
}
})
return
@ -239,11 +242,13 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
vidMap := make(map[uint32]bool)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, v := range dn.EcShardInfos {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Collection == selectedCollection {
vidMap[v.Id] = true
}
}
}
})
for vid := range vidMap {
@ -257,11 +262,13 @@ func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeI
nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, v := range dn.EcShardInfos {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
}
}
}
})
return nodeToEcIndexBits

View file

@ -281,13 +281,15 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
vidMap := make(map[uint32]bool)
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, v := range dn.VolumeInfos {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
}
}
}
}
})
for vid := range vidMap {

Some files were not shown because too many files have changed in this diff Show more