diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index c274dcf33..8dced9c44 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -4,12 +4,13 @@ import ( ) type DataCenter struct { - Node + NodeImpl ipRange IpRange } -func NewDataCenter(id NodeId) *DataCenter{ +func NewDataCenter(id string) *DataCenter{ dc := &DataCenter{} - dc.Node = *NewNode() - dc.Node.Id = id + dc.id = NodeId(id) + dc.nodeType = "DataCenter" + dc.children = make(map[NodeId]Node) return dc } diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index 4352373cf..cb13a038e 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -6,93 +6,133 @@ import ( ) type NodeId string -type Node struct { - Id NodeId - activeVolumeCount int - maxVolumeCount int - parent *Node - children map[NodeId]*Node - maxVolumeId storage.VolumeId +type Node interface { + Id() NodeId + String() string + FreeSpace() int + ReserveOneVolume(r int, vid storage.VolumeId) (bool, Node) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + UpAdjustMaxVolumeId(vid storage.VolumeId) + GetActiveVolumeCount() int + GetMaxVolumeCount() int + GetMaxVolumeId() storage.VolumeId + setParent(Node) + LinkChildNode(node Node) + UnlinkChildNode(nodeId NodeId) +} +type NodeImpl struct { + id NodeId + activeVolumeCount int + maxVolumeCount int + parent Node + children map[NodeId]Node + maxVolumeId storage.VolumeId + + //for rack, data center, topology + nodeType string } -func NewNode() *Node { - n := &Node{} - n.children = make(map[NodeId]*Node) - return n +func (n *NodeImpl) IsServer() bool { + return n.nodeType == "Server" } -func (n *Node) String() string { - if n.parent!=nil { - return n.parent.String()+":"+string(n.Id) - } - return string(n.Id) +func (n *NodeImpl) IsRack() bool { + return n.nodeType == "Rack" } - -func (n *Node) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Node) { - if n.children == nil { - return true, n - } - ret := false - var assignedNode *Node +func (n *NodeImpl) IsDataCenter() bool { + return n.nodeType == "DataCenter" +} +func (n *NodeImpl) String() string { + if n.parent != nil { + return n.parent.String() + ":" + string(n.id) + } + return string(n.id) +} +func (n *NodeImpl) Id() NodeId { + return n.id +} +func (n *NodeImpl) FreeSpace() int { + return n.maxVolumeCount - n.activeVolumeCount +} +func (n *NodeImpl) setParent(node Node) { + n.parent = node +} +func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, Node) { + if n.IsServer() && n.maxVolumeCount > n.activeVolumeCount { + fmt.Println("vid =", vid, " assigned to node =", n, ", freeSpace =", n.maxVolumeCount-n.activeVolumeCount) + return true, n + } + ret := false + var assignedNode Node for _, node := range n.children { - freeSpace := node.maxVolumeCount - node.activeVolumeCount - fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) - if freeSpace <= 0 { - continue - } + freeSpace := node.FreeSpace() + fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) + if freeSpace <= 0 { + continue + } if r >= freeSpace { r -= freeSpace } else { - ret, assignedNode = node.ReserveOneVolume(r, vid) - if ret { - break - } + ret, assignedNode = node.ReserveOneVolume(r, vid) + if ret { + break + } } } return ret, assignedNode } -func (n *Node) AddVolume(v *storage.VolumeInfo) { - if n.maxVolumeId < v.Id { - n.maxVolumeId = v.Id - } - n.activeVolumeCount++ - fmt.Println(n.Id, "adds 1, volumeCount =", n.activeVolumeCount) +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative + n.maxVolumeCount += maxVolumeCountDelta if n.parent != nil { - n.parent.AddVolume(v) + n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *Node) AddMaxVolumeCount(maxVolumeCount int) {//can be negative - n.maxVolumeCount += maxVolumeCount - if n.parent != nil { - n.parent.AddMaxVolumeCount(maxVolumeCount) - } +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative + n.activeVolumeCount += activeVolumeCountDelta + if n.parent != nil { + n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative + if n.maxVolumeId < vid { + n.maxVolumeId = vid + if n.parent != nil { + n.parent.UpAdjustMaxVolumeId(vid) + } + } } -func (n *Node) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { return n.maxVolumeId } - -func (n *Node) AddNode(node *Node) { - if n.children[node.Id] == nil { - n.children[node.Id] = node - n.activeVolumeCount += node.activeVolumeCount - n.maxVolumeCount += node.maxVolumeCount - fmt.Println(n.Id, "adds", node.Id, "volumeCount =", n.activeVolumeCount) - } +func (n *NodeImpl) GetActiveVolumeCount() int { + return n.activeVolumeCount +} +func (n *NodeImpl) GetMaxVolumeCount() int { + return n.maxVolumeCount } -func (n *Node) RemoveNode(nodeId NodeId) { - node := n.children[nodeId] - if node != nil { - delete(n.children, node.Id) - n.activeVolumeCount -= node.activeVolumeCount - n.maxVolumeCount -= node.maxVolumeCount - p := n.parent - for p != nil { - p.activeVolumeCount -= node.activeVolumeCount - p.maxVolumeCount -= node.maxVolumeCount - p = p.parent +func (n *NodeImpl) LinkChildNode(node Node) { + if n.children[node.Id()] == nil { + n.children[node.Id()] = node + n.activeVolumeCount += node.GetActiveVolumeCount() + n.maxVolumeCount += node.GetMaxVolumeCount() + node.setParent(n) + if n.maxVolumeId < node.GetMaxVolumeId() { + n.maxVolumeId = node.GetMaxVolumeId() } - fmt.Println(n.Id, "removes", node.Id, "volumeCount =", n.activeVolumeCount) + fmt.Println(n, "adds", node, "volumeCount =", n.activeVolumeCount) + } +} + +func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { + node := n.children[nodeId] + node.setParent(nil) + if node != nil { + delete(n.children, node.Id()) + n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) + n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 31dac6218..46e7780dc 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -4,13 +4,14 @@ import ( ) type Rack struct { - Node + NodeImpl ipRange IpRange } -func NewRack(id NodeId) *Rack { +func NewRack(id string) *Rack { r := &Rack{} - r.Node = *NewNode() - r.Node.Id = id + r.id = NodeId(id) + r.nodeType = "Rack" + r.children = make(map[NodeId]Node) return r } diff --git a/weed-fs/src/pkg/topology/server.go b/weed-fs/src/pkg/topology/server.go index 14aed0e2e..aff1f4f15 100644 --- a/weed-fs/src/pkg/topology/server.go +++ b/weed-fs/src/pkg/topology/server.go @@ -6,15 +6,16 @@ import ( ) type Server struct { - Node + NodeImpl volumes map[storage.VolumeId]*storage.VolumeInfo Ip NodeId Port int PublicUrl string } -func NewServer(id NodeId) *Server{ +func NewServer(id string) *Server{ s := &Server{} - s.Node.Id = id + s.id = NodeId(id) + s.nodeType = "Server" s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo) return s } @@ -24,5 +25,6 @@ func (s *Server) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { } func (s *Server) AddVolume(v *storage.VolumeInfo){ s.volumes[v.Id] = v - s.Node.AddVolume(v) + s.UpAdjustActiveVolumeCountDelta(1) + s.UpAdjustMaxVolumeId(v.Id) } diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 5bbc33dd9..cb0551096 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -80,36 +80,33 @@ func setup() *Topology { printMap(data) //need to connect all nodes first before server adding volumes - topo := NewTopology(NodeId("mynetwork")) + topo := NewTopology("mynetwork") mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { - dc := NewDataCenter(NodeId(dcKey)) - dc.Node.parent = &topo.Node + dc := NewDataCenter(dcKey) dcMap := dcValue.(map[string]interface{}) - topo.Node.AddNode(&dc.Node) + topo.LinkChildNode(dc) for rackKey, rackValue := range dcMap { - rack := NewRack(NodeId(rackKey)) - rack.Node.parent = &dc.Node + rack := NewRack(rackKey) rackMap := rackValue.(map[string]interface{}) - dc.Node.AddNode(&rack.Node) + dc.LinkChildNode(rack) for serverKey, serverValue := range rackMap { - server := NewServer(NodeId(serverKey)) - server.Node.parent = &rack.Node + server := NewServer(serverKey) serverMap := serverValue.(map[string]interface{}) - rack.Node.AddNode(&server.Node) + rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} server.AddVolume(vi) } - server.Node.AddMaxVolumeCount(int(serverMap["limit"].(float64))) + server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) } } } fmt.Println("topology:", *topo) - bytes, err := json.Marshal(topo.Node.children) + bytes, err := json.Marshal(topo.children) if err != nil { fmt.Println("json error:", err) } @@ -140,20 +137,14 @@ func printMap(mm interface{}) { } } -func TestAddVolume(t *testing.T) { - topo := setup() - v := &storage.VolumeInfo{} - topo.AddVolume(v) -} - func TestRemoveDataCenter(t *testing.T) { topo := setup() - topo.RemoveNode(NodeId("dc2")) - if topo.activeVolumeCount != 15 { + topo.UnlinkChildNode(NodeId("dc2")) + if topo.GetActiveVolumeCount() != 15 { t.Fail() } - topo.RemoveNode(NodeId("dc3")) - if topo.activeVolumeCount != 12 { + topo.UnlinkChildNode(NodeId("dc3")) + if topo.GetActiveVolumeCount() != 12 { t.Fail() } } @@ -161,8 +152,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup() rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("topology:", topo.Node) + fmt.Println("topology:", topo) fmt.Println("assigned :", ret) fmt.Println("assigned node :", node) fmt.Println("assigned volume id:", vid) diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 79f0fac67..085031c5d 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -7,24 +7,25 @@ import ( ) type Topology struct { - Node -} -func NewTopology(id NodeId) *Topology{ - t := &Topology{} - t.Node = *NewNode() - t.Node.Id = id - return t + NodeImpl } -func (t *Topology) RandomlyReserveOneVolume() (bool, *Node, storage.VolumeId) { - slots := t.Node.maxVolumeCount-t.Node.activeVolumeCount +func NewTopology(id string) *Topology { + t := &Topology{} + t.id = NodeId(id) + t.nodeType = "Topology" + t.children = make(map[NodeId]Node) + return t +} +func (t *Topology) RandomlyReserveOneVolume() (bool, Node, storage.VolumeId) { + slots := t.maxVolumeCount-t.activeVolumeCount r := rand.Intn(slots) vid := t.nextVolumeId() - ret, node := t.Node.ReserveOneVolume(r,vid) + ret, node := t.ReserveOneVolume(r,vid) return ret, node, vid } func (t *Topology) nextVolumeId() storage.VolumeId { - vid := t.Node.GetMaxVolumeId() + vid := t.GetMaxVolumeId() return vid.Next() }