Merge pull request #74 from chrislusf/master

sync
This commit is contained in:
hilimd 2021-03-12 15:05:31 +08:00 committed by GitHub
commit 17d02264f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
121 changed files with 3358 additions and 441 deletions

View file

@ -40,6 +40,7 @@ jobs:
goarch: ${{ matrix.goarch }}
release_tag: dev
overwrite: true
pre_command: export CGO_ENABLED=0
build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .`
@ -55,6 +56,7 @@ jobs:
goarch: ${{ matrix.goarch }}
release_tag: dev
overwrite: true
pre_command: export CGO_ENABLED=0
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .`
project_path: weed

View file

@ -79,17 +79,34 @@ SeaweedFS is a simple and highly scalable distributed file system. There are two
1. to store billions of files!
2. to serve the files fast!
SeaweedFS started as an Object Store to handle small files efficiently. Instead of managing all file metadata in a central master, the central master only manages volumes on volume servers, and these volume servers manage files and their metadata. This relieves concurrency pressure from the central master and spreads file metadata into volume servers, allowing faster file access (O(1), usually just one disk read operation).
SeaweedFS started as an Object Store to handle small files efficiently.
Instead of managing all file metadata in a central master,
the central master only manages volumes on volume servers,
and these volume servers manage files and their metadata.
This relieves concurrency pressure from the central master and spreads file metadata into volume servers,
allowing faster file access (O(1), usually just one disk read operation).
SeaweedFS can transparently integrate with the cloud. With hot data on local cluster, and warm data on the cloud with O(1) access time, SeaweedFS can achieve both fast local access time and elastic cloud storage capacity. What's more, the cloud storage access API cost is minimized. Faster and Cheaper than direct cloud storage!
SeaweedFS can transparently integrate with the cloud.
With hot data on local cluster, and warm data on the cloud with O(1) access time,
SeaweedFS can achieve both fast local access time and elastic cloud storage capacity.
What's more, the cloud storage access API cost is minimized.
Faster and Cheaper than direct cloud storage!
Signup for future managed SeaweedFS cluster offering at "seaweedfilesystem at gmail dot com".
There is only 40 bytes of disk storage overhead for each file's metadata. It is so simple with O(1) disk reads that you are welcome to challenge the performance with your actual use cases.
There is only 40 bytes of disk storage overhead for each file's metadata.
It is so simple with O(1) disk reads that you are welcome to challenge the performance with your actual use cases.
SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf). Also, SeaweedFS implements erasure coding with ideas from [f4: Facebooks Warm BLOB Storage System](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-muralidhar.pdf)
SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf).
Also, SeaweedFS implements erasure coding with ideas from
[f4: Facebooks Warm BLOB Storage System](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-muralidhar.pdf), and has a lot of similarities with [Facebooks Tectonic Filesystem](https://www.usenix.org/system/files/fast21-pan.pdf)
On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, MemSql, TiDB, Etcd, CockroachDB, etc.
On top of the object store, optional [Filer] can support directories and POSIX attributes.
Filer is a separate linearly-scalable stateless server with customizable metadata stores,
e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, MemSql, TiDB, Etcd, CockroachDB, etc.
For any distributed key value stores, the large values can be offloaded to SeaweedFS. With the fast access speed and linearly scalable capacity, SeaweedFS can work as a distributed [Key-Large-Value store][KeyLargeValueStore].
For any distributed key value stores, the large values can be offloaded to SeaweedFS.
With the fast access speed and linearly scalable capacity,
SeaweedFS can work as a distributed [Key-Large-Value store][KeyLargeValueStore].
[Back to TOC](#table-of-contents)
@ -105,6 +122,7 @@ For any distributed key value stores, the large values can be offloaded to Seawe
* Support ETag, Accept-Range, Last-Modified, etc.
* Support in-memory/leveldb/readonly mode tuning for memory/performance balance.
* Support rebalancing the writable and readonly volumes.
* [Customizable Multiple Storage Tiers][TieredStorage]: Customizable storage disk types to balance performance and cost.
* [Transparent cloud integration][CloudTier]: unlimited capacity via tiered cloud storage for warm data.
* [Erasure Coding for warm storage][ErasureCoding] Rack-Aware 10.4 erasure coding reduces storage cost and increases availability.
@ -135,6 +153,7 @@ For any distributed key value stores, the large values can be offloaded to Seawe
[Hadoop]: https://github.com/chrislusf/seaweedfs/wiki/Hadoop-Compatible-File-System
[WebDAV]: https://github.com/chrislusf/seaweedfs/wiki/WebDAV
[ErasureCoding]: https://github.com/chrislusf/seaweedfs/wiki/Erasure-coding-for-warm-storage
[TieredStorage]: https://github.com/chrislusf/seaweedfs/wiki/Tiered-Storage
[CloudTier]: https://github.com/chrislusf/seaweedfs/wiki/Cloud-Tier
[FilerDataEncryption]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Data-Encryption
[FilerTTL]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Stores

View file

@ -1,3 +1,4 @@
<h1 align="center">Sponsors &amp; Backers</h1>
- [Become a backer or sponsor on Patreon](https://www.patreon.com/seaweedfs).
@ -12,3 +13,4 @@
- [ColorfulClouds Tech Co. Ltd.](https://caiyunai.com/)
- [Haravan - Ecommerce Platform](https://www.haravan.com)
- PeterCxy - Creator of Shelter App

View file

@ -6,7 +6,7 @@ ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/chrislusf/seaweedfs && git checkout $BRANCH
RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \
&& export LDFLAGS="-X github.com/chrislusf/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \
&& go install -ldflags "${LDFLAGS}"
&& CGO_ENABLED=0 go install -ldflags "-extldflags -static ${LDFLAGS}"
FROM alpine AS final
LABEL author="Chris Lu"

View file

@ -6,7 +6,7 @@ ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/chrislusf/seaweedfs && git checkout $BRANCH
RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \
&& export LDFLAGS="-X github.com/chrislusf/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \
&& go install -tags 5BytesOffset -ldflags "${LDFLAGS}"
&& CGO_ENABLED=0 go install -tags 5BytesOffset -ldflags "-extldflags -static ${LDFLAGS}"
FROM alpine AS final
LABEL author="Chris Lu"

View file

@ -5,7 +5,7 @@ all: gen
gen: dev
build:
cd ../weed; GOOS=linux go build; mv weed ../docker/
cd ../weed; CGO_ENABLED=0 GOOS=linux go build -ldflags "-extldflags -static"; mv weed ../docker/
docker build --no-cache -t chrislusf/seaweedfs:local -f Dockerfile.local .
rm ./weed
@ -15,6 +15,9 @@ s3tests_build:
dev: build
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
dev_tls: build certstrap
ENV_FILE="tls.env" docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
dev_mount: build
docker-compose -f compose/local-mount-compose.yml -p seaweedfs up
@ -41,3 +44,15 @@ filer_etcd: build
clean:
rm ./weed
certstrap:
go get github.com/square/certstrap
certstrap --depot-path compose/tls init --passphrase "" --common-name "SeaweedFS CA" || true
certstrap --depot-path compose/tls request-cert --passphrase "" --common-name volume01.dev || true
certstrap --depot-path compose/tls request-cert --passphrase "" --common-name master01.dev || true
certstrap --depot-path compose/tls request-cert --passphrase "" --common-name filer01.dev || true
certstrap --depot-path compose/tls request-cert --passphrase "" --common-name client01.dev || true
certstrap --depot-path compose/tls sign --CA "SeaweedFS CA" volume01.dev || true
certstrap --depot-path compose/tls sign --CA "SeaweedFS CA" master01.dev || true
certstrap --depot-path compose/tls sign --CA "SeaweedFS CA" filer01.dev || true
certstrap --depot-path compose/tls sign --CA "SeaweedFS CA" client01.dev || true

0
docker/compose/dev.env Normal file
View file

View file

@ -11,6 +11,10 @@ services:
- 8888:8888
- 18888:18888
command: "server -ip=server1 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
volumes:
- ./master-cloud.toml:/etc/seaweedfs/master.toml
depends_on:
- server2
server2:
image: chrislusf/seaweedfs:local
ports:
@ -20,4 +24,5 @@ services:
- 18085:18080
- 8889:8888
- 18889:18888
command: "server -ip=server2 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
- 8334:8333
command: "server -ip=server2 -filer -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"

View file

@ -6,33 +6,49 @@ services:
ports:
- 9333:9333
- 19333:19333
command: "master -ip=master"
command: "-v=1 master -ip=master"
volumes:
- ./tls:/etc/seaweedfs/tls
env_file:
- ${ENV_FILE:-dev.env}
volume:
image: chrislusf/seaweedfs:local
ports:
- 8080:8080
- 18080:18080
command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
command: "-v=1 volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
depends_on:
- master
volumes:
- ./tls:/etc/seaweedfs/tls
env_file:
- ${ENV_FILE:-dev.env}
filer:
image: chrislusf/seaweedfs:local
ports:
- 8888:8888
- 18888:18888
command: 'filer -master="master:9333"'
command: '-v=1 filer -master="master:9333"'
depends_on:
- master
- volume
volumes:
- ./tls:/etc/seaweedfs/tls
env_file:
- ${ENV_FILE:-dev.env}
s3:
image: chrislusf/seaweedfs:local
ports:
- 8333:8333
command: 's3 -filer="filer:8888"'
command: '-v=1 s3 -filer="filer:8888"'
depends_on:
- master
- volume
- filer
volumes:
- ./tls:/etc/seaweedfs/tls
env_file:
- ${ENV_FILE:-dev.env}
mount:
image: chrislusf/seaweedfs:local
privileged: true
@ -40,6 +56,10 @@ services:
- SYS_ADMIN
mem_limit: 4096m
command: '-v=4 mount -filer="filer:8888" -dirAutoCreate -dir=/mnt/seaweedfs -cacheCapacityMB=100 -concurrentWriters=128'
volumes:
- ./tls:/etc/seaweedfs/tls
env_file:
- ${ENV_FILE:-dev.env}
depends_on:
- master
- volume

View file

@ -0,0 +1,30 @@
# Put this file to one of the location, with descending priority
# ./master.toml
# $HOME/.seaweedfs/master.toml
# /etc/seaweedfs/master.toml
# this file is read by master
[master.maintenance]
# periodically run these scripts are the same as running them from 'weed shell'
scripts = """
lock
ec.encode -fullPercent=95 -quietFor=1h
ec.rebuild -force
ec.balance -force
volume.balance -force
volume.fix.replication
unlock
"""
sleep_minutes = 17 # sleep minutes between each script execution
# configurations for tiered cloud storage
# old volumes are transparently moved to cloud for cost efficiency
[storage.backend]
[storage.backend.s3.default]
enabled = true
aws_access_key_id = "any" # if empty, loads from the shared credentials file (~/.aws/credentials).
aws_secret_access_key = "any" # if empty, loads from the shared credentials file (~/.aws/credentials).
region = "us-east-2"
bucket = "volume_bucket" # an existing bucket
endpoint = "http://server2:8333"

14
docker/compose/tls.env Normal file
View file

@ -0,0 +1,14 @@
WEED_GRPC_CA=/etc/seaweedfs/tls/SeaweedFS_CA.crt
WEED_GRPC_ALLOWED_WILDCARD_DOMAIN=".dev"
WEED_GRPC_MASTER_CERT=/etc/seaweedfs/tls/master01.dev.crt
WEED_GRPC_MASTER_KEY=/etc/seaweedfs/tls/master01.dev.key
WEED_GRPC_VOLUME_CERT=/etc/seaweedfs/tls/volume01.dev.crt
WEED_GRPC_VOLUME_KEY=/etc/seaweedfs/tls/volume01.dev.key
WEED_GRPC_FILER_CERT=/etc/seaweedfs/tls/filer01.dev.crt
WEED_GRPC_FILER_KEY=/etc/seaweedfs/tls/filer01.dev.key
WEED_GRPC_CLIENT_CERT=/etc/seaweedfs/tls/client01.dev.crt
WEED_GRPC_CLIENT_KEY=/etc/seaweedfs/tls/client01.dev.key
WEED_GRPC_MASTER_ALLOWED_COMMONNAMES="volume01.dev,master01.dev,filer01.dev,client01.dev"
WEED_GRPC_VOLUME_ALLOWED_COMMONNAMES="volume01.dev,master01.dev,filer01.dev,client01.dev"
WEED_GRPC_FILER_ALLOWED_COMMONNAMES="volume01.dev,master01.dev,filer01.dev,client01.dev"
WEED_GRPC_CLIENT_ALLOWED_COMMONNAMES="volume01.dev,master01.dev,filer01.dev,client01.dev"

View file

@ -60,9 +60,9 @@ case "$1" in
'cronjob')
MASTER=${WEED_MASTER-localhost:9333}
FIX_REPLICATION_CRON_SCHEDULE=${CRON_SCHEDULE-*/7 * * * * *}
echo "$FIX_REPLICATION_CRON_SCHEDULE" 'echo "volume.fix.replication" | weed shell -master='$MASTER > /crontab
echo "$FIX_REPLICATION_CRON_SCHEDULE" 'echo "lock; volume.fix.replication; unlock" | weed shell -master='$MASTER > /crontab
BALANCING_CRON_SCHEDULE=${CRON_SCHEDULE-25 * * * * *}
echo "$BALANCING_CRON_SCHEDULE" 'echo "volume.balance -c ALL -force" | weed shell -master='$MASTER >> /crontab
echo "$BALANCING_CRON_SCHEDULE" 'echo "lock; volume.balance -collection ALL_COLLECTIONS -force; unlock" | weed shell -master='$MASTER >> /crontab
echo "Running Crontab:"
cat /crontab
exec supercronic /crontab

3
go.mod
View file

@ -38,6 +38,7 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/json-iterator/go v1.1.10
@ -48,7 +49,7 @@ require (
github.com/klauspost/crc32 v1.2.0
github.com/klauspost/reedsolomon v1.9.2
github.com/kurin/blazer v0.5.3
github.com/lib/pq v1.2.0
github.com/lib/pq v1.10.0
github.com/lunixbochs/vtclean v1.0.0 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect

2
go.sum
View file

@ -496,6 +496,8 @@ github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lunixbochs/vtclean v1.0.0 h1:xu2sLAri4lGiovBDQKxl5mrXyESr3gUr5m5SM5+LVb8=

View file

@ -29,6 +29,14 @@ please set/update the corresponding affinity rule in values.yaml to an empty one
```affinity: ""```
### PVC - storage class ###
on the volume stateful set added support for K8S PVC, currently example
with the simple local-path-provisioner from Rancher (comes included with k3d / k3s)
https://github.com/rancher/local-path-provisioner
you can use ANY storage class you like, just update the correct storage-class
for your deployment.
### current instances config (AIO):
1 instance for each type (master/filer+s3/volume)

View file

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "2.27"
version: 2.27
appVersion: "2.31"
version: 2.31

View file

@ -126,3 +126,26 @@ Inject extra environment vars in the format key:value, if populated
{{- printf "%s%s%s:%s" $registryName $repositoryName $name $tag -}}
{{- end -}}
{{- end -}}
{{/* check if any PVC exists */}}
{{- define "volume.pvc_exists" -}}
{{- if or (or (eq .Values.volume.data.type "persistentVolumeClaim") (and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "persistentVolumeClaim") -}}
{{- printf "true" -}}
{{- else -}}
{{- printf "false" -}}
{{- end -}}
{{- end -}}
{{/* check if any HostPath exists */}}
{{- define "volume.hostpath_exists" -}}
{{- if or (or (eq .Values.volume.data.type "hostPath") (and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "hostPath") -}}
{{- printf "true" -}}
{{- else -}}
{{- if or .Values.global.enableSecurity .Values.volume.extraVolumes -}}
{{- printf "true" -}}
{{- else -}}
{{- printf "false" -}}
{{- end -}}
{{- end -}}
{{- end -}}

View file

@ -40,7 +40,7 @@ spec:
{{ if .Values.volume.dataCenter }} -dataCenter {{ .Values.volume.dataCenter }}{{ end }}\
{{ if .Values.cronjob.collection }} -collection {{ .Values.cronjob.collection }}{{ end }}\n\
{{- if .Values.cronjob.enableFixReplication }}
volume.fix.replication {{ if .Values.cronjob.collectionPattern }} -collectionPattern={{ .Values.cronjob.collectionPattern }} {{ end }} \n\
volume.fix.replication -collectionPattern={{ .Values.cronjob.collectionPattern }} \n\
{{- end }}
unlock\n" | \
/usr/bin/weed shell \

View file

@ -10,6 +10,7 @@ metadata:
monitoring: "true"
{{- end }}
spec:
clusterIP: None
ports:
- name: "swfs-filer"
port: {{ .Values.filer.port }}

View file

@ -45,6 +45,19 @@ spec:
priorityClassName: {{ .Values.volume.priorityClassName | quote }}
{{- end }}
enableServiceLinks: false
{{- if .Values.volume.dir_idx }}
initContainers:
- name: seaweedfs-vol-move-idx
image: {{ template "volume.image" . }}
imagePullPolicy: {{ .Values.global.pullPolicy | default "IfNotPresent" }}
command: [ '/bin/sh', '-c' ]
args: ['if ls {{ .Values.volume.dir }}/*.idx >/dev/null 2>&1; then mv {{ .Values.volume.dir }}/*.idx {{ .Values.volume.dir_idx }}/; fi;']
volumeMounts:
- name: idx
mountPath: {{ .Values.volume.dir_idx }}
- name: data
mountPath: {{ .Values.volume.dir }}
{{- end }}
containers:
- name: seaweedfs
image: {{ template "volume.image" . }}
@ -118,9 +131,13 @@ spec:
-compactionMBps={{ .Values.volume.compactionMBps }} \
-mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
volumeMounts:
- name: seaweedfs-volume-storage
mountPath: "/data/"
- name: seaweedfs-volume-log-volume
- name: data
mountPath: "{{ .Values.volume.dir }}/"
{{- if .Values.volume.dir_idx }}
- name: idx
mountPath: "{{ .Values.volume.dir_idx }}/"
{{- end }}
- name: logs
mountPath: "/logs/"
{{- if .Values.global.enableSecurity }}
- name: security-config
@ -173,15 +190,27 @@ spec:
resources:
{{ tpl .Values.volume.resources . | nindent 12 | trim }}
{{- end }}
{{- $hostpath_exists := include "volume.hostpath_exists" . -}}
{{- if $hostpath_exists }}
volumes:
- name: seaweedfs-volume-log-volume
hostPath:
path: /storage/logs/seaweedfs/volume
type: DirectoryOrCreate
- name: seaweedfs-volume-storage
{{- if eq .Values.volume.data.type "hostPath" }}
- name: data
hostPath:
path: /storage/object_store/
type: DirectoryOrCreate
{{- end }}
{{- if and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx }}
- name: idx
hostPath:
path: /ssd/seaweedfs-volume-idx/
type: DirectoryOrCreate
{{- end }}
{{- if eq .Values.volume.logs.type "hostPath" }}
- name: logs
hostPath:
path: /storage/logs/seaweedfs/volume
type: DirectoryOrCreate
{{- end }}
{{- if .Values.global.enableSecurity }}
- name: security-config
configMap:
@ -205,8 +234,43 @@ spec:
{{- if .Values.volume.extraVolumes }}
{{ tpl .Values.volume.extraVolumes . | indent 8 | trim }}
{{- end }}
{{- end }}
{{- if .Values.volume.nodeSelector }}
nodeSelector:
{{ tpl .Values.volume.nodeSelector . | indent 8 | trim }}
{{- end }}
{{- $pvc_exists := include "volume.pvc_exists" . -}}
{{- if $pvc_exists }}
volumeClaimTemplates:
{{- if eq .Values.volume.data.type "persistentVolumeClaim"}}
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: {{ .Values.volume.data.storageClass }}
resources:
requests:
storage: {{ .Values.volume.data.size }}
{{- end }}
{{- if and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx }}
- metadata:
name: idx
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: {{ .Values.volume.idx.storageClass }}
resources:
requests:
storage: {{ .Values.volume.idx.size }}
{{- end }}
{{- if eq .Values.volume.logs.type "persistentVolumeClaim" }}
- metadata:
name: logs
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: {{ .Values.volume.logs.storageClass }}
resources:
requests:
storage: {{ .Values.volume.logs.size }}
{{- end }}
{{- end }}
{{- end }}

View file

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
# imageTag: "2.27" - started using {.Chart.appVersion}
# imageTag: "2.31" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
@ -138,6 +138,24 @@ volume:
# minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly
minFreeSpacePercent: 7
# can use ANY storage-class , example with local-path-provisner
# data:
# type: "persistentVolumeClaim"
# size: "24Ti"
# storageClass: "local-path-provisioner"
data:
type: "hostPath"
size: ""
storageClass: ""
idx:
type: "hostPath"
size: ""
storageClass: ""
logs:
type: "hostPath"
size: ""
storageClass: ""
# limit background compaction or copying speed in mega bytes per second
compactionMBps: "50"

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 86 KiB

BIN
note/SeaweedFS_XDR.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.1</version>
<version>1.6.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.1</version>
<version>1.6.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.1</version>
<version>1.6.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.*;
@ -217,7 +218,7 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
bufferToWrite.flip();
((Buffer)bufferToWrite).flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {

View file

@ -11,13 +11,13 @@
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.1</version>
<version>1.6.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop2-client</artifactId>
<version>1.6.1</version>
<version>1.6.2</version>
<scope>compile</scope>
</dependency>
<dependency>

View file

@ -301,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

View file

@ -309,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

View file

@ -33,3 +33,7 @@ debug_webdav:
debug_s3:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
debug_filer_copy:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h

View file

@ -41,6 +41,7 @@ type BenchmarkOptions struct {
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
fsync *bool
useTcp *bool
}
var (
@ -67,6 +68,7 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
volumeTcpClient := wdclient.NewVolumeTcpClient()
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if *b.useTcp {
if uploadByTcp(volumeTcpClient, fp) {
fileIdLineChan <- fp.Fid
s.completed++
s.transferred += fileSize
} else {
s.failed++
}
} else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
if err != nil {
glog.Errorf("upload chunk err: %v", err)
return false
}
return true
}
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {

View file

@ -15,7 +15,9 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
cmdFilerBackup,
cmdFilerCat,
cmdFilerMetaBackup,
cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,

View file

@ -0,0 +1,157 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
"time"
)
type FilerBackupOptions struct {
isActivePassive *bool
filer *string
path *string
debug *bool
proxyByFiler *bool
timeAgo *time.Duration
}
var (
filerBackupOptions FilerBackupOptions
)
func init() {
cmdFilerBackup.Run = runFilerBackup // break init cycle
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerBackup = &Command{
UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
`,
}
func runFilerBackup(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
}
}
return true
}
const (
BackupKeyPrefix = "backup."
)
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
// find data sink
config := util.GetViper()
dataSink := findSink(config)
if dataSink == nil {
return fmt.Errorf("no data sink configured in replication.toml")
}
sourceFiler := *backupOption.filer
sourcePath := *backupOption.path
timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory()
debug := *backupOption.debug
// get start time for the data sink
startFrom := time.Unix(0, 0)
sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
if timeAgo.Milliseconds() == 0 {
lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
if err != nil {
glog.V(0).Infof("starting from %v", startFrom)
} else {
startFrom = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resuming from %v", startFrom)
}
} else {
startFrom = time.Now().Add(-timeAgo)
glog.V(0).Infof("start time is set to %v", startFrom)
}
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "backup_" + dataSink.GetName(),
PathPrefix: sourcePath,
SinceNs: startFrom.UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
return fmt.Errorf("processEventFn: %v", err)
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
return fmt.Errorf("setOffset: %v", err)
}
}
}
})
}

View file

@ -0,0 +1,268 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
"io"
"reflect"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
metaBackup FilerMetaBackupOptions
)
type FilerMetaBackupOptions struct {
grpcDialOption grpc.DialOption
filerAddress *string
filerDirectory *string
restart *bool
backupFilerConfig *string
store filer.FilerStore
}
func init() {
cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
}
var cmdFilerMetaBackup = &Command{
UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
Long: `continuously backup filer meta data changes.
The backup writes to another filer store specified in a backup_filer.toml.
weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
`,
}
func runFilerMetaBackup(cmd *Command, args []string) bool {
metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
// load backup_filer.toml
v := viper.New()
v.SetConfigFile(*metaBackup.backupFilerConfig)
if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
" weed scaffold -config=%s -output=.\n\n\n",
*metaBackup.backupFilerConfig, "backup_filer", "filer")
}
if err := metaBackup.initStore(v); err != nil {
glog.V(0).Infof("init backup filer store: %v", err)
return true
}
missingPreviousBackup := false
_, err := metaBackup.getOffset()
if err != nil {
missingPreviousBackup = true
}
if *metaBackup.restart || missingPreviousBackup {
glog.V(0).Infof("traversing metadata tree...")
startTime := time.Now()
if err := metaBackup.traverseMetadata(); err != nil {
glog.Errorf("traverse meta data: %v", err)
return true
}
glog.V(0).Infof("metadata copied up to %v", startTime)
if err := metaBackup.setOffset(startTime); err != nil {
startTime = time.Now()
}
}
for {
err := metaBackup.streamMetadataBackup()
if err != nil {
glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
time.Sleep(1747 * time.Millisecond)
}
}
return true
}
func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
// load configuration for default filer store
hasDefaultStoreConfigured := false
for _, store := range filer.Stores {
if v.GetBool(store.GetName() + ".enabled") {
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
if err := store.Initialize(v, store.GetName()+"."); err != nil {
glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
}
glog.V(0).Infof("configured filer store to %s", store.GetName())
hasDefaultStoreConfigured = true
metaBackup.store = filer.NewFilerStoreWrapper(store)
break
}
}
if !hasDefaultStoreConfigured {
return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
}
return nil
}
func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
var saveErr error
traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
println("+", parentPath.Child(entry.Name))
if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
saveErr = fmt.Errorf("insert entry error: %v\n", err)
return
}
})
if traverseErr != nil {
return fmt.Errorf("traverse: %v", traverseErr)
}
return saveErr
}
var (
MetaBackupKey = []byte("metaBackup")
)
func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
startTime, err := metaBackup.getOffset()
if err != nil {
startTime = time.Now()
}
glog.V(0).Infof("streaming from %v", startTime)
store := metaBackup.store
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
ctx := context.Background()
message := resp.EventNotification
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
if message.OldEntry == nil && message.NewEntry != nil {
println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
return store.InsertEntry(ctx, entry)
}
if message.OldEntry != nil && message.NewEntry == nil {
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
}
if message.OldEntry != nil && message.NewEntry != nil {
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
return store.UpdateEntry(ctx, entry)
}
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
return err
}
println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
}
return nil
}
tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "meta_backup",
PathPrefix: *metaBackup.filerDirectory,
SinceNs: startTime.UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err = eachEntryFunc(resp); err != nil {
return err
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
return err2
}
}
}
})
return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
if err != nil {
return
}
tsNs := util.BytesToUint64(value)
return time.Unix(0, int64(tsNs)), nil
}
func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
valueBuf := make([]byte, 8)
util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
return err
}
return nil
}
var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}

View file

@ -23,9 +23,9 @@ func init() {
}
var cmdFilerMetaTail = &Command{
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
Short: "see continuous changes on a filer",
Long: `See continuous changes on a filer.
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
@ -36,7 +36,7 @@ var cmdFilerMetaTail = &Command{
var (
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")

View file

@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
glog.V(0).Infof("Configure sink to %s", sk.GetName())
dataSink = sk
break
}
}
dataSink := findSink(config)
if dataSink == nil {
println("no data sink configured in replication.toml:")
@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
func findSink(config *util.ViperProxy) sink.ReplicationSink {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
glog.V(0).Infof("Configure sink to %s", sk.GetName())
dataSink = sk
break
}
}
return dataSink
}
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// if first time, start from now
// if has previously synced, resume from that point of time
sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
if err != nil {
return err
}
@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
var sourceOldKey, sourceNewKey util.FullPath
if message.OldEntry != nil {
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
}
if message.NewEntry != nil {
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
}
for _, sig := range message.Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
return nil
}
}
if debug {
fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
}
if !strings.HasPrefix(resp.Directory, sourcePath) {
return nil
}
// handle deletions
if message.OldEntry != nil && message.NewEntry == nil {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
// handle new entries
if message.OldEntry == nil && message.NewEntry != nil {
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
return nil
}
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
}
// this is something special?
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
// handle updates
if strings.HasPrefix(string(sourceOldKey), sourcePath) {
// old key is in the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is also in the watched directory
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
return err
}
// not able to find old entry
if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
}
// create the new entry
newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
} else {
// new key is outside of the watched directory
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
} else {
// old key is outside of the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
} else {
// new key is also outside of the watched directory
// skip
}
}
return nil
return persistEventFn(resp)
}
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
return err
}
}
@ -290,11 +215,11 @@ const (
SyncKeyPrefix = "sync."
)
func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
if err != nil {
@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
}
func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
valueBuf := make([]byte, 8)
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
})
}
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
var sourceOldKey, sourceNewKey util.FullPath
if message.OldEntry != nil {
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
}
if message.NewEntry != nil {
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
}
if debug {
glog.V(0).Infof("received %v", resp)
}
if !strings.HasPrefix(resp.Directory, sourcePath) {
return nil
}
// handle deletions
if message.OldEntry != nil && message.NewEntry == nil {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
// handle new entries
if message.OldEntry == nil && message.NewEntry != nil {
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
}
// this is something special?
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
// handle updates
if strings.HasPrefix(string(sourceOldKey), sourcePath) {
// old key is in the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is also in the watched directory
if !dataSink.IsIncremental() {
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
return err
}
// not able to find old entry
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
}
}
// create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
} else {
// new key is outside of the watched directory
if !dataSink.IsIncremental() {
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
}
} else {
// old key is outside of the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
} else {
// new key is also outside of the watched directory
// skip
}
}
return nil
}
return processEventFn
}
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
if !dataSink.IsIncremental() {
return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
}
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime
} else if message.OldEntry != nil {
mTime = message.OldEntry.Attributes.Mtime
}
dateKey := time.Unix(mTime, 0).Format("2006-01-02")
return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
}

View file

@ -138,7 +138,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)

View file

@ -53,7 +53,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
filer := *option.filer
// parse filer grpc address
filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@ -63,16 +63,23 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
for i := 0; i < 10; i++ {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1)*time.Second)
}
cipher = resp.Cipher
return nil
})
}
if err != nil {
glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
return true
}

View file

@ -63,7 +63,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
if err != nil {
glog.Fatal(err)
return false

View file

@ -137,7 +137,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false

View file

@ -356,6 +356,9 @@ directory = "/buckets"
[sink.local]
enabled = false
directory = "/data"
# all replicated files are under modified time as yyyy-mm-dd directories
# so each date directory contains all new and updated files.
is_incremental = false
[sink.local_incremental]
# all replicated files are under modified time as yyyy-mm-dd directories
@ -373,6 +376,7 @@ directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
is_incremental = false
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
@ -384,6 +388,7 @@ region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
endpoint = ""
is_incremental = false
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@ -391,6 +396,7 @@ enabled = false
google_application_credentials = "/path/to/x.json" # path to json credential file
bucket = "your_bucket_seaweedfs" # an existing bucket
directory = "/" # destination directory
is_incremental = false
[sink.azure]
# experimental, let me know if it works
@ -399,6 +405,7 @@ account_name = ""
account_key = ""
container = "mycontainer" # an existing container
directory = "/" # destination directory
is_incremental = false
[sink.backblaze]
enabled = false
@ -406,6 +413,7 @@ b2_account_id = ""
b2_master_application_key = ""
bucket = "mybucket" # an existing bucket
directory = "/" # destination directory
is_incremental = false
`
@ -432,22 +440,28 @@ expires_after_seconds = 10 # seconds
# the host name is not checked, so the PERM files can be shared.
[grpc]
ca = ""
# Set wildcard domain for enable TLS authentication by common names
allowed_wildcard_domain = "" # .mycompany.com
[grpc.volume]
cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.master]
cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.filer]
cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.msg_broker]
cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
@ -455,7 +469,6 @@ key = ""
cert = ""
key = ""
# volume server https options
# Note: work in progress!
# this does not work with other clients, e.g., "weed filer|mount" etc, yet.

View file

@ -111,6 +111,7 @@ func init() {
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
@ -156,19 +157,21 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
peers := strings.Join(peerList, ",")
masterOptions.peers = &peers
if *isStartingMasterServer {
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
peers := strings.Join(peerList, ",")
masterOptions.peers = &peers
}
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
filerOptions.masters = &peers
filerOptions.masters = masterOptions.peers
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
serverOptions.v.masters = &peers
serverOptions.v.masters = masterOptions.peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack

View file

@ -62,6 +62,7 @@ type VolumeServerOptions struct {
preStopSeconds *int
metricsHttpPort *int
// pulseSeconds *int
enableTcp *bool
}
func init() {
@ -88,6 +89,7 @@ func init() {
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port")
}
var cmdVolume = &Command{
@ -251,6 +253,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
// starting tcp server
if *v.enableTcp {
go v.startTcpService(volumeServer)
}
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
@ -368,3 +375,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}()
return clusterHttpServer
}
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {
glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
}
defer listener.Close()
for {
c, err := listener.Accept()
if err != nil {
fmt.Println(err)
return
}
go volumeServer.HandleTcpConnection(c)
}
}

View file

@ -78,7 +78,7 @@ func (wo *WebDavOption) startWebDav() bool {
}
// parse filer grpc address
filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer)
if err != nil {
glog.Fatal(err)
return false

View file

@ -11,6 +11,10 @@ import (
type HardLinkId []byte
const (
MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
)
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
if p == "/" {
return nil
@ -77,7 +81,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
}
for _, sub := range entries {

View file

@ -11,6 +11,10 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
f.maybeReloadFilerConfiguration(event)
}
func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {
if DirectoryEtcSeaweedFS != event.Directory {
if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {
return
@ -26,7 +30,6 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
if entry.Name == FilerConfName {
f.reloadFilerConfiguration(entry)
}
}
func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {

View file

@ -69,6 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
peerSignature, err = ma.readFilerStoreSignature(peer)
}
// when filer store is not shared by multiple filers
if peerSignature != f.Signature {
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs

View file

@ -15,7 +15,7 @@ import (
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@ -26,6 +26,9 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
} else if len(urlStrings) == 0 {
glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
}
fileId2Url[chunkView.FileId] = urlStrings
}
@ -39,6 +42,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
glog.Errorf("read chunk: %v", err)
return fmt.Errorf("read chunk: %v", err)
}
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
@ -181,7 +185,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {

View file

@ -251,10 +251,10 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
@ -305,7 +305,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("dir ReadDirAll %s", dirPath)
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory {
@ -318,12 +319,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
return nil
}
dirPath := util.FullPath(dir.FullPath())
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
processEachEntryFn(entry.ToProtoEntry(), false)
return true
})
@ -389,12 +389,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
// clear entry inside the file
fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
if fsNode != nil {
if file, ok := fsNode.(*File); ok {
file.clearEntry()
}
}
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
// remove current file handle if any
dir.wfs.handlesLock.Lock()

View file

@ -35,15 +35,20 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil, err
}
// update old file to hardlink mode
if len(oldFile.entry.HardLinkId) == 0 {
oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldFile.entry.HardLinkCounter = 1
oldEntry := oldFile.getEntry()
if oldEntry == nil {
return nil, fuse.EIO
}
oldFile.entry.HardLinkCounter++
// update old file to hardlink mode
if len(oldEntry.HardLinkId) == 0 {
oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldEntry.HardLinkCounter = 1
}
oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
Entry: oldFile.entry,
Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
@ -53,11 +58,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
Attributes: oldFile.entry.Attributes,
Chunks: oldFile.entry.Chunks,
Extended: oldFile.entry.Extended,
HardLinkId: oldFile.entry.HardLinkId,
HardLinkCounter: oldFile.entry.HardLinkCounter,
Attributes: oldEntry.Attributes,
Chunks: oldEntry.Chunks,
Extended: oldEntry.Extended,
HardLinkId: oldEntry.HardLinkId,
HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
@ -83,6 +88,10 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil
})
if err != nil {
return nil, fuse.EIO
}
// create new file node
newNode := dir.newFile(req.NewName, request.Entry)
newFile := newNode.(*File)

View file

@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false
}
fileSize := int64(pages.f.entry.Attributes.FileSize)
entry := pages.f.getEntry()
if entry == nil {
return false
}
fileSize := int64(entry.Attributes.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 {

View file

@ -5,6 +5,7 @@ import (
"io"
"os"
"sort"
"sync"
"time"
"github.com/seaweedfs/fuse"
@ -33,6 +34,7 @@ type File struct {
dir *Dir
wfs *WFS
entry *filer_pb.Entry
entryLock sync.RWMutex
entryViewCache []filer.VisibleInterval
isOpen int
reader io.ReaderAt
@ -47,7 +49,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
entry := file.entry
entry := file.getEntry()
if file.isOpen <= 0 || entry == nil {
if entry, err = file.maybeLoadEntry(ctx); err != nil {
return err
@ -106,7 +108,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
_, err := file.maybeLoadEntry(ctx)
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
@ -123,12 +125,12 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if req.Valid.Size() {
glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
if req.Size < filer.FileSize(file.entry) {
glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
if req.Size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range file.entry.Chunks {
for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
// this chunk is truncated
@ -143,36 +145,36 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
}
file.entry.Chunks = chunks
entry.Chunks = chunks
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil
}
file.entry.Attributes.FileSize = req.Size
entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
}
if req.Valid.Mode() {
file.entry.Attributes.FileMode = uint32(req.Mode)
entry.Attributes.FileMode = uint32(req.Mode)
file.dirtyMetadata = true
}
if req.Valid.Uid() {
file.entry.Attributes.Uid = req.Uid
entry.Attributes.Uid = req.Uid
file.dirtyMetadata = true
}
if req.Valid.Gid() {
file.entry.Attributes.Gid = req.Gid
entry.Attributes.Gid = req.Gid
file.dirtyMetadata = true
}
if req.Valid.Crtime() {
file.entry.Attributes.Crtime = req.Crtime.Unix()
entry.Attributes.Crtime = req.Crtime.Unix()
file.dirtyMetadata = true
}
if req.Valid.Mtime() {
file.entry.Attributes.Mtime = req.Mtime.Unix()
entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
@ -188,7 +190,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return nil
}
return file.saveEntry(file.entry)
return file.saveEntry(entry)
}
@ -258,7 +260,7 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
entry = file.entry
entry = file.getEntry()
if file.isOpen > 0 {
return entry, nil
}
@ -299,8 +301,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
}
entry := file.getEntry()
if entry == nil {
return
}
// pick out-of-order chunks from existing chunks
for _, chunk := range file.entry.Chunks {
for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
@ -318,18 +325,22 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.reader = nil
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
file.entry.Chunks = append(file.entry.Chunks, newChunks...)
entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entryLock.Lock()
defer file.entryLock.Unlock()
file.entry = entry
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
func (file *File) clearEntry() {
file.entryLock.Lock()
defer file.entryLock.Unlock()
file.entry = nil
file.entryViewCache = nil
file.reader = nil
@ -359,3 +370,9 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
return nil
})
}
func (file *File) getEntry() *filer_pb.Entry {
file.entryLock.RLock()
defer file.entryLock.RUnlock()
return file.entry
}

View file

@ -40,8 +40,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Uid: uid,
Gid: gid,
}
if fh.f.entry != nil {
fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
entry := fh.f.getEntry()
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
@ -104,22 +105,28 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileSize := int64(filer.FileSize(fh.f.entry))
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
entry := fh.f.getEntry()
if entry == nil {
return 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) {
totalRead := copy(buff, fh.f.entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead)
fileSize := int64(filer.FileSize(entry))
fileFullPath := fh.f.fullpath()
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.f.entryViewCache == nil {
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@ -136,10 +143,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@ -158,8 +165,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
}
fh.f.entry.Content = nil
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
entry := fh.f.getEntry()
if entry == nil {
return fuse.EIO
}
entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@ -242,35 +254,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
if fh.f.entry.Attributes.Uid == 0 {
fh.f.entry.Attributes.Uid = header.Uid
entry := fh.f.getEntry()
if entry == nil {
return nil
}
if entry.Attributes != nil {
entry.Attributes.Mime = fh.contentType
if entry.Attributes.Uid == 0 {
entry.Attributes.Uid = header.Uid
}
if fh.f.entry.Attributes.Gid == 0 {
fh.f.entry.Attributes.Gid = header.Gid
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = header.Gid
}
if fh.f.entry.Attributes.Crtime == 0 {
fh.f.entry.Attributes.Crtime = time.Now().Unix()
if entry.Attributes.Crtime == 0 {
entry.Attributes.Crtime = time.Now().Unix()
}
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
entry.Attributes.Collection = fh.dirtyPages.collection
entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
Entry: fh.f.entry,
Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
@ -278,7 +295,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
fh.f.entry.Chunks = append(chunks, manifestChunks...)
entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)

View file

@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
if f.entry != nil {
f.entry.Name = f.Name
entry := f.getEntry()
if entry != nil {
entry.Name = f.Name
}
}
parent.disconnectChild(target)

View file

@ -31,6 +31,7 @@ type UploadResult struct {
Mime string `json:"mime,omitempty"`
Gzip uint32 `json:"gzip,omitempty"`
ContentMd5 string `json:"contentMd5,omitempty"`
RetryCount int `json:"-"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
@ -96,6 +97,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
for i := 0; i < 3; i++ {
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
if err == nil {
uploadResult.RetryCount = i
return
} else {
glog.Warningf("uploading to %s: %v", uploadUrl, err)

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"strings"
"time"
@ -101,12 +102,16 @@ func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string,
}
func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
// Redundancy limit to make it correctly judge whether it is the last file.
redLimit := limit
if limit != math.MaxInt32 && limit != 0{
redLimit = limit + 1
}
request := &ListEntriesRequest{
Directory: string(fullDirPath),
Prefix: prefix,
StartFromFileName: startFrom,
Limit: limit,
Limit: redLimit,
InclusiveStartFrom: inclusive,
}
@ -119,6 +124,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
var prevEntry *Entry
count := 0
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@ -139,6 +145,10 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
}
prevEntry = resp.Entry
count++
if count > int(limit) && limit != 0 {
prevEntry = nil
}
}
return nil

