shell: optionally read filer address from master

This commit is contained in:
Chris Lu 2021-11-02 23:38:45 -07:00
parent 18bfbf62fc
commit 5160eb08f7
10 changed files with 689 additions and 484 deletions

View file

@ -37,7 +37,7 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false) util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
if *shellOptions.Masters == "" && *shellInitialFiler == "" { if *shellOptions.Masters == "" {
util.LoadConfiguration("shell", false) util.LoadConfiguration("shell", false)
v := util.GetViper() v := util.GetViper()
cluster := v.GetString("cluster.default") cluster := v.GetString("cluster.default")
@ -45,14 +45,15 @@ func runShell(command *Command, args []string) bool {
cluster = *shellCluster cluster = *shellCluster
} }
if cluster == "" { if cluster == "" {
*shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888" *shellOptions.Masters = "localhost:9333"
fmt.Printf("master: %s\n", *shellOptions.Masters)
} else { } else {
*shellOptions.Masters = v.GetString("cluster." + cluster + ".master") *shellOptions.Masters = v.GetString("cluster." + cluster + ".master")
*shellInitialFiler = v.GetString("cluster." + cluster + ".filer") *shellInitialFiler = v.GetString("cluster." + cluster + ".filer")
}
}
fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
}
}
shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler) shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler)
shellOptions.Directory = "/" shellOptions.Directory = "/"

View file

@ -93,14 +93,16 @@ func (broker *MessageBroker) checkFilers() {
for !found { for !found {
for _, master := range masters { for _, master := range masters {
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: "filer", ClientType: "filer",
}) })
if err != nil { if err != nil {
return err return err
} }
filers = append(filers, pb.FromAddressStrings(resp.GrpcAddresses)...) for _, clusterNode := range resp.ClusterNodes {
filers = append(filers, pb.ServerAddress(clusterNode.Address))
}
return nil return nil
}) })

View file

@ -29,7 +29,7 @@ service Seaweed {
} }
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) { rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
} }
rpc ListMasterClients (ListMasterClientsRequest) returns (ListMasterClientsResponse) { rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) {
} }
rpc LeaseAdminToken (LeaseAdminTokenRequest) returns (LeaseAdminTokenResponse) { rpc LeaseAdminToken (LeaseAdminTokenRequest) returns (LeaseAdminTokenResponse) {
} }
@ -127,8 +127,9 @@ message SuperBlockExtra {
} }
message KeepConnectedRequest { message KeepConnectedRequest {
string name = 1; string client_type = 1;
string client_address = 3; string client_address = 3;
string version = 4;
} }
message VolumeLocation { message VolumeLocation {
@ -284,11 +285,15 @@ message GetMasterConfigurationResponse {
bool volume_preallocate = 7; bool volume_preallocate = 7;
} }
message ListMasterClientsRequest { message ListClusterNodesRequest {
string client_type = 1; string client_type = 1;
} }
message ListMasterClientsResponse { message ListClusterNodesResponse {
repeated string grpc_addresses = 1; message ClusterNode {
string address = 1;
string version = 2;
}
repeated ClusterNode cluster_nodes = 1;
} }
message LeaseAdminTokenRequest { message LeaseAdminTokenRequest {

File diff suppressed because it is too large Load diff

View file

@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"net" "net"
"strings"
"time" "time"
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
@ -195,9 +194,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
// buffer by 1 so we don't end up getting stuck writing to stopChan forever // buffer by 1 so we don't end up getting stuck writing to stopChan forever
stopChan := make(chan bool, 1) stopChan := make(chan bool, 1)
clientName, messageChan := ms.addClient(req.Name, peerAddress) clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version)
defer ms.deleteClient(clientName) defer func() {
ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress)
ms.deleteClient(clientName)
}()
for _, message := range ms.Topo.ToVolumeLocations() { for _, message := range ms.Topo.ToVolumeLocations() {
if sendErr := stream.Send(message); sendErr != nil { if sendErr := stream.Send(message); sendErr != nil {
@ -295,19 +298,6 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string {
} }
func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
resp := &master_pb.ListMasterClientsResponse{}
ms.clientChansLock.RLock()
defer ms.clientChansLock.RUnlock()
for k := range ms.clientChans {
if strings.HasPrefix(k, req.ClientType+"@") {
resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
}
}
return resp, nil
}
func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
// tell the volume servers about the leader // tell the volume servers about the leader

View file

@ -0,0 +1,20 @@
package weed_server
import (
"context"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
resp := &master_pb.ListClusterNodesResponse{}
clusterNodes := ms.Cluster.ListClusterNode(req.ClientType)
for _, node := range clusterNodes {
resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
Address: string(node.address),
Version: node.version,
})
}
return resp, nil
}

View file

@ -67,6 +67,8 @@ type MasterServer struct {
MasterClient *wdclient.MasterClient MasterClient *wdclient.MasterClient
adminLocks *AdminLocks adminLocks *AdminLocks
Cluster *Cluster
} }
func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
@ -103,6 +105,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(), adminLocks: NewAdminLocks(),
Cluster: NewCluster(),
} }
ms.boundedLeaderChan = make(chan int, 16) ms.boundedLeaderChan = make(chan int, 16)

