seaweedfs/weed/topology/node.go

301 lines
8.3 KiB
Go
Raw Permalink Normal View History

2012-08-24 03:56:09 +00:00
package topology
2012-08-24 05:56:14 +00:00
import (
"errors"
"math/rand"
"strings"
2016-05-20 06:57:31 +00:00
"sync"
"sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2012-08-24 05:56:14 +00:00
)
2012-08-24 03:56:09 +00:00
// NodeId identifies a node; NodeImpl uses it as the key of its children map.
type NodeId string
type Node interface {
Id() NodeId
String() string
FreeSpace() int64
ReserveOneVolume(r int64) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64)
UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
UpAdjustEcShardCountDelta(ecShardCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
2019-04-19 04:43:36 +00:00
UpAdjustMaxVolumeId(vid needle.VolumeId)
2012-10-10 03:53:31 +00:00
GetVolumeCount() int64
GetEcShardCount() int64
GetActiveVolumeCount() int64
GetRemoteVolumeCount() int64
GetMaxVolumeCount() int64
2019-04-19 04:43:36 +00:00
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
2012-09-18 21:05:12 +00:00
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
2012-09-03 08:50:04 +00:00
2012-09-08 23:25:44 +00:00
IsDataNode() bool
IsRack() bool
IsDataCenter() bool
2016-05-20 06:57:31 +00:00
Children() []Node
Parent() Node
2012-10-10 03:53:31 +00:00
GetValue() interface{} //get reference to the topology,dc,rack,datanode
2012-08-24 03:56:09 +00:00
}
type NodeImpl struct {
volumeCount int64
remoteVolumeCount int64
activeVolumeCount int64
ecShardCount int64
maxVolumeCount int64
id NodeId
parent Node
2019-06-11 04:33:32 +00:00
sync.RWMutex // lock children
children map[NodeId]Node
maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
2012-10-10 03:53:31 +00:00
value interface{}
2012-09-01 09:20:59 +00:00
}
2012-08-31 08:35:11 +00:00
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
candidates := make([]Node, 0, len(n.children))
2015-03-10 07:20:31 +00:00
var errs []string
2016-05-20 06:57:31 +00:00
n.RLock()
for _, node := range n.children {
if err := filterFirstNodeFn(node); err == nil {
candidates = append(candidates, node)
} else {
errs = append(errs, string(node.Id())+":"+err.Error())
}
}
2016-05-20 06:57:31 +00:00
n.RUnlock()
if len(candidates) == 0 {
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
firstNode = candidates[rand.Intn(len(candidates))]
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
restNodes = make([]Node, numberOfNodes-1)
candidates = candidates[:0]
2016-05-20 06:57:31 +00:00
n.RLock()
for _, node := range n.children {
if node.Id() == firstNode.Id() {
continue
}
if node.FreeSpace() <= 0 {
continue
}
glog.V(2).Infoln("select rest node candidate:", node.Id())
candidates = append(candidates, node)
}
2016-05-20 06:57:31 +00:00
n.RUnlock()
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
ret := len(restNodes) == 0
for k, node := range candidates {
if k < len(restNodes) {
restNodes[k] = node
if k == len(restNodes)-1 {
ret = true
}
} else {
r := rand.Intn(k + 1)
if r < len(restNodes) {
restNodes[r] = node
}
}
}
if !ret {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
2019-01-17 01:17:19 +00:00
err = errors.New("No enough data node found!")
}
return
}
2012-09-08 23:25:44 +00:00
// IsDataNode reports whether n's nodeType is "DataNode".
func (n *NodeImpl) IsDataNode() bool {
	const kind = "DataNode"
	return n.nodeType == kind
}
// IsRack reports whether n's nodeType is "Rack".
func (n *NodeImpl) IsRack() bool {
	const kind = "Rack"
	return n.nodeType == kind
}
// IsDataCenter reports whether n's nodeType is "DataCenter".
func (n *NodeImpl) IsDataCenter() bool {
	const kind = "DataCenter"
	return n.nodeType == kind
}
// String renders the path from the root down to n as the node ids joined by
// ":". A node without a parent renders as its own id.
func (n *NodeImpl) String() string {
	self := string(n.id)
	if n.parent == nil {
		return self
	}
	return n.parent.String() + ":" + self
}
// Id returns the identifier stored on the node.
func (n *NodeImpl) Id() NodeId {
return n.id
}
// FreeSpace returns the number of free volume slots on n, computed as
// maxVolumeCount + remoteVolumeCount - volumeCount. When the node holds any EC
// shards, ecShardCount/erasure_coding.DataShardsCount (integer division) plus
// one further slot is subtracted. The result can be zero or negative.
//
// The counters are modified with atomic.AddInt64 by the UpAdjust*Delta
// methods, so each one is read exactly once with an atomic load.
func (n *NodeImpl) FreeSpace() int64 {
	maxCount := atomic.LoadInt64(&n.maxVolumeCount)
	remoteCount := atomic.LoadInt64(&n.remoteVolumeCount)
	usedCount := atomic.LoadInt64(&n.volumeCount)
	ecShards := atomic.LoadInt64(&n.ecShardCount)

	freeVolumeSlotCount := maxCount + remoteCount - usedCount
	if ecShards > 0 {
		freeVolumeSlotCount = freeVolumeSlotCount - ecShards/erasure_coding.DataShardsCount - 1
	}
	return freeVolumeSlotCount
}
// SetParent stores node as n's parent. UnlinkChildNode passes nil to detach a
// child; LinkChildNode passes the node the child was added to.
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
2016-05-20 06:57:31 +00:00
func (n *NodeImpl) Children() (ret []Node) {
n.RLock()
defer n.RUnlock()
for _, c := range n.children {
ret = append(ret, c)
}
return ret
2012-09-03 08:50:04 +00:00
}
func (n *NodeImpl) Parent() Node {
return n.parent
2012-09-03 08:50:04 +00:00
}
2012-10-10 03:53:31 +00:00
// GetValue returns the value stored on the node. GetTopology type-asserts the
// root node's value to *Topology.
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
2016-05-20 06:57:31 +00:00
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
freeSpace := node.FreeSpace()
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
2012-09-01 09:20:59 +00:00
if r >= freeSpace {
r -= freeSpace
} else {
2012-09-08 23:25:44 +00:00
if node.IsDataNode() && node.FreeSpace() > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
2012-09-03 08:50:04 +00:00
}
assignedNode, err = node.ReserveOneVolume(r)
2018-01-17 11:53:41 +00:00
if err == nil {
return
}
}
}
2017-08-10 17:26:19 +00:00
return nil, errors.New("No free volume slot found!")
}
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
2012-08-31 08:35:11 +00:00
}
}
// UpAdjustVolumeCountDelta atomically adds volumeCountDelta, which may be
// negative, to n's volumeCount and forwards the same delta to the parent, if
// there is one.
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) {
	atomic.AddInt64(&n.volumeCount, volumeCountDelta)
	if n.parent == nil {
		return
	}
	n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
// UpAdjustRemoteVolumeCountDelta atomically adds remoteVolumeCountDelta, which
// may be negative, to n's remoteVolumeCount and forwards the same delta to the
// parent, if there is one.
func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) {
	atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
	if n.parent == nil {
		return
	}
	n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
}
// UpAdjustEcShardCountDelta atomically adds ecShardCountDelta, which may be
// negative, to n's ecShardCount and forwards the same delta to the parent, if
// there is one.
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) {
	atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
	if n.parent == nil {
		return
	}
	n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
}
// UpAdjustActiveVolumeCountDelta atomically adds activeVolumeCountDelta, which
// may be negative, to n's activeVolumeCount and forwards the same delta to the
// parent, if there is one.
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) {
	atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
	if n.parent == nil {
		return
	}
	n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
2019-04-19 04:43:36 +00:00
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
n.parent.UpAdjustMaxVolumeId(vid)
}
}
2012-09-01 09:20:59 +00:00
}
2019-04-19 04:43:36 +00:00
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
2012-08-31 08:35:11 +00:00
return n.maxVolumeId
2012-08-29 08:37:40 +00:00
}
// GetVolumeCount returns n's volumeCount. The counter is modified with
// atomic.AddInt64 in UpAdjustVolumeCountDelta, so it is read with an atomic
// load.
func (n *NodeImpl) GetVolumeCount() int64 {
	return atomic.LoadInt64(&n.volumeCount)
}
// GetEcShardCount returns n's ecShardCount. The counter is modified with
// atomic.AddInt64 in UpAdjustEcShardCountDelta, so it is read with an atomic
// load.
func (n *NodeImpl) GetEcShardCount() int64 {
	return atomic.LoadInt64(&n.ecShardCount)
}
// GetRemoteVolumeCount returns n's remoteVolumeCount. The counter is modified
// with atomic.AddInt64 in UpAdjustRemoteVolumeCountDelta, so it is read with an
// atomic load.
func (n *NodeImpl) GetRemoteVolumeCount() int64 {
	return atomic.LoadInt64(&n.remoteVolumeCount)
}
// GetActiveVolumeCount returns n's activeVolumeCount. The counter is modified
// with atomic.AddInt64 in UpAdjustActiveVolumeCountDelta, so it is read with an
// atomic load.
func (n *NodeImpl) GetActiveVolumeCount() int64 {
	return atomic.LoadInt64(&n.activeVolumeCount)
}
// GetMaxVolumeCount returns n's maxVolumeCount. The counter is modified with
// atomic.AddInt64 in UpAdjustMaxVolumeCountDelta, so it is read with an atomic
// load.
func (n *NodeImpl) GetMaxVolumeCount() int64 {
	return atomic.LoadInt64(&n.maxVolumeCount)
}
2012-08-29 08:37:40 +00:00
func (n *NodeImpl) LinkChildNode(node Node) {
2016-05-20 06:57:31 +00:00
n.Lock()
defer n.Unlock()
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
}
}
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
2016-05-20 06:57:31 +00:00
n.Lock()
defer n.Unlock()
2012-09-01 09:20:59 +00:00
node := n.children[nodeId]
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id())
}
}
2012-09-18 21:05:12 +00:00
func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
2016-05-20 06:32:56 +00:00
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v
}
}
}
} else {
for _, c := range n.Children() {
2012-09-18 21:05:12 +00:00
c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
}
}
2012-09-18 21:05:12 +00:00
}
2012-10-10 03:53:31 +00:00
// GetTopology climbs the parent links from n to the root of the tree and
// returns the root's value as a *Topology. It panics if the root's value is
// not a *Topology.
//
// Parent() is read once per step. Reading it a second time to advance could
// observe nil if the node is unlinked in between, and the next step would
// then call a method on a nil Node.
func (n *NodeImpl) GetTopology() *Topology {
	var p Node = n
	for {
		up := p.Parent()
		if up == nil {
			break
		}
		p = up
	}
	return p.GetValue().(*Topology)
}