View file

@ -3,6 +3,7 @@ package pb
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"net/http"
"strconv"
"strings"
@ -108,51 +109,55 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts
}
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
colonIndex := strings.LastIndex(server, ":")
if colonIndex < 0 {
return "", fmt.Errorf("server should have hostname:port format: %v", server)
}
return ParseServerAddress(server, 10000)
}
port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
host, port, parseErr := hostAndPort(server)
if parseErr != nil {
return "", fmt.Errorf("server port parse error: %v", parseErr)
}
grpcPort := int(port) + 10000
newPort := int(port) + deltaPort
return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
return fmt.Sprintf("%s:%d", host, newPort), nil
}
func hostAndPort(address string) (host string, port uint64, err error) {
colonIndex := strings.LastIndex(address, ":")
if colonIndex < 0 {
return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
}
port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64)
if err != nil {
return "", 0, fmt.Errorf("server port parse error: %v", err)
}
return address[:colonIndex], port, err
}
func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
hostnameAndPort := strings.Split(server, ":")
if len(hostnameAndPort) != 2 {
return fmt.Sprintf("unexpected server address: %s", server)
}
port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
host, port, parseErr := hostAndPort(server)
if parseErr != nil {
return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
glog.Fatalf("server address %s parse error: %v", server, parseErr)
}
grpcPort := int(port) + 10000
return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
return fmt.Sprintf("%s:%d", host, grpcPort)
}
func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
hostnameAndPort := strings.Split(grpcAddress, ":")
if len(hostnameAndPort) != 2 {
return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress)
}
grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
host, grpcPort, parseErr := hostAndPort(grpcAddress)
if parseErr != nil {
return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr)
}
port := int(grpcPort) - 10000
return fmt.Sprintf("%s:%d", hostnameAndPort[0], port)
return fmt.Sprintf("%s:%d", host, port)
}
func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
@ -197,19 +202,3 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
hostnameAndPort := strings.Split(filer, ":")
if len(hostnameAndPort) != 2 {
return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
}
filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
if parseErr != nil {
return "", fmt.Errorf("filer port parse error: %v", parseErr)
}
filerGrpcPort := int(filerPort) + 10000
return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
}

View file

@ -15,40 +15,49 @@ import (
)
// MaybeLoadVolumeInfo load the file data as *volume_server_pb.VolumeInfo, the returned volumeInfo will not be nil
func MaybeLoadVolumeInfo(fileName string) (*volume_server_pb.VolumeInfo, bool, error) {
func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeInfo, hasRemoteFile bool, hasVolumeInfoFile bool, err error) {
volumeInfo := &volume_server_pb.VolumeInfo{}
volumeInfo = &volume_server_pb.VolumeInfo{}
glog.V(1).Infof("maybeLoadVolumeInfo checks %s", fileName)
if exists, canRead, _, _, _ := util.CheckFile(fileName); !exists || !canRead {
if !exists {
return volumeInfo, false, nil
return
}
hasVolumeInfoFile = true
if !canRead {
glog.Warningf("can not read %s", fileName)
return volumeInfo, false, fmt.Errorf("can not read %s", fileName)
err = fmt.Errorf("can not read %s", fileName)
return
}
return volumeInfo, false, nil
return
}
hasVolumeInfoFile = true
glog.V(1).Infof("maybeLoadVolumeInfo reads %s", fileName)
tierData, readErr := ioutil.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
return volumeInfo, false, fmt.Errorf("fail to read %s : %v", fileName, readErr)
err = fmt.Errorf("fail to read %s : %v", fileName, readErr)
return
}
glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName)
if err := jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
glog.Warningf("unmarshal error: %v", err)
return volumeInfo, false, fmt.Errorf("unmarshal error: %v", err)
err = fmt.Errorf("unmarshal error: %v", err)
return
}
if len(volumeInfo.GetFiles()) == 0 {
return volumeInfo, false, nil
return
}
return volumeInfo, true, nil
hasRemoteFile = true
return
}
func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) error {

View file

@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {

View file

@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
var dateKey string
if r.sink.GetName() == "local_incremental" {
if r.sink.IsIncremental() {
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime

View file

@ -18,10 +18,11 @@ import (
)
type AzureSink struct {
containerURL azblob.ContainerURL
container string
dir string
filerSource *source.FilerSource
containerURL azblob.ContainerURL
container string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
func (g *AzureSink) IsIncremental() bool {
return g.isIncremental
}
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"account_name"),
configuration.GetString(prefix+"account_key"),

View file

@ -14,10 +14,11 @@ import (
)
type B2Sink struct {
client *b2.Client
bucket string
dir string
filerSource *source.FilerSource
client *b2.Client
bucket string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
func (g *B2Sink) IsIncremental() bool {
return g.isIncremental
}
func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"b2_account_id"),
configuration.GetString(prefix+"b2_master_application_key"),

View file

@ -30,6 +30,7 @@ type FilerSink struct {
grpcDialOption grpc.DialOption
address string
writeChunkByFiler bool
isIncremental bool
}
func init() {
@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
func (fs *FilerSink) IsIncremental() bool {
return fs.isIncremental
}
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),

View file

@ -18,10 +18,11 @@ import (
)
type GcsSink struct {
client *storage.Client
bucket string
dir string
filerSource *source.FilerSource
client *storage.Client
bucket string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
func (g *GcsSink) IsIncremental() bool {
return g.isIncremental
}
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"bucket"),

View file

@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string {
return localsink.Dir
}
func (localsink *LocalSink) IsIncremental() bool {
return true
}
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
if localsink.isMultiPartEntry(key) {
return nil
@ -74,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
if _, err := os.Stat(dir); os.IsNotExist(err) {
glog.V(4).Infof("Create Direcotry key: %s", dir)
if err = os.MkdirAll(dir, 0); err != nil {
if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
}
writeFunc := func(data []byte) error {
writeErr := ioutil.WriteFile(key, data, 0)
writeErr := ioutil.WriteFile(key, data, 0755)
return writeErr
}

View file

@ -14,6 +14,7 @@ type ReplicationSink interface {
UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error)
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
IsIncremental() bool
}
var (

View file

@ -21,12 +21,13 @@ import (
)
type S3Sink struct {
conn s3iface.S3API
region string
bucket string
dir string
endpoint string
filerSource *source.FilerSource
conn s3iface.S3API
region string
bucket string
dir string
endpoint string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
func (s3sink *S3Sink) IsIncremental() bool {
return s3sink.isIncremental
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@ -67,8 +74,9 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc
s3sink.endpoint = endpoint
config := &aws.Config{
Region: aws.String(s3sink.region),
Endpoint: aws.String(s3sink.endpoint),
Region: aws.String(s3sink.region),
Endpoint: aws.String(s3sink.endpoint),
S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@ -104,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
uploadId, err := s3sink.createMultipartUpload(key, entry)
if err != nil {
return err
return fmt.Errorf("createMultipartUpload: %v", err)
}
totalSize := filer.FileSize(entry)
@ -120,6 +128,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
glog.Errorf("uploadPart: %v", uploadErr)
} else {
parts[index] = part
}
@ -129,7 +138,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
if err != nil {
s3sink.abortMultipartUpload(key, uploadId)
return err
return fmt.Errorf("uploadPart: %v", err)
}
return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)

View file

@ -24,7 +24,7 @@ func (s3sink *S3Sink) deleteObject(key string) error {
result, err := s3sink.conn.DeleteObject(input)
if err == nil {
glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
}
@ -43,7 +43,7 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (
result, err := s3sink.conn.CreateMultipartUpload(input)
if err == nil {
glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
glog.V(2).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
return "", err
@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
glog.V(2).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
return err
return nil
}
// To upload a part
@ -122,7 +123,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
result, err := s3sink.conn.UploadPart(input)
if err == nil {
glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
glog.V(2).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
} else {
glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
}
@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
}
buf := make([]byte, chunk.Size)
for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
_, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else {

View file

@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
if !fs.proxyByFiler {
for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
}
} else {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
}
return

View file

@ -31,6 +31,10 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
return nil
}, startFrom, inclusive, limit)
if len(entries) == 0 {
isLast = true
}
return
}

View file

@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{

View file

@ -5,9 +5,11 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strings"
@ -69,7 +71,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
@ -84,6 +86,14 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w)
}
func urlPathEscape(object string) string {
var escapedParts []string
for _, part := range strings.Split(object, "/") {
escapedParts = append(escapedParts, url.PathEscape(part))
}
return strings.Join(escapedParts, "/")
}
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r)
@ -94,7 +104,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
destUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@ -105,7 +115,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
bucket, object := getBucketAndObject(r)
destUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@ -116,7 +126,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
bucket, object := getBucketAndObject(r)
destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) {
for k, v := range proxyResponse.Header {
@ -196,6 +206,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
if err == nil {
directoriesWithDeletion[parentDirectoryPath]++
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
} else {
delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{

View file

@ -110,7 +110,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
uploadUrl := fmt.Sprintf("http://%s%s/%s/%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody)

View file

@ -1,10 +1,16 @@
package security
import (
"context"
"crypto/tls"
"crypto/x509"
"github.com/chrislusf/seaweedfs/weed/util"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"io/ioutil"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -12,21 +18,29 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
)
func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption {
type Authenticator struct {
AllowedWildcardDomain string
AllowedCommonNames map[string]bool
}
func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption, grpc.ServerOption) {
if config == nil {
return nil
return nil, nil
}
// load cert/key, ca cert
cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
if err != nil {
glog.V(1).Infof("load cert/key error: %v", err)
return nil
glog.V(1).Infof("load cert: %s / key: %s error: %v",
config.GetString(component+".cert"),
config.GetString(component+".key"),
err)
return nil, nil
}
caCert, err := ioutil.ReadFile(config.GetString("grpc.ca"))
if err != nil {
glog.V(1).Infof("read ca cert file error: %v", err)
return nil
glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err)
return nil, nil
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
@ -36,7 +50,20 @@ func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption
ClientAuth: tls.RequireAndVerifyClientCert,
})
return grpc.Creds(ta)
allowedCommonNames := config.GetString(component + ".allowed_commonNames")
allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain")
if allowedCommonNames != "" || allowedWildcardDomain != "" {
allowedCommonNamesMap := make(map[string]bool)
for _, s := range strings.Split(allowedCommonNames, ",") {
allowedCommonNamesMap[s] = true
}
auther := Authenticator{
AllowedCommonNames: allowedCommonNamesMap,
AllowedWildcardDomain: allowedWildcardDomain,
}
return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate))
}
return grpc.Creds(ta), nil
}
func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
@ -70,3 +97,28 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
})
return grpc.WithTransportCredentials(ta)
}
func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) {
p, ok := peer.FromContext(ctx)
if !ok {
return ctx, status.Error(codes.Unauthenticated, "no peer found")
}
tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
if !ok {
return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
}
if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}
commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) {
return ctx, nil
}
if _, ok := a.AllowedCommonNames[commonName]; ok {
return ctx, nil
}
return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName)
}

View file

@ -153,7 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
_, err := pb.ParseFilerGrpcAddress(master)
_, err := pb.ParseServerToGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
}

View file

@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent)
return
}
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
// mime type
mimeType := entry.Attr.Mime
@ -164,6 +156,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
if err != nil {
glog.Errorf("failed to write entry content: %v", err)
}
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)

View file

@ -38,10 +38,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
chunkSize := 1024 * 1024 * maxMB
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
start := time.Now()
defer func() {
stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
}()
var reply *FilerPostResult
@ -302,13 +302,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
start := time.Now()
defer func() {
stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
}()
uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
}
return uploadResult, err, data
}

View file

@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for "+option.String())
return nil, fmt.Errorf("no free volumes left for " + option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}

View file

@ -0,0 +1,137 @@
package weed_server
import (
"bufio"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"net"
"strings"
)
func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
defer c.Close()
glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
bufReader := bufio.NewReaderSize(c, 1024*1024)
bufWriter := bufio.NewWriterSize(c, 1024*1024)
for {
cmd, err := bufReader.ReadString('\n')
if err != nil {
if err != io.EOF {
glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
}
return
}
cmd = cmd[:len(cmd)-1]
switch cmd[0] {
case '+':
fileId := cmd[1:]
err = vs.handleTcpPut(fileId, bufReader)
if err == nil {
bufWriter.Write([]byte("+OK\n"))
} else {
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
}
case '-':
fileId := cmd[1:]
err = vs.handleTcpDelete(fileId)
if err == nil {
bufWriter.Write([]byte("+OK\n"))
} else {
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
}
case '?':
fileId := cmd[1:]
err = vs.handleTcpGet(fileId, bufWriter)
case '!':
bufWriter.Flush()
}
}
}
func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
volumeId, n, err2 := vs.parseFileId(fileId)
if err2 != nil {
return err2
}
volume := vs.store.GetVolume(volumeId)
if volume == nil {
return fmt.Errorf("volume %d not found", volumeId)
}
err = volume.StreamRead(n, writer)
if err != nil {
return err
}
return nil
}
func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
volumeId, n, err2 := vs.parseFileId(fileId)
if err2 != nil {
return err2
}
volume := vs.store.GetVolume(volumeId)
if volume == nil {
return fmt.Errorf("volume %d not found", volumeId)
}
sizeBuf := make([]byte, 4)
if _, err = bufReader.Read(sizeBuf); err != nil {
return err
}
dataSize := util.BytesToUint32(sizeBuf)
err = volume.StreamWrite(n, bufReader, dataSize)
if err != nil {
return err
}
return nil
}
func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
volumeId, n, err2 := vs.parseFileId(fileId)
if err2 != nil {
return err2
}
_, err = vs.store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
return err
}
return nil
}
func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
commaIndex := strings.LastIndex(fileId, ",")
if commaIndex <= 0 {
return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
}
vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
volumeId, ve := needle.NewVolumeId(vid)
if ve != nil {
return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
}
n := new(needle.Needle)
n.ParsePath(fid)
return volumeId, n, nil
}

View file

@ -274,7 +274,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {

View file

@ -0,0 +1,92 @@
package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"math"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func init() {
Commands = append(Commands, &commandS3CleanUploads{})
}
type commandS3CleanUploads struct {
}
func (c *commandS3CleanUploads) Name() string {
return "s3.clean.uploads"
}
func (c *commandS3CleanUploads) Help() string {
return `clean up stale multipart uploads
Example:
s3.clean.uploads -replication 001
`
}
func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
if err = bucketCommand.Parse(args); err != nil {
return nil
}
var filerBucketsPath string
filerBucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
return fmt.Errorf("read buckets: %v", err)
}
var buckets []string
err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
buckets = append(buckets, entry.Name)
return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
}
for _, bucket:= range buckets {
c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
}
return err
}
func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
uploadsDir := filerBucketsPath+"/"+bucket+"/.uploads"
var staleUploads []string
now := time.Now()
err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
ctime := time.Unix(entry.Attributes.Crtime, 0)
if ctime.Add(timeAgo).Before(now) {
staleUploads = append(staleUploads, entry.Name)
}
return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
}
for _, staleUpload:= range staleUploads {
deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true",commandEnv.option.FilerHost, commandEnv.option.FilerPort,uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
err = util.Delete(deleteUrl, "")
if err != nil {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
}
return nil
}

