add brokers

This commit is contained in:
Chris Lu 2021-11-09 08:50:55 -08:00
parent 59d1435d65
commit 1d4a61af5e

View file

@ -29,12 +29,15 @@ type Cluster struct {
filers map[pb.ServerAddress]*ClusterNode filers map[pb.ServerAddress]*ClusterNode
filersLock sync.RWMutex filersLock sync.RWMutex
filerLeaders *Leaders filerLeaders *Leaders
brokers map[pb.ServerAddress]*ClusterNode
brokersLock sync.RWMutex
} }
func NewCluster() *Cluster { func NewCluster() *Cluster {
return &Cluster{ return &Cluster{
filers: make(map[pb.ServerAddress]*ClusterNode), filers: make(map[pb.ServerAddress]*ClusterNode),
filerLeaders: &Leaders{}, filerLeaders: &Leaders{},
brokers: make(map[pb.ServerAddress]*ClusterNode),
} }
} }
@ -54,6 +57,28 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
createdTs: time.Now(), createdTs: time.Now(),
} }
return cluster.ensureFilerLeaders(true, nodeType, address) return cluster.ensureFilerLeaders(true, nodeType, address)
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
if existingNode, found := cluster.brokers[address]; found {
existingNode.counter++
return nil
}
cluster.brokers[address] = &ClusterNode{
Address: address,
Version: version,
counter: 1,
createdTs: time.Now(),
}
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: true,
},
},
}
case MasterType: case MasterType:
} }
return nil return nil
@ -73,6 +98,26 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr
return cluster.ensureFilerLeaders(false, nodeType, address) return cluster.ensureFilerLeaders(false, nodeType, address)
} }
} }
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
if existingNode, found := cluster.brokers[address]; !found {
return nil
} else {
existingNode.counter--
if existingNode.counter <= 0 {
delete(cluster.brokers, address)
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: false,
},
},
}
}
}
case MasterType: case MasterType:
} }
return nil return nil
@ -86,6 +131,12 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode)
for _, node := range cluster.filers { for _, node := range cluster.filers {
nodes = append(nodes, node) nodes = append(nodes, node)
} }
case BrokerType:
cluster.brokersLock.RLock()
defer cluster.brokersLock.RUnlock()
for _, node := range cluster.brokers {
nodes = append(nodes, node)
}
case MasterType: case MasterType:
} }
return return