diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go new file mode 100644 index 000000000..b9b1673f3 --- /dev/null +++ b/weed/cluster/master_client.go @@ -0,0 +1,34 @@ +package cluster + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" +) + +func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) { + + if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: clientType, + FilerGroup: filerGroup, + }) + + glog.V(0).Infof("the cluster has %d %s\n", len(resp.ClusterNodes), clientType) + for _, node := range resp.ClusterNodes { + existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ + NodeType: FilerType, + Address: node.Address, + IsLeader: node.IsLeader, + IsAdd: true, + CreatedAtNs: node.CreatedAtNs, + }) + } + return err + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", master, grpcErr) + } + return +} diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index cce77bf8e..1b31d0141 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -1,10 +1,6 @@ package command import ( - "context" - "fmt" - "time" - "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/util/grace" @@ -12,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/mq/broker" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -26,7 +21,6 @@ type MessageQueueBrokerOptions struct { masters map[string]pb.ServerAddress mastersString *string filerGroup *string - filer *string ip *string port *int dataCenter *string @@ -38,7 +32,6 @@ type MessageQueueBrokerOptions struct { func init() { cmdMqBroker.Run = runMqBroker // break init cycle mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") - mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address") mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") @@ -73,40 +66,17 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) - filerAddress := pb.ServerAddress(*mqBrokerOpt.filer) - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") - cipher := false - - for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) - } - cipher = resp.Cipher - return nil - }) - if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) - time.Sleep(time.Second) - } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) - break - } - } qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ Masters: mqBrokerOpt.masters, FilerGroup: *mqBrokerOpt.filerGroup, DataCenter: *mqBrokerOpt.dataCenter, Rack: *mqBrokerOpt.rack, - Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, Ip: *mqBrokerOpt.ip, Port: *mqBrokerOpt.port, - Cipher: cipher, }, grpcDialOption) // start grpc listener diff --git a/weed/command/server.go b/weed/command/server.go index 04b07ed51..b993d9428 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -228,7 +228,6 @@ func runServer(cmd *Command, args []string) bool { s3Options.filer = &filerAddress iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress - mqBrokerOptions.filer = &filerAddress mqBrokerOptions.filerGroup = filerOptions.filerGroup go stats_collect.StartMetricsServer(*serverMetricsHttpPort) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index b1582bd30..386d27898 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -99,28 +99,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste } func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { - - if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - FilerGroup: f.MasterClient.FilerGroup, - }) - - glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) - for _, node := range resp.ClusterNodes { - existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ - NodeType: cluster.FilerType, - Address: node.Address, - IsLeader: node.IsLeader, - IsAdd: true, - CreatedAtNs: node.CreatedAtNs, - }) - } - return err - }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) - } - return + return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType) } func (f *Filer) SetStore(store FilerStore) (isFresh bool) { diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go deleted file mode 100644 index 94e89cd41..000000000 --- a/weed/mq/broker/broker_grpc_server_discovery.go +++ /dev/null @@ -1,72 +0,0 @@ -package broker - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/cluster" - "github.com/chrislusf/seaweedfs/weed/pb" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" -) - -func (broker *MessageQueueBroker) checkFilers() { - - // contact a filer about masters - var masters []pb.ServerAddress - found := false - for !found { - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - for _, m := range resp.Masters { - masters = append(masters, pb.ServerAddress(m)) - } - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []pb.ServerAddress - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - }) - if err != nil { - return err - } - - for _, clusterNode := range resp.ClusterNodes { - filers = append(filers, pb.ServerAddress(clusterNode.Address)) - } - - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers - -} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 64ea7c666..f940b00c3 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -5,6 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" + "time" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -16,7 +17,6 @@ type MessageQueueBrokerOption struct { FilerGroup string DataCenter string Rack string - Filers []pb.ServerAddress DefaultReplication string MaxMB int Ip string @@ -29,6 +29,8 @@ type MessageQueueBroker struct { option *MessageQueueBrokerOption grpcDialOption grpc.DialOption MasterClient *wdclient.MasterClient + filers map[pb.ServerAddress]struct{} + currentFiler pb.ServerAddress } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -37,15 +39,47 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial option: option, grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + filers: make(map[pb.ServerAddress]struct{}), } - - mqBroker.checkFilers() + mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate go mqBroker.MasterClient.KeepConnectedToMaster() + existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType) + for _, newNode := range existingNodes { + mqBroker.OnBrokerUpdate(newNode, time.Now()) + } + return mqBroker, nil } +func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + broker.filers[address] = struct{}{} + if broker.currentFiler == "" { + broker.currentFiler = address + } + } else { + delete(broker.filers, address) + if broker.currentFiler == address { + for filer, _ := range broker.filers { + broker.currentFiler = filer + break + } + } + } + +} + +func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { + return broker.currentFiler +} + func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)