View file

@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
keepDataNodesSorted(allLocations, toDiskType)
fn := capacityByFreeVolumeCount(toDiskType)
for _, dst := range allLocations {
if fn(dst.dataNode) > 0 {
if fn(dst.dataNode) > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
if isOneOf(dst.dataNode.Id, locations) {
continue

View file

@ -58,6 +58,9 @@ func LoadConfiguration(config *util.ViperProxy) {
if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
continue
}
if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found {
continue
}
backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
if buildErr != nil {
@ -81,6 +84,9 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
glog.Warningf("storage type %s not found", storageBackend.Type)
continue
}
if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found {
continue
}
backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
if buildErr != nil {
glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)

View file

@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
return
}
func (df *DiskFile) Append(p []byte) (n int, err error) {
func (df *DiskFile) Write(p []byte) (n int, err error) {
return df.WriteAt(p, df.fileSize)
}

View file

@ -34,8 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string)
}
config := &aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Region: aws.String(region),
Endpoint: aws.String(endpoint),
S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")

View file

@ -63,7 +63,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
// read volume info
ev.Version = needle.Version3
if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})

View file

@ -2,6 +2,7 @@ package needle
import (
"fmt"
"io"
"github.com/klauspost/crc32"
@ -29,3 +30,25 @@ func (n *Needle) Etag() string {
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
}
func NewCRCwriter(w io.Writer) *CRCwriter {
return &CRCwriter{
crc: CRC(0),
w: w,
}
}
type CRCwriter struct {
crc CRC
w io.Writer
}
func (c *CRCwriter) Write(p []byte) (n int, err error) {
n, err = c.w.Write(p) // with each write ...
c.crc = c.crc.Update(p)
return
}
func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash

View file

@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
}
if err != nil {
fileSize, _, _ := r.GetStat()
println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
}
return dataSlice, err

