mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #1219 from song-zhang/master
schedule new volume by free volume number of nodes
This commit is contained in:
commit
e3b8bf5588
|
@ -62,56 +62,64 @@ type NodeImpl struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
|
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
|
||||||
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
|
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
|
||||||
candidates := make([]Node, 0, len(n.children))
|
var totalWeights int64
|
||||||
var errs []string
|
var errs []string
|
||||||
n.RLock()
|
n.RLock()
|
||||||
|
candidates := make([]Node, 0, len(n.children))
|
||||||
|
candidatesWeights := make([]int64, 0, len(n.children))
|
||||||
|
//pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
|
||||||
for _, node := range n.children {
|
for _, node := range n.children {
|
||||||
|
if node.FreeSpace() <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
totalWeights += node.FreeSpace()
|
||||||
|
candidates = append(candidates, node)
|
||||||
|
candidatesWeights = append(candidatesWeights, node.FreeSpace())
|
||||||
|
}
|
||||||
|
n.RUnlock()
|
||||||
|
if len(candidates) < numberOfNodes {
|
||||||
|
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
|
||||||
|
return nil, nil, errors.New("No enough data node found!")
|
||||||
|
}
|
||||||
|
|
||||||
|
//pick nodes randomly by weights, the node picked earlier has higher final weights
|
||||||
|
sortedCandidates := make([]Node, 0, len(candidates))
|
||||||
|
for i:=0; i<len(candidates); i++ {
|
||||||
|
weightsInterval := rand.Int63n(totalWeights)
|
||||||
|
lastWeights := int64(0)
|
||||||
|
for k, weights := range candidatesWeights {
|
||||||
|
if (weightsInterval>=lastWeights) && (weightsInterval<lastWeights + weights) {
|
||||||
|
sortedCandidates = append(sortedCandidates, candidates[k])
|
||||||
|
candidatesWeights[k] = 0
|
||||||
|
totalWeights -= weights
|
||||||
|
break
|
||||||
|
}
|
||||||
|
lastWeights += weights
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
restNodes = make([]Node, 0, numberOfNodes-1)
|
||||||
|
ret := false
|
||||||
|
n.RLock()
|
||||||
|
for k, node := range sortedCandidates {
|
||||||
if err := filterFirstNodeFn(node); err == nil {
|
if err := filterFirstNodeFn(node); err == nil {
|
||||||
candidates = append(candidates, node)
|
firstNode = node
|
||||||
|
if k >= numberOfNodes-1 {
|
||||||
|
restNodes = sortedCandidates[:numberOfNodes-1]
|
||||||
|
} else {
|
||||||
|
restNodes = append(restNodes, sortedCandidates[:k]...)
|
||||||
|
restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
|
||||||
|
}
|
||||||
|
ret = true
|
||||||
|
break
|
||||||
} else {
|
} else {
|
||||||
errs = append(errs, string(node.Id())+":"+err.Error())
|
errs = append(errs, string(node.Id())+":"+err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
n.RUnlock()
|
n.RUnlock()
|
||||||
if len(candidates) == 0 {
|
|
||||||
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
|
|
||||||
}
|
|
||||||
firstNode = candidates[rand.Intn(len(candidates))]
|
|
||||||
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
|
|
||||||
|
|
||||||
restNodes = make([]Node, numberOfNodes-1)
|
|
||||||
candidates = candidates[:0]
|
|
||||||
n.RLock()
|
|
||||||
for _, node := range n.children {
|
|
||||||
if node.Id() == firstNode.Id() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if node.FreeSpace() <= 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
glog.V(2).Infoln("select rest node candidate:", node.Id())
|
|
||||||
candidates = append(candidates, node)
|
|
||||||
}
|
|
||||||
n.RUnlock()
|
|
||||||
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
|
|
||||||
ret := len(restNodes) == 0
|
|
||||||
for k, node := range candidates {
|
|
||||||
if k < len(restNodes) {
|
|
||||||
restNodes[k] = node
|
|
||||||
if k == len(restNodes)-1 {
|
|
||||||
ret = true
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
r := rand.Intn(k + 1)
|
|
||||||
if r < len(restNodes) {
|
|
||||||
restNodes[r] = node
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !ret {
|
if !ret {
|
||||||
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
|
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
|
||||||
err = errors.New("No enough data node found!")
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
|
||||||
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
|
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
|
||||||
//find main datacenter and other data centers
|
//find main datacenter and other data centers
|
||||||
rp := option.ReplicaPlacement
|
rp := option.ReplicaPlacement
|
||||||
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
|
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error {
|
||||||
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
|
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
|
||||||
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
|
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
|
||||||
}
|
}
|
||||||
|
|
||||||
//find main rack and other racks
|
//find main rack and other racks
|
||||||
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
|
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error {
|
||||||
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
|
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
|
||||||
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
|
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
|
||||||
}
|
}
|
||||||
|
|
||||||
//find main rack and other racks
|
//find main rack and other racks
|
||||||
mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
|
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error {
|
||||||
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
|
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
|
||||||
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
|
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,3 +253,90 @@ func TestReplication011(t *testing.T) {
|
||||||
fmt.Println("assigned node :", server.Id())
|
fmt.Println("assigned node :", server.Id())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var topologyLayout3 = `
|
||||||
|
{
|
||||||
|
"dc1":{
|
||||||
|
"rack1":{
|
||||||
|
"server111":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":2000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dc2":{
|
||||||
|
"rack2":{
|
||||||
|
"server222":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":2000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dc3":{
|
||||||
|
"rack3":{
|
||||||
|
"server333":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":1000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dc4":{
|
||||||
|
"rack4":{
|
||||||
|
"server444":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":1000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dc5":{
|
||||||
|
"rack5":{
|
||||||
|
"server555":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dc6":{
|
||||||
|
"rack6":{
|
||||||
|
"server666":{
|
||||||
|
"volumes":[],
|
||||||
|
"limit":500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
|
||||||
|
topo := setup(topologyLayout3)
|
||||||
|
vg := NewDefaultVolumeGrowth()
|
||||||
|
rp, _ := super_block.NewReplicaPlacementFromString("100")
|
||||||
|
volumeGrowOption := &VolumeGrowOption{
|
||||||
|
Collection: "Weight",
|
||||||
|
ReplicaPlacement: rp,
|
||||||
|
DataCenter: "",
|
||||||
|
Rack: "",
|
||||||
|
DataNode: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
distribution := map[NodeId]int{}
|
||||||
|
// assign 1000 volumes
|
||||||
|
for i:=0;i<1000 ;i++ {
|
||||||
|
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("finding empty slots error :", err)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
for _, server := range servers {
|
||||||
|
fmt.Println("assigned node :", server.Id())
|
||||||
|
if _, ok := distribution[server.id]; !ok {
|
||||||
|
distribution[server.id] = 0
|
||||||
|
}
|
||||||
|
distribution[server.id] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range distribution {
|
||||||
|
fmt.Println(k, "%s : %d", k, v)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue