From f90c43635d96cace1ab1ca965a56a082f880aa4b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 4 Mar 2020 00:39:47 -0800 Subject: [PATCH] refactoring --- weed/command/command.go | 2 +- weed/command/filer.go | 3 +- weed/command/filer_copy.go | 20 +-- weed/command/master.go | 3 +- weed/command/mount.go | 21 --- weed/command/mount_std.go | 7 +- weed/command/msg_broker.go | 111 ++++++++++++++++ weed/command/queue.go | 107 ---------------- weed/command/s3.go | 5 +- weed/command/scaffold.go | 2 +- weed/command/volume.go | 3 +- weed/command/webdav.go | 3 +- weed/filesys/wfs.go | 4 +- weed/operation/grpc_client.go | 8 +- weed/{util => pb}/grpc_client_server.go | 55 +++++++- .../replication/sink/filersink/fetch_write.go | 4 +- weed/replication/source/filer_source.go | 3 +- weed/s3api/s3api_handlers.go | 4 +- weed/server/msg_broker_grpc_server.go | 23 ++++ weed/server/msg_broker_server.go | 121 ++++++++++++++++++ weed/server/queue_server.go | 49 ------- weed/server/raft_server.go | 11 +- weed/server/volume_grpc_client_to_master.go | 5 +- weed/server/webdav_server.go | 3 +- weed/shell/command_fs_du.go | 9 +- weed/wdclient/masterclient.go | 21 +-- 26 files changed, 361 insertions(+), 246 deletions(-) create mode 100644 weed/command/msg_broker.go delete mode 100644 weed/command/queue.go rename weed/{util => pb}/grpc_client_server.go (65%) create mode 100644 weed/server/msg_broker_grpc_server.go create mode 100644 weed/server/msg_broker_server.go delete mode 100644 weed/server/queue_server.go diff --git a/weed/command/command.go b/weed/command/command.go index 6687469f1..9dc51e922 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -20,7 +20,7 @@ var Commands = []*Command{ cmdS3, cmdUpload, cmdDownload, - cmdQueue, + cmdMsgBroker, cmdScaffold, cmdShell, cmdVersion, diff --git a/weed/command/filer.go b/weed/command/filer.go index 31e65acea..b5b595215 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -144,7 +145,7 @@ func (fo *FilerOptions) startFiler() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 18f41048b..3e7ae1db2 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -159,7 +160,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -274,7 +275,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if task.fileSize > 0 { // assign a volume - err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -319,7 +320,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -375,7 +376,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, // assign a volume var assignResult *filer_pb.AssignVolumeResponse var assignError error - err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -447,7 +448,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return uploadError } - if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -496,12 +497,3 @@ func detectMimeType(f *os.File) string { mimeType := http.DetectContentType(head[:n]) return mimeType } - -func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - - return util.WithCachedGrpcClient(func(clientConn *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(clientConn) - return fn(client) - }, filerAddress, grpcDialOption) - -} diff --git a/weed/command/master.go b/weed/command/master.go index c4b11119b..1be60426f 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -129,7 +130,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) diff --git a/weed/command/mount.go b/weed/command/mount.go index 4bdb3415a..e73cbee10 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,11 +1,5 @@ package command -import ( - "fmt" - "strconv" - "strings" -) - type MountOptions struct { filer *string filerMountRootPath *string @@ -69,18 +63,3 @@ var cmdMount = &Command{ `, } -func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { - hostnameAndPort := strings.Split(filer, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) - } - - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("filer port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil -} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index e8e3fb030..b195bf143 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -17,6 +17,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -135,16 +136,16 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente }) // parse filer grpc address - filerGrpcAddress, err := parseFilerGrpcAddress(filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer) if err != nil { - glog.V(0).Infof("parseFilerGrpcAddress: %v", err) + glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) daemonize.SignalOutcome(err) return true } // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go new file mode 100644 index 000000000..0d69a9a66 --- /dev/null +++ b/weed/command/msg_broker.go @@ -0,0 +1,111 @@ +package command + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" + "github.com/chrislusf/seaweedfs/weed/security" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + messageBrokerStandaloneOptions QueueOptions +) + +type QueueOptions struct { + filer *string + port *int + tlsPrivateKey *string + tlsCertificate *string + defaultTtl *string +} + +func init() { + cmdMsgBroker.Run = runMsgBroker // break init cycle + messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") + messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") + messageBrokerStandaloneOptions.tlsPrivateKey = cmdMsgBroker.Flag.String("key.file", "", "path to the TLS private key file") + messageBrokerStandaloneOptions.tlsCertificate = cmdMsgBroker.Flag.String("cert.file", "", "path to the TLS certificate file") + messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") +} + +var cmdMsgBroker = &Command{ + UsageLine: "msg.broker [-port=17777] [-filer=]", + Short: " start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMsgBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + return messageBrokerStandaloneOptions.startQueueServer() + +} + +func (msgBrokerOpt *QueueOptions) startQueueServer() bool { + + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + if err != nil { + glog.Fatal(err) + return false + } + + filerQueuesPath := "/queues" + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + filerQueuesPath = resp.DirQueues + glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + break + } + } + + qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ + Filers: []string{*msgBrokerOpt.filer}, + DefaultReplication: "", + MaxMB: 0, + Port: *msgBrokerOpt.port, + }) + + // start grpc listener + grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + queue_pb.RegisterSeaweedQueueServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/queue.go b/weed/command/queue.go deleted file mode 100644 index d09d5d8b3..000000000 --- a/weed/command/queue.go +++ /dev/null @@ -1,107 +0,0 @@ -package command - -import ( - "context" - "fmt" - "strconv" - "time" - - "google.golang.org/grpc/reflection" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" - "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - queueStandaloneOptions QueueOptions -) - -type QueueOptions struct { - filer *string - port *int - tlsPrivateKey *string - tlsCertificate *string - defaultTtl *string -} - -func init() { - cmdQueue.Run = runQueue // break init cycle - queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address") - queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port") - queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file") - queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file") - queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") -} - -var cmdQueue = &Command{ - UsageLine: " queue [-port=17777] [-filer=]", - Short: "start a queue gRPC server that is backed by a filer", - Long: `start a queue gRPC server that is backed by a filer. - -`, -} - -func runQueue(cmd *Command, args []string) bool { - - util.LoadConfiguration("security", false) - - return queueStandaloneOptions.startQueueServer() - -} - -func (queueopt *QueueOptions) startQueueServer() bool { - - filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer) - if err != nil { - glog.Fatal(err) - return false - } - - filerQueuesPath := "/queues" - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - - for { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) - } - filerQueuesPath = resp.DirQueues - glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) - return nil - }) - if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) - time.Sleep(time.Second) - } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) - break - } - } - - qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{ - Filers: []string{*queueopt.filer}, - DefaultReplication: "", - MaxMB: 0, - Port: *queueopt.port, - }) - - // start grpc listener - grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err) - } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue")) - queue_pb.RegisterSeaweedQueueServer(grpcS, qs) - reflection.Register(grpcS) - go grpcS.Serve(grpcL) - - return true - -} diff --git a/weed/command/s3.go b/weed/command/s3.go index 39d0c04fc..cd4018fbc 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -6,6 +6,7 @@ import ( "net/http" "time" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" @@ -117,7 +118,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false @@ -128,7 +129,7 @@ func (s3opt *S3Options) startS3Server() bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index fc7f8636d..5b246b7c0 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -326,7 +326,7 @@ key = "" cert = "" key = "" -[grpc.queue] +[grpc.msg_broker] cert = "" key = "" diff --git a/weed/command/volume.go b/weed/command/volume.go index 9d665d143..4773d8a55 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/httpdown" @@ -234,7 +235,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) go func() { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 4d5752247..ba88a17be 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -8,6 +8,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" @@ -54,7 +55,7 @@ func runWebDav(cmd *Command, args []string) bool { func (wo *WebDavOption) startWebDav() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index aa530f6aa..83826fed5 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -14,8 +14,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -93,7 +93,7 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - err := util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 7eed66503..dccf85da4 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -8,9 +8,9 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { @@ -20,7 +20,7 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, return err } - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, grpcAddress, grpcDialOption) @@ -39,12 +39,12 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) if parseErr != nil { return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress, grpcDialOption) diff --git a/weed/util/grpc_client_server.go b/weed/pb/grpc_client_server.go similarity index 65% rename from weed/util/grpc_client_server.go rename to weed/pb/grpc_client_server.go index d6a9ee3c3..4b5f9eff3 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -1,4 +1,4 @@ -package util +package pb import ( "context" @@ -11,6 +11,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) var ( @@ -127,3 +130,53 @@ func ServerToGrpcAddress(server string) (serverGrpcAddress string) { return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) } + +func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { + + masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master) + if parseErr != nil { + return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr) + } + + return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := master_pb.NewSeaweedClient(grpcConnection) + return fn(client) + }, masterGrpcAddress, grpcDialOption) + +} + +func WithFilerClient(filer string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress, parseErr := ParseServerToGrpcAddress(filer) + if parseErr != nil { + return fmt.Errorf("failed to parse filer grpc %v: %v", filer, parseErr) + } + + return WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, fn) + +} + +func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { + + return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, grpcDialOption) + +} + +func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { + hostnameAndPort := strings.Split(filer, ":") + if len(hostnameAndPort) != 2 { + return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) + } + + filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + if parseErr != nil { + return "", fmt.Errorf("filer port parse error: %v", parseErr) + } + + filerGrpcPort := int(filerPort) + 10000 + + return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil +} diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 954e951c9..232b68fec 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -10,9 +10,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" ) func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { @@ -111,7 +111,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 11eb3afa1..90bcffdf0 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/glog" @@ -92,7 +93,7 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 81a260a63..d7212d5e3 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -12,8 +12,8 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) type mimeType string @@ -40,7 +40,7 @@ func encodeResponse(response interface{}) []byte { func (s3a *S3ApiServer) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go new file mode 100644 index 000000000..8b13aac76 --- /dev/null +++ b/weed/server/msg_broker_grpc_server.go @@ -0,0 +1,23 @@ +package weed_server + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" +) + +func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { + panic("implement me") +} + +func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { + panic("implement me") +} + +func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { + panic("implement me") +} + +func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { + panic("implement me") +} diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go new file mode 100644 index 000000000..a9d908581 --- /dev/null +++ b/weed/server/msg_broker_server.go @@ -0,0 +1,121 @@ +package weed_server + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MessageBrokerOption struct { + Filers []string + DefaultReplication string + MaxMB int + Port int +} + +type MessageBroker struct { + option *MessageBrokerOption + grpcDialOption grpc.DialOption +} + +func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) { + + messageBroker = &MessageBroker{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"), + } + + go messageBroker.loopForEver() + + return messageBroker, nil +} + +func (broker *MessageBroker) loopForEver() { + + for { + broker.checkPeers() + time.Sleep(3 * time.Second) + } + +} + +func (broker *MessageBroker) checkPeers() { + + // contact a filer about masters + var masters []string + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err != nil { + fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) + return + } + } + + // contact each masters for filers + var filers []string + for _, master := range masters { + err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ + ClientType: "filer", + }) + if err != nil { + return err + } + + fmt.Printf("filers: %+v\n", resp.GrpcAddresses) + filers = append(filers, resp.GrpcAddresses...) + + return nil + }) + if err != nil { + fmt.Printf("failed to list filers: %v\n", err) + return + } + } + + // contact each filer about brokers + for _, filer := range filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err != nil { + fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) + return + } + } + +} + +func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + +} + +func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) + +} diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go deleted file mode 100644 index 078c76a30..000000000 --- a/weed/server/queue_server.go +++ /dev/null @@ -1,49 +0,0 @@ -package weed_server - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type QueueServerOption struct { - Filers []string - DefaultReplication string - MaxMB int - Port int -} - -type QueueServer struct { - option *QueueServerOption - grpcDialOption grpc.DialOption -} - -func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { - panic("implement me") -} - -func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { - panic("implement me") -} - -func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { - panic("implement me") -} - -func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { - panic("implement me") -} - -func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) { - - qs = &QueueServer{ - option: option, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"), - } - - return qs, nil -} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 53289f1c1..0381c7feb 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,8 +2,6 @@ package weed_server import ( "encoding/json" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" "io/ioutil" "os" "path" @@ -11,7 +9,12 @@ import ( "sort" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d s.raftServer.Start() for _, peer := range s.peers { - s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer)) + s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)) } s.GrpcServer = raft.NewGrpcServer(s.raftServer) @@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: util.ServerToGrpcAddress(s.serverAddr), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), }) if err != nil { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2168afee7..1f4d9df10 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -36,7 +37,7 @@ func (vs *VolumeServer) heartbeat() { if newLeader != "" { master = newLeader } - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master) if parseErr != nil { glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) continue @@ -55,7 +56,7 @@ func (vs *VolumeServer) heartbeat() { func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) + grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index ddd611724..a07f6be01 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -98,7 +99,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 6c31ebdff..ca2f22b57 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -4,11 +4,9 @@ import ( "fmt" "io" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -82,10 +80,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, env.option.GrpcDialOption) + return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn) } diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 0cf161a63..301f20615 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -2,15 +2,14 @@ package wdclient import ( "context" - "fmt" "math/rand" "time" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) type MasterClient struct { @@ -67,7 +66,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) { glog.V(1).Infof("%s Connecting to master %v", mc.name, master) - gprcErr := withMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { stream, err := client.KeepConnected(context.Background()) if err != nil { @@ -119,22 +118,8 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri return } -func withMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { - - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master) - if parseErr != nil { - return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr) - } - - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := master_pb.NewSeaweedClient(grpcConnection) - return fn(client) - }, masterGrpcAddress, grpcDialOption) - -} - func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { - return withMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) }