View file

@ -152,8 +152,10 @@ func (m *LevelDbNeedleMap) Close() {
glog.Warningf("close index file %s failed: %v", indexFileName, err)
}
if err := m.db.Close(); err != nil {
glog.Warningf("close levelDB failed: %v", err)
if m.db != nil {
if err := m.db.Close(); err != nil {
glog.Warningf("close levelDB failed: %v", err)
}
}
}

View file

@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
}
func (m *SortedFileNeedleMap) Close() {
m.indexFile.Close()
m.dbFile.Close()
if m.indexFile != nil {
m.indexFile.Close()
}
if m.dbFile != nil {
m.dbFile.Close()
}
}
func (m *SortedFileNeedleMap) Destroy() error {

View file

@ -220,20 +220,30 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey
}
deleteVolume := false
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
deleteVolume = true
} else {
glog.V(0).Infof("volume %d is expired", v.Id)
}
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
deleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
if _, exist := collectionVolumeSize[v.Collection]; !exist {
collectionVolumeSize[v.Collection] = 0
}
if !deleteVolume {
collectionVolumeSize[v.Collection] += volumeMessage.Size
}
if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
"IsReadOnly": 0,
@ -242,7 +252,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
"isDiskSpaceLow": 0,
}
}
if v.IsReadOnly() {
if !deleteVolume && v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
if v.noWriteOrDelete {
collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
@ -267,7 +277,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
glog.V(0).Infof("volume %d is deleted", vid)
}
} else {
glog.V(0).Infof("delete volume %d: %v", vid, err)
glog.Warningf("delete volume %d: %v", vid, err)
}
}
location.volumesLock.Unlock()
@ -446,7 +456,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
// load, modify, save
baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
vifFile := filepath.Join(location.Directory, baseFileName+".vif")
volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile)
if err != nil {
return fmt.Errorf("volume %d fail to load vif", i)
}

View file

@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
}
func (rp *ReplicaPlacement) Byte() byte {
if rp == nil {
return 0
}
ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
return byte(ret)
}

View file

@ -39,12 +39,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}()
hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
hasVolumeInfoFile := v.maybeLoadVolumeInfo()
if v.HasRemoteFile() {
v.noWriteCanDelete = true
v.noWriteOrDelete = false
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
@ -83,6 +83,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if alreadyHasSuperBlock {
err = v.readSuperBlock()
glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
if v.HasRemoteFile() {
// maybe temporary network problem
glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
err = nil
}
} else {
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))

View file

@ -104,47 +104,8 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
return
}
if v.isFileUnchanged(n) {
size = Size(n.DataSize)
isUnchanged = true
return
}
// check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id)
if ok {
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return
}
if existingNeedle.Cookie != n.Cookie {
glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
return
}
}
// append to dat file
n.AppendAtNs = uint64(time.Now().UnixNano())
offset, size, _, err = n.Append(v.DataBackend, v.Version())
v.checkReadWriteError(err)
if err != nil {
return
}
v.lastAppendAtNs = n.AppendAtNs
// add to needle map
if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
if v.lastModifiedTsSeconds < n.LastModified {
v.lastModifiedTsSeconds = n.LastModified
}
return
return v.doWriteRequest(n)
}
func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
@ -223,24 +184,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
return 0, err
}
nv, ok := v.nm.Get(n.Id)
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
offset, _, _, err := n.Append(v.DataBackend, v.Version())
v.checkReadWriteError(err)
if err != nil {
return size, err
}
v.lastAppendAtNs = n.AppendAtNs
if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
return size, err
}
return size, err
}
return 0, nil
return v.doDeleteRequest(n)
}
func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {

View file

@ -0,0 +1,104 @@
package storage
import (
"bufio"
"fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
df, ok := v.DataBackend.(*backend.DiskFile)
if !ok {
return fmt.Errorf("unexpected volume backend")
}
offset, _, _ := v.DataBackend.GetStat()
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
n.Size = 4 + Size(dataSize) + 1
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
n.DataSize = dataSize
// needle header
df.Write(header[0:NeedleHeaderSize])
// data size and data
util.Uint32toBytes(header[0:4], n.DataSize)
df.Write(header[0:4])
// write and calculate CRC
crcWriter := needle.NewCRCwriter(df)
io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
// flags
util.Uint8toBytes(header[0:1], n.Flags)
df.Write(header[0:1])
// data checksum
util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
// write timestamp, padding
n.AppendAtNs = uint64(time.Now().UnixNano())
util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
padding := needle.PaddingLength(n.Size, needle.Version3)
df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
// add to needle map
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
return
}
func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
return ErrorNotFound
}
sr := &StreamReader{
readerAt: v.DataBackend,
offset: nv.Offset.ToActualOffset(),
}
bufReader := bufio.NewReader(sr)
bufReader.Discard(NeedleHeaderSize)
sizeBuf := make([]byte, 4)
bufReader.Read(sizeBuf)
if _, err = writer.Write(sizeBuf); err != nil {
return err
}
dataSize := util.BytesToUint32(sizeBuf)
_, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
return
}
type StreamReader struct {
offset int64
readerAt io.ReaderAt
}
func (sr *StreamReader) Read(p []byte) (n int, err error) {
n, err = sr.readerAt.ReadAt(p, sr.offset)
if err != nil {
return
}
sr.offset += int64(n)
return
}

Some files were not shown because too many files have changed in this diff Show more