mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
use separate filer grpc port
This commit is contained in:
parent
95fe745a0c
commit
299312c805
|
@ -10,7 +10,6 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/server"
|
"github.com/chrislusf/seaweedfs/weed/server"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/soheilhy/cmux"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/reflection"
|
"google.golang.org/grpc/reflection"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -24,6 +23,7 @@ type FilerOptions struct {
|
||||||
masters *string
|
masters *string
|
||||||
ip *string
|
ip *string
|
||||||
port *int
|
port *int
|
||||||
|
grpcPort *int
|
||||||
publicPort *int
|
publicPort *int
|
||||||
collection *string
|
collection *string
|
||||||
defaultReplicaPlacement *string
|
defaultReplicaPlacement *string
|
||||||
|
@ -39,6 +39,7 @@ func init() {
|
||||||
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
|
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
|
||||||
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
|
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
|
||||||
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
|
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
|
||||||
|
f.grpcPort = cmdFiler.Flag.Int("port.grpc", 0, "filer grpc server listen port, default to http port + 10000")
|
||||||
f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public")
|
f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public")
|
||||||
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
|
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
|
||||||
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
|
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
|
||||||
|
@ -119,21 +120,22 @@ func (fo *FilerOptions) start() {
|
||||||
glog.Fatalf("Filer listener error: %v", e)
|
glog.Fatalf("Filer listener error: %v", e)
|
||||||
}
|
}
|
||||||
|
|
||||||
m := cmux.New(filerListener)
|
// starting grpc server
|
||||||
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
|
grpcPort := *f.grpcPort
|
||||||
httpL := m.Match(cmux.Any())
|
if grpcPort == 0 {
|
||||||
|
grpcPort = *f.port + 10000
|
||||||
// Create your protocol servers.
|
}
|
||||||
|
grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
|
||||||
|
}
|
||||||
grpcS := grpc.NewServer()
|
grpcS := grpc.NewServer()
|
||||||
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
|
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
|
||||||
reflection.Register(grpcS)
|
reflection.Register(grpcS)
|
||||||
|
go grpcS.Serve(grpcL)
|
||||||
|
|
||||||
httpS := &http.Server{Handler: defaultMux}
|
httpS := &http.Server{Handler: defaultMux}
|
||||||
|
if err := httpS.Serve(filerListener); err != nil {
|
||||||
go grpcS.Serve(grpcL)
|
|
||||||
go httpS.Serve(httpL)
|
|
||||||
|
|
||||||
if err := m.Serve(); err != nil {
|
|
||||||
glog.Fatalf("Filer Fail to serve: %v", e)
|
glog.Fatalf("Filer Fail to serve: %v", e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,13 +25,14 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type CopyOptions struct {
|
type CopyOptions struct {
|
||||||
master *string
|
filerGrpcPort *int
|
||||||
include *string
|
master *string
|
||||||
replication *string
|
include *string
|
||||||
collection *string
|
replication *string
|
||||||
ttl *string
|
collection *string
|
||||||
maxMB *int
|
ttl *string
|
||||||
secretKey *string
|
maxMB *int
|
||||||
|
secretKey *string
|
||||||
|
|
||||||
secret security.Secret
|
secret security.Secret
|
||||||
}
|
}
|
||||||
|
@ -45,6 +46,7 @@ func init() {
|
||||||
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
|
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
|
||||||
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
||||||
copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
|
copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
|
||||||
|
copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
|
||||||
copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,15 +89,33 @@ func runCopy(cmd *Command, args []string) bool {
|
||||||
urlPath = urlPath + "/"
|
urlPath = urlPath + "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if filerUrl.Port() == "" {
|
||||||
|
fmt.Printf("The filer port should be specified.\n")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
|
||||||
|
if parseErr != nil {
|
||||||
|
fmt.Printf("The filer port parse error: %v\n", parseErr)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
filerGrpcPort := filerPort + 10000
|
||||||
|
if *copy.filerGrpcPort != 0 {
|
||||||
|
filerGrpcPort = uint64(*copy.filerGrpcPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
|
||||||
|
|
||||||
for _, fileOrDir := range fileOrDirs {
|
for _, fileOrDir := range fileOrDirs {
|
||||||
if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) {
|
if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func doEachCopy(fileOrDir string, host string, path string) bool {
|
func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
|
||||||
f, err := os.Open(fileOrDir)
|
f, err := os.Open(fileOrDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
|
fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
|
||||||
|
@ -113,7 +133,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
|
||||||
if mode.IsDir() {
|
if mode.IsDir() {
|
||||||
files, _ := ioutil.ReadDir(fileOrDir)
|
files, _ := ioutil.ReadDir(fileOrDir)
|
||||||
for _, subFileOrDir := range files {
|
for _, subFileOrDir := range files {
|
||||||
if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") {
|
if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -135,13 +155,13 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
if chunkCount == 1 {
|
if chunkCount == 1 {
|
||||||
return uploadFileAsOne(host, path, f, fi)
|
return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
|
||||||
}
|
}
|
||||||
|
|
||||||
return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize)
|
return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool {
|
func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
|
||||||
|
|
||||||
// upload the file content
|
// upload the file content
|
||||||
fileName := filepath.Base(f.Name())
|
fileName := filepath.Base(f.Name())
|
||||||
|
@ -183,10 +203,10 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn
|
||||||
Mtime: time.Now().UnixNano(),
|
Mtime: time.Now().UnixNano(),
|
||||||
})
|
})
|
||||||
|
|
||||||
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
|
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
|
if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: urlFolder,
|
Directory: urlFolder,
|
||||||
Entry: &filer_pb.Entry{
|
Entry: &filer_pb.Entry{
|
||||||
|
@ -209,14 +229,14 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
|
fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
|
func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
|
||||||
|
|
||||||
fileName := filepath.Base(f.Name())
|
fileName := filepath.Base(f.Name())
|
||||||
mimeType := detectMimeType(f)
|
mimeType := detectMimeType(f)
|
||||||
|
@ -259,7 +279,7 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil
|
||||||
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
|
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
|
if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: urlFolder,
|
Directory: urlFolder,
|
||||||
Entry: &filer_pb.Entry{
|
Entry: &filer_pb.Entry{
|
||||||
|
@ -282,11 +302,11 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
|
fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
|
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package command
|
||||||
|
|
||||||
type MountOptions struct {
|
type MountOptions struct {
|
||||||
filer *string
|
filer *string
|
||||||
|
filerGrpcPort *int
|
||||||
dir *string
|
dir *string
|
||||||
collection *string
|
collection *string
|
||||||
replication *string
|
replication *string
|
||||||
|
@ -15,6 +16,7 @@ var (
|
||||||
func init() {
|
func init() {
|
||||||
cmdMount.Run = runMount // break init cycle
|
cmdMount.Run = runMount // break init cycle
|
||||||
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
|
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
|
||||||
|
mountOptions.filerGrpcPort = cmdMount.Flag.Int("filer.grpc.port", 0, "filer grpc server listen port, default to http port + 10000")
|
||||||
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
|
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
|
||||||
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
|
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
|
||||||
mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files")
|
mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files")
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/filesys"
|
"github.com/chrislusf/seaweedfs/weed/filesys"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"strings"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
func runMount(cmd *Command, args []string) bool {
|
func runMount(cmd *Command, args []string) bool {
|
||||||
|
@ -51,8 +53,27 @@ func runMount(cmd *Command, args []string) bool {
|
||||||
c.Close()
|
c.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
hostnameAndPort := strings.Split(*mountOptions.filer, ":")
|
||||||
|
if len(hostnameAndPort) != 2 {
|
||||||
|
fmt.Printf("The filer should have hostname:port format: %v\n", hostnameAndPort)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
|
||||||
|
if parseErr != nil {
|
||||||
|
fmt.Printf("The filer filer port parse error: %v\n", parseErr)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
filerGrpcPort := filerPort + 10000
|
||||||
|
if *mountOptions.filerGrpcPort != 0 {
|
||||||
|
filerGrpcPort = uint64(*copy.filerGrpcPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
filerAddress := fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort)
|
||||||
|
|
||||||
err = fs.Serve(c, filesys.NewSeaweedFileSystem(
|
err = fs.Serve(c, filesys.NewSeaweedFileSystem(
|
||||||
*mountOptions.filer, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB))
|
filerAddress, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fuse.Unmount(*mountOptions.dir)
|
fuse.Unmount(*mountOptions.dir)
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,7 +133,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, fmt.Errorf("filer assign volume: %v", err)
|
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
||||||
|
|
|
@ -26,7 +26,7 @@ type File struct {
|
||||||
isOpen bool
|
isOpen bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
|
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
|
||||||
|
|
||||||
fullPath := filepath.Join(file.dir.Path, file.Name)
|
fullPath := filepath.Join(file.dir.Path, file.Name)
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
|
||||||
ParentDir: file.dir.Path,
|
ParentDir: file.dir.Path,
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.GetEntryAttributes(context, request)
|
resp, err := client.GetEntryAttributes(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("file attr read file %v: %v", request, err)
|
glog.V(0).Infof("file attr read file %v: %v", request, err)
|
||||||
return err
|
return err
|
||||||
|
@ -129,7 +129,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
|
||||||
|
|
||||||
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||||
// fsync works at OS level
|
// fsync works at OS level
|
||||||
// write the file chunks to the filer
|
// write the file chunks to the filerGrpcAddress
|
||||||
glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
|
glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -9,16 +9,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type WFS struct {
|
type WFS struct {
|
||||||
filer string
|
filerGrpcAddress string
|
||||||
listDirectoryEntriesCache *ccache.Cache
|
listDirectoryEntriesCache *ccache.Cache
|
||||||
collection string
|
collection string
|
||||||
replication string
|
replication string
|
||||||
chunkSizeLimit int64
|
chunkSizeLimit int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSeaweedFileSystem(filer string, collection string, replication string, chunkSizeLimitMB int) *WFS {
|
func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replication string, chunkSizeLimitMB int) *WFS {
|
||||||
return &WFS{
|
return &WFS{
|
||||||
filer: filer,
|
filerGrpcAddress: filerGrpcAddress,
|
||||||
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
|
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
|
||||||
collection: collection,
|
collection: collection,
|
||||||
replication: replication,
|
replication: replication,
|
||||||
|
@ -32,9 +32,9 @@ func (wfs *WFS) Root() (fs.Node, error) {
|
||||||
|
|
||||||
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
grpcConnection, err := grpc.Dial(wfs.filer, grpc.WithInsecure())
|
grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("fail to dial %s: %v", wfs.filer, err)
|
return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
|
||||||
}
|
}
|
||||||
defer grpcConnection.Close()
|
defer grpcConnection.Close()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue