Merge pull request #28 from chrislusf/master

sync
This commit is contained in:
hilimd 2020-10-18 09:44:04 +08:00 committed by GitHub
commit 62af2d961d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 3178 additions and 382 deletions

View file

@ -1,4 +1,4 @@
apiVersion: v1 apiVersion: v1
description: SeaweedFS description: SeaweedFS
name: seaweedfs name: seaweedfs
version: 2.04 version: 2.05

View file

@ -17,6 +17,12 @@ spec:
port: {{ .Values.filer.grpcPort }} port: {{ .Values.filer.grpcPort }}
targetPort: {{ .Values.filer.grpcPort }} targetPort: {{ .Values.filer.grpcPort }}
protocol: TCP protocol: TCP
{{- if .Values.filer.metricsPort }}
- name: "swfs-filer-metrics"
port: {{ .Values.filer.metricsPort }}
targetPort: {{ .Values.filer.metricsPort }}
protocol: TCP
{{- end }}
selector: selector:
app: {{ template "seaweedfs.name" . }} app: {{ template "seaweedfs.name" . }}
component: filer component: filer

View file

@ -0,0 +1,18 @@
{{- if .Values.filer.metricsPort }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ template "seaweedfs.name" . }}-filer
namespace: {{ .Release.Namespace }}
labels:
app: {{ template "seaweedfs.name" . }}
component: filer
spec:
endpoints:
- interval: 30s
port: swfs-filer-metrics
scrapeTimeout: 5s
selector:
app: {{ template "seaweedfs.name" . }}
component: filer
{{- end }}

View file

@ -99,6 +99,9 @@ spec:
{{- end }} {{- end }}
filer \ filer \
-port={{ .Values.filer.port }} \ -port={{ .Values.filer.port }} \
{{- if .Values.filer.metricsPort }}
-metricsPort {{ .Values.filer.metricsPort }} \
{{- end }}}
{{- if .Values.filer.redirectOnRead }} {{- if .Values.filer.redirectOnRead }}
-redirectOnRead \ -redirectOnRead \
{{- end }} {{- end }}

View file

@ -71,6 +71,9 @@ spec:
{{- end }} {{- end }}
s3 \ s3 \
-port={{ .Values.s3.port }} \ -port={{ .Values.s3.port }} \
{{- if .Values.s3.metricsPort }}
-metricsPort {{ .Values.s3.metricsPort }} \
{{- end }}}
{{- if .Values.global.enableSecurity }} {{- if .Values.global.enableSecurity }}
-cert.file=/usr/local/share/ca-certificates/client/tls.crt \ -cert.file=/usr/local/share/ca-certificates/client/tls.crt \
-key.file=/usr/local/share/ca-certificates/client/tls.key \ -key.file=/usr/local/share/ca-certificates/client/tls.key \

View file

@ -12,6 +12,12 @@ spec:
port: {{ .Values.s3.port }} port: {{ .Values.s3.port }}
targetPort: {{ .Values.s3.port }} targetPort: {{ .Values.s3.port }}
protocol: TCP protocol: TCP
{{- if .Values.s3.metricsPort }}
- name: "swfs-s3-metrics"
port: {{ .Values.filer.s3 }}
targetPort: {{ .Values.s3.metricsPort }}
protocol: TCP
{{- end }}}
selector: selector:
app: {{ template "seaweedfs.name" . }} app: {{ template "seaweedfs.name" . }}
component: s3 component: s3

View file

@ -0,0 +1,18 @@
{{- if .Values.s3.metricsPort }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ template "seaweedfs.name" . }}-s3
namespace: {{ .Release.Namespace }}
labels:
app: {{ template "seaweedfs.name" . }}
component: s3
spec:
endpoints:
- interval: 30s
port: swfs-s3-metrics
scrapeTimeout: 5s
selector:
app: {{ template "seaweedfs.name" . }}
component: s3
{{- end }}}

View file

