diff --git a/go/topology/node.go b/go/topology/node.go index 356c0abeb..6a84b4b92 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -4,6 +4,7 @@ import ( "errors" "math/rand" "strings" + "sync" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" @@ -32,7 +33,7 @@ type Node interface { IsDataNode() bool IsRack() bool IsDataCenter() bool - Children() map[NodeId]Node + Children() []Node Parent() Node GetValue() interface{} //get reference to the topology,dc,rack,datanode @@ -43,6 +44,7 @@ type NodeImpl struct { activeVolumeCount int maxVolumeCount int parent Node + sync.RWMutex // lock children children map[NodeId]Node maxVolumeId storage.VolumeId @@ -55,6 +57,7 @@ type NodeImpl struct { func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { candidates := make([]Node, 0, len(n.children)) var errs []string + n.RLock() for _, node := range n.children { if err := filterFirstNodeFn(node); err == nil { candidates = append(candidates, node) @@ -62,6 +65,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d errs = append(errs, string(node.Id())+":"+err.Error()) } } + n.RUnlock() if len(candidates) == 0 { return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) } @@ -70,6 +74,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d restNodes = make([]Node, numberOfNodes-1) candidates = candidates[:0] + n.RLock() for _, node := range n.children { if node.Id() == firstNode.Id() { continue @@ -80,6 +85,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d glog.V(2).Infoln("select rest node candidate:", node.Id()) candidates = append(candidates, node) } + n.RUnlock() glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") ret := len(restNodes) == 0 for k, node := range candidates { @@ -126,8 +132,13 @@ func (n *NodeImpl) FreeSpace() int { func (n *NodeImpl) SetParent(node Node) { n.parent = node } -func (n *NodeImpl) Children() map[NodeId]Node { - return n.children +func (n *NodeImpl) Children() (ret []Node) { + n.RLock() + defer n.RUnlock() + for _, c := range n.children { + ret = append(ret, c) + } + return ret } func (n *NodeImpl) Parent() Node { return n.parent @@ -136,6 +147,8 @@ func (n *NodeImpl) GetValue() interface{} { return n.value } func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { + n.RLock() + defer n.RUnlock() for _, node := range n.children { freeSpace := node.FreeSpace() // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) @@ -198,6 +211,8 @@ func (n *NodeImpl) GetMaxVolumeCount() int { } func (n *NodeImpl) LinkChildNode(node Node) { + n.Lock() + defer n.Unlock() if n.children[node.Id()] == nil { n.children[node.Id()] = node n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) @@ -210,6 +225,8 @@ func (n *NodeImpl) LinkChildNode(node Node) { } func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { + n.Lock() + defer n.Unlock() node := n.children[nodeId] if node != nil { node.SetParent(nil)