diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index bb9970446..3e72ccdbf 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -146,7 +146,7 @@ func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() - ret["EcShards"] = dn.GetEcShardsCount() + ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index a0e3a699f..3df9394da 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -14,17 +14,6 @@ func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { return ret } -func (dn *DataNode) GetEcShardsCount() (count int) { - dn.RLock() - defer dn.RUnlock() - - for _, ecVolumeInfo := range dn.ecShards { - count += ecVolumeInfo.ShardBits.ShardIdCount() - } - - return count -} - func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { // prepare the new ec shard map actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) @@ -61,6 +50,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) // if changed, set to the new ec shard map dn.ecShardsLock.Lock() dn.ecShards = actualEcShardMap + dn.UpAdjustEcShardCountDelta(int64(len(newShards) - len(deletedShards))) dn.ecShardsLock.Unlock() } @@ -83,12 +73,18 @@ func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { dn.ecShardsLock.Lock() defer dn.ecShardsLock.Unlock() + delta := 0 if existing, ok := dn.ecShards[s.VolumeId]; !ok { dn.ecShards[s.VolumeId] = s + delta = s.ShardBits.ShardIdCount() } else { + oldCount := existing.ShardBits.ShardIdCount() existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + delta = existing.ShardBits.ShardIdCount() - oldCount } + dn.UpAdjustEcShardCountDelta(int64(delta)) + } func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { @@ -96,7 +92,10 @@ func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { defer dn.ecShardsLock.Unlock() if existing, ok := dn.ecShards[s.VolumeId]; ok { + oldCount := existing.ShardBits.ShardIdCount() existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + delta := existing.ShardBits.ShardIdCount() - oldCount + dn.UpAdjustEcShardCountDelta(int64(delta)) if existing.ShardBits.ShardIdCount() == 0 { delete(dn.ecShards, s.VolumeId) } diff --git a/weed/topology/node.go b/weed/topology/node.go index d1f539506..9aee0749e 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) @@ -19,10 +20,12 @@ type Node interface { ReserveOneVolume(r int64) (*DataNode, error) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustEcShardCountDelta(ecShardCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustMaxVolumeId(vid needle.VolumeId) GetVolumeCount() int64 + GetEcShardCount() int64 GetActiveVolumeCount() int64 GetMaxVolumeCount() int64 GetMaxVolumeId() needle.VolumeId @@ -40,14 +43,15 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - id NodeId volumeCount int64 activeVolumeCount int64 + ecShardCount int64 maxVolumeCount int64 + id NodeId parent Node - sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId needle.VolumeId + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -128,6 +132,9 @@ func (n *NodeImpl) Id() NodeId { return n.id } func (n *NodeImpl) FreeSpace() int64 { + if n.ecShardCount > 0 { + return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 + } return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { @@ -184,6 +191,12 @@ func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be n n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } +func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative + atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) + if n.parent != nil { + n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) + } +} func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { @@ -204,6 +217,9 @@ func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } +func (n *NodeImpl) GetEcShardCount() int64 { + return n.ecShardCount +} func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } @@ -219,6 +235,7 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) @@ -233,6 +250,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) delete(n.children, node.Id()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) glog.V(0).Infoln(n, "removes", node.Id())