View file

@ -0,0 +1,70 @@
package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/pb"
"sync"
)
type NodeType int
const (
filerNodeType NodeType = iota
)
type ClusterNode struct {
address pb.ServerAddress
version string
}
type Cluster struct {
filers map[pb.ServerAddress]*ClusterNode
filersLock sync.RWMutex
}
func NewCluster() *Cluster {
return &Cluster{
filers: make(map[pb.ServerAddress]*ClusterNode),
}
}
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
switch nodeType {
case "filer":
cluster.filersLock.Lock()
defer cluster.filersLock.Unlock()
if _, found := cluster.filers[address]; found {
return
}
cluster.filers[address] = &ClusterNode{
address: address,
version: version,
}
case "master":
}
}
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
switch nodeType {
case "filer":
cluster.filersLock.Lock()
defer cluster.filersLock.Unlock()
if _, found := cluster.filers[address]; !found {
return
}
delete(cluster.filers, address)
case "master":
}
}
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode){
switch nodeType {
case "filer":
cluster.filersLock.RLock()
defer cluster.filersLock.RUnlock()
for _, node := range cluster.filers {
nodes = append(nodes, node)
}
case "master":
}
return
}

View file

@ -3,9 +3,12 @@ package shell
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/util/grace"
"io" "io"
"math/rand"
"os" "os"
"path" "path"
"regexp" "regexp"
@ -47,6 +50,27 @@ func RunShell(options ShellOptions) {
go commandEnv.MasterClient.KeepConnectedToMaster() go commandEnv.MasterClient.KeepConnectedToMaster()
commandEnv.MasterClient.WaitUntilConnected() commandEnv.MasterClient.WaitUntilConnected()
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress
commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: "filer",
})
if err != nil {
return err
}
for _, clusterNode := range resp.ClusterNodes {
filers = append(filers, pb.ServerAddress(clusterNode.Address))
}
return nil
})
if len(filers) > 0 {
fmt.Printf("filers: %v\n", filers)
commandEnv.option.FilerAddress = filers[rand.Intn(len(filers))]
}
}
if commandEnv.option.FilerAddress != "" { if commandEnv.option.FilerAddress != "" {
commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})

View file

@ -105,7 +105,11 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
return err return err
} }
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, ClientAddress: string(mc.clientHost)}); err != nil { if err = stream.Send(&master_pb.KeepConnectedRequest{
ClientType: mc.clientType,
ClientAddress: string(mc.clientHost),
Version: util.Version(),
}); err != nil {
glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err) glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err)
return err return err
} }