@ -17,6 +17,12 @@ spec:
port: {{ .Values.volume.grpcPort }} port: {{ .Values.volume.grpcPort }}
targetPort: {{ .Values.volume.grpcPort }} targetPort: {{ .Values.volume.grpcPort }}
protocol: TCP protocol: TCP
{{- if .Values.volume.metricsPort }}
- name: "swfs-volume-metrics"
port: {{ .Values.volume.metricsPort }}
targetPort: {{ .Values.volume.metricsPort }}
protocol: TCP
{{- end }}}
selector: selector:
app: {{ template "seaweedfs.name" . }} app: {{ template "seaweedfs.name" . }}
component: volume component: volume

View file

@ -0,0 +1,18 @@
{{- if .Values.volume.metricsPort }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ template "seaweedfs.name" . }}-volume
namespace: {{ .Release.Namespace }}
labels:
app: {{ template "seaweedfs.name" . }}
component: volume
spec:
endpoints:
- interval: 30s
port: swfs-volume-metrics
scrapeTimeout: 5s
selector:
app: {{ template "seaweedfs.name" . }}
component: volume
{{- end }}}

View file

@ -76,6 +76,9 @@ spec:
{{- end }} {{- end }}
volume \ volume \
-port={{ .Values.volume.port }} \ -port={{ .Values.volume.port }} \
{{- if .Values.volume.metricsPort }}
-metricsPort {{ .Values.volume.metricsPort }} \
{{- end }}}
-dir={{ .Values.volume.dir }} \ -dir={{ .Values.volume.dir }} \
-max={{ .Values.volume.maxVolumes }} \ -max={{ .Values.volume.maxVolumes }} \
{{- if .Values.volume.rack }} {{- if .Values.volume.rack }}

View file

@ -4,7 +4,7 @@ global:
registry: "" registry: ""
repository: "" repository: ""
imageName: chrislusf/seaweedfs imageName: chrislusf/seaweedfs
imageTag: "2.04" imageTag: "2.05"
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret imagePullSecrets: imagepullsecret
restartPolicy: Always restartPolicy: Always
@ -121,6 +121,7 @@ volume:
restartPolicy: null restartPolicy: null
port: 8080 port: 8080
grpcPort: 18080 grpcPort: 18080
metricsPort: 9327
ipBind: "0.0.0.0" ipBind: "0.0.0.0"
replicas: 1 replicas: 1
loggingOverrideLevel: null loggingOverrideLevel: null
@ -209,6 +210,7 @@ filer:
replicas: 1 replicas: 1
port: 8888 port: 8888
grpcPort: 18888 grpcPort: 18888
metricsPort: 9327
loggingOverrideLevel: null loggingOverrideLevel: null
# replication type is XYZ: # replication type is XYZ:
# X number of replica in other data centers # X number of replica in other data centers
@ -313,6 +315,7 @@ s3:
restartPolicy: null restartPolicy: null
replicas: 1 replicas: 1
port: 8333 port: 8333
metricsPort: 9327
loggingOverrideLevel: null loggingOverrideLevel: null
# Suffix of the host name, {bucket}.{domainName} # Suffix of the host name, {bucket}.{domainName}

View file

@ -37,6 +37,9 @@ service SeaweedFiler {
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) { rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
} }
rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
}
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) { rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
} }
@ -244,6 +247,16 @@ message LookupVolumeResponse {
map<string, Locations> locations_map = 1; map<string, Locations> locations_map = 1;
} }
message Collection {
string name = 1;
}
message CollectionListRequest {
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
}
message CollectionListResponse {
repeated Collection collections = 1;
}
message DeleteCollectionRequest { message DeleteCollectionRequest {
string collection = 1; string collection = 1;
} }

File diff suppressed because it is too large Load diff

View file

@ -25,3 +25,7 @@ debug_server:
debug_volume: debug_volume:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- volume -dir=/Volumes/mobile_disk/100 -port 8564 -max=30 -preStopSeconds=2 dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- volume -dir=/Volumes/mobile_disk/100 -port 8564 -max=30 -preStopSeconds=2
debug_webdav:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 webdav

View file

@ -145,7 +145,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
go func() { go func() {
time.Sleep(1500 * time.Millisecond) time.Sleep(1500 * time.Millisecond)
if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
if ms.MasterClient.FindLeader(myMasterAddress) == "" { if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
raftServer.DoJoinCommand() raftServer.DoJoinCommand()
} }
} }

View file

