refactor: change masters from a slice to a map

This commit is contained in:
chrislu 2022-03-26 13:33:17 -07:00
parent 4ba7127ab1
commit 21e0898631
15 changed files with 36 additions and 21 deletions

View file

@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses()) b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster() go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected() b.masterClient.WaitUntilConnected()

View file

@ -29,7 +29,7 @@ var (
) )
type FilerOptions struct { type FilerOptions struct {
masters []pb.ServerAddress masters map[string]pb.ServerAddress
mastersString *string mastersString *string
ip *string ip *string
bindIp *string bindIp *string
@ -171,7 +171,7 @@ func runFiler(cmd *Command, args []string) bool {
}() }()
} }
f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses() f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.startFiler() f.startFiler()

View file

@ -67,7 +67,7 @@ func (iamopt *IamOptions) startIamServer() bool {
} }
} }
masters := pb.ServerAddresses(*iamopt.masters).ToAddresses() masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true) router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters, Masters: masters,

View file

@ -132,8 +132,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
masterPeers := make(map[string]pb.ServerAddress)
for _, peer := range peers {
masterPeers[peer.String()] = peer
}
r := mux.NewRouter() r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port) listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0) masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
@ -141,11 +146,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e) glog.Fatalf("Master startup error: %v", e)
} }
masterPeers := make(map[string]pb.ServerAddress)
for _, peer := range peers {
masterPeers[peer.String()] = peer
}
// start raftServer // start raftServer
raftServerOption := &weed_server.RaftServerOption{ raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),

View file

@ -83,7 +83,7 @@ func runMasterFollower(cmd *Command, args []string) bool {
func startMasterFollower(masterOptions MasterOptions) { func startMasterFollower(masterOptions MasterOptions) {
// collect settings from main masters // collect settings from main masters
masters := pb.ServerAddresses(*mf.peers).ToAddresses() masters := pb.ServerAddresses(*mf.peers).ToAddressMap()
var err error var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")

View file

@ -191,7 +191,7 @@ func runServer(cmd *Command, args []string) bool {
// ip address // ip address
masterOptions.ip = serverIp masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp masterOptions.ipBind = serverBindIp
filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses() filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
filerOptions.ip = serverIp filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp s3Options.bindIp = serverBindIp

View file

@ -49,7 +49,7 @@ type Filer struct {
UniqueFileId uint32 UniqueFileId uint32
} }
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption,
filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer { filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{ f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters), MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters),

View file

@ -33,7 +33,7 @@ type IamS3ApiConfigure struct {
} }
type IamServerOption struct { type IamServerOption struct {
Masters []pb.ServerAddress Masters map[string]pb.ServerAddress
Filer pb.ServerAddress Filer pb.ServerAddress
Port int Port int
GrpcDialOption grpc.DialOption GrpcDialOption grpc.DialOption

View file

@ -206,7 +206,7 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g
} }
func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses { for _, masterGrpcAddress := range masterGrpcAddresses {
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {

View file

@ -86,6 +86,14 @@ func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
return return
} }
func (sa ServerAddresses) ToAddressMap() (addresses map[string]ServerAddress) {
addresses = make(map[string]ServerAddress)
for _, address := range sa.ToAddresses() {
addresses[address.String()] = address
}
return
}
func (sa ServerAddresses) ToAddressStrings() (addresses []string) { func (sa ServerAddresses) ToAddressStrings() (addresses []string) {
parts := strings.Split(string(sa), ",") parts := strings.Split(string(sa), ",")
for _, address := range parts { for _, address := range parts {
@ -101,6 +109,13 @@ func ToAddressStrings(addresses []ServerAddress) []string {
} }
return strings return strings
} }
func ToAddressStringsFromMap(addresses map[string]ServerAddress) []string {
var strings []string
for _, addr := range addresses {
strings = append(strings, string(addr))
}
return strings
}
func FromAddressStrings(strings []string) []ServerAddress { func FromAddressStrings(strings []string) []ServerAddress {
var addresses []ServerAddress var addresses []ServerAddress
for _, addr := range strings { for _, addr := range strings {

View file

@ -393,7 +393,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{ t := &filer_pb.GetFilerConfigurationResponse{
Masters: pb.ToAddressStrings(fs.option.Masters), Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Collection: fs.option.Collection, Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication, Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB), MaxMb: uint32(fs.option.MaxMB),

View file

@ -48,7 +48,7 @@ import (
) )
type FilerOption struct { type FilerOption struct {
Masters []pb.ServerAddress Masters map[string]pb.ServerAddress
Collection string Collection string
DefaultReplication string DefaultReplication string
DisableDirListing bool DisableDirListing bool

View file

@ -75,7 +75,7 @@ type MasterServer struct {
Cluster *cluster.Cluster Cluster *cluster.Cluster
} }
func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper() v := util.GetViper()
signingKey := v.GetString("jwt.signing.key") signingKey := v.GetString("jwt.signing.key")

View file

@ -46,7 +46,7 @@ var (
func NewCommandEnv(options *ShellOptions) *CommandEnv { func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{ ce := &CommandEnv{
env: make(map[string]string), env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()), MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
option: options, option: options,
} }
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin") ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin")

View file

@ -18,7 +18,7 @@ type MasterClient struct {
clientType string clientType string
clientHost pb.ServerAddress clientHost pb.ServerAddress
currentMaster pb.ServerAddress currentMaster pb.ServerAddress
masters []pb.ServerAddress masters map[string]pb.ServerAddress
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
vidMap vidMap
@ -26,7 +26,7 @@ type MasterClient struct {
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate) OnPeerUpdate func(update *master_pb.ClusterNodeUpdate)
} }
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters []pb.ServerAddress) *MasterClient { func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
return &MasterClient{ return &MasterClient{
clientType: clientType, clientType: clientType,
clientHost: clientHost, clientHost: clientHost,