@ -3,6 +3,7 @@ package abstract_sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/base64"
"fmt" "fmt"
"strings" "strings"
@ -80,8 +81,8 @@ func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
} }
dirHash = int64(util.BytesToUint64(key[:8])) dirHash = int64(util.BytesToUint64(key[:8]))
dirStr = string(key[:8]) dirStr = base64.StdEncoding.EncodeToString(key[:8])
name = string(key[8:]) name = base64.StdEncoding.EncodeToString(key[8:])
return return
} }

View file

@ -2,6 +2,7 @@ package cassandra
import ( import (
"context" "context"
"encoding/base64"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/gocql/gocql" "github.com/gocql/gocql"
@ -54,8 +55,8 @@ func genDirAndName(key []byte) (dir string, name string) {
key = append(key, 0) key = append(key, 0)
} }
dir = string(key[:8]) dir = base64.StdEncoding.EncodeToString(key[:8])
name = string(key[8:]) name = base64.StdEncoding.EncodeToString(key[8:])
return return
} }

View file

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io" "io"
"sync" "sync"
"time"
) )
type ContinuousDirtyPages struct { type ContinuousDirtyPages struct {
@ -13,6 +14,7 @@ type ContinuousDirtyPages struct {
f *File f *File
writeWaitGroup sync.WaitGroup writeWaitGroup sync.WaitGroup
chunkSaveErrChan chan error chunkSaveErrChan chan error
chunkSaveErrChanClosed bool
lock sync.Mutex lock sync.Mutex
collection string collection string
replication string replication string
@ -81,6 +83,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
if pages.chunkSaveErrChanClosed {
pages.chunkSaveErrChan = make(chan error, 8)
pages.chunkSaveErrChanClosed = false
}
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1) pages.writeWaitGroup.Add(1)
go func() { go func() {
defer pages.writeWaitGroup.Done() defer pages.writeWaitGroup.Done()
@ -94,19 +102,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
pages.chunkSaveErrChan <- err pages.chunkSaveErrChan <- err
return return
} }
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication pages.collection, pages.replication = collection, replication
pages.f.addChunks([]*filer_pb.FileChunk{chunk}) pages.f.addChunks([]*filer_pb.FileChunk{chunk})
pages.chunkSaveErrChan <- nil pages.chunkSaveErrChan <- nil
}() }()
} }
func maxUint64(x, y uint64) uint64 {
if x > y {
return x
}
return y
}
func max(x, y int64) int64 { func max(x, y int64) int64 {
if x > y { if x > y {
return x return x

View file

@ -77,6 +77,10 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
totalRead = max(maxStop-req.Offset, totalRead) totalRead = max(maxStop-req.Offset, totalRead)
} }
if err == io.EOF {
err = nil
}
if err != nil { if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err) glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
return fuse.EIO return fuse.EIO
@ -122,11 +126,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
totalRead, err := fh.f.reader.ReadAt(buff, offset) totalRead, err := fh.f.reader.ReadAt(buff, offset)
if err == io.EOF { if err != nil && err != io.EOF{
err = nil
}
if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
} }
@ -179,10 +179,16 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
} }
if fh.f.isOpen == 0 { if fh.f.isOpen == 0 {
fh.doFlush(ctx, req.Header) if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
}
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
} }
// stop the goroutine
fh.dirtyPages.chunkSaveErrChanClosed = true
close(fh.dirtyPages.chunkSaveErrChan)
return nil return nil
} }

View file

@ -37,6 +37,9 @@ service SeaweedFiler {
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) { rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
} }
rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
}
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) { rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
} }
@ -244,6 +247,16 @@ message LookupVolumeResponse {
map<string, Locations> locations_map = 1; map<string, Locations> locations_map = 1;
} }
message Collection {
string name = 1;
}
message CollectionListRequest {
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
}
message CollectionListResponse {
repeated Collection collections = 1;
}
message DeleteCollectionRequest { message DeleteCollectionRequest {
string collection = 1; string collection = 1;
} }

File diff suppressed because it is too large Load diff

View file

@ -58,8 +58,36 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
// avoid duplicated buckets
errCode := s3err.ErrNone
if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
IncludeNormalVolumes: true,
}); err != nil {
glog.Errorf("list collection: %v", err)
return fmt.Errorf("list collections: %v", err)
}else {
for _, c := range resp.Collections {
if bucket == c.Name {
errCode = s3err.ErrBucketAlreadyExists
break
}
}
}
return nil
}); err != nil {
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
return
}
if errCode != s3err.ErrNone {
writeErrorResponse(w, errCode, r.URL)
return
}
// create the folder for bucket, but lazily create actual collection // create the folder for bucket, but lazily create actual collection
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil { if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil {
glog.Errorf("PutBucketHandler mkdir: %v", err)
writeErrorResponse(w, s3err.ErrInternalError, r.URL) writeErrorResponse(w, s3err.ErrInternalError, r.URL)
return return
} }

View file

@ -383,6 +383,28 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}, nil }, nil
} }
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
glog.V(4).Infof("CollectionList %v", req)
resp = &filer_pb.CollectionListResponse{}
err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: req.IncludeNormalVolumes,
IncludeEcVolumes: req.IncludeEcVolumes,
})
if err != nil {
return err
}
for _, c := range masterResp.Collections {
resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
}
return nil
})
return
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
glog.V(4).Infof("DeleteCollection %v", req) glog.V(4).Infof("DeleteCollection %v", req)

View file

@ -489,11 +489,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
f.off += int64(readSize) f.off += int64(readSize)
if err == io.EOF { if err != nil && err != io.EOF {
err = nil
}
if err != nil {
glog.Errorf("file read %s: %v", f.name, err) glog.Errorf("file read %s: %v", f.name, err)
} }

View file

@ -77,6 +77,14 @@ var (
Help: "Number of volumes or shards.", Help: "Number of volumes or shards.",
}, []string{"collection", "type"}) }, []string{"collection", "type"})
VolumeServerReadOnlyVolumeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "SeaweedFS",
Subsystem: "volumeServer",
Name: "read_only_volumes",
Help: "Number of read only volumes.",
}, []string{"collection", "type"})
VolumeServerMaxVolumeCounter = prometheus.NewGauge( VolumeServerMaxVolumeCounter = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: "SeaweedFS", Namespace: "SeaweedFS",
@ -122,6 +130,7 @@ func init() {
Gather.MustRegister(VolumeServerRequestHistogram) Gather.MustRegister(VolumeServerRequestHistogram)
Gather.MustRegister(VolumeServerVolumeCounter) Gather.MustRegister(VolumeServerVolumeCounter)
Gather.MustRegister(VolumeServerMaxVolumeCounter) Gather.MustRegister(VolumeServerMaxVolumeCounter)
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
Gather.MustRegister(VolumeServerDiskSizeGauge) Gather.MustRegister(VolumeServerDiskSizeGauge)
Gather.MustRegister(S3RequestCounter) Gather.MustRegister(S3RequestCounter)

View file

@ -200,6 +200,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCount := 0 maxVolumeCount := 0
var maxFileKey NeedleId var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64) collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]uint8)
for _, location := range s.Locations { for _, location := range s.Locations {
var deleteVids []needle.VolumeId var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
@ -219,6 +220,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
fileSize, _, _ := v.FileStat() fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize collectionVolumeSize[v.Collection] += fileSize
if v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection] += 1
}
} }
location.volumesLock.RUnlock() location.volumesLock.RUnlock()
@ -243,6 +247,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
} }
for col, count := range collectionVolumeReadOnlyCount {
stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count))
}
return &master_pb.Heartbeat{ return &master_pb.Heartbeat{
Ip: s.Ip, Ip: s.Ip,
Port: uint32(s.Port), Port: uint32(s.Port),

View file

@ -5,7 +5,7 @@ import (
) )
var ( var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 04) VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 05)
COMMIT = "" COMMIT = ""
) )

View file

@ -52,7 +52,7 @@ func (mc *MasterClient) KeepConnectedToMaster() {
} }
} }
func (mc *MasterClient) FindLeader(myMasterAddress string) (leader string) { func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader string) {
for _, master := range mc.masters { for _, master := range mc.masters {
if master == myMasterAddress { if master == myMasterAddress {
continue continue