seaweedfs/weed/topology/data_node_ec.go
Patrick Schmidt 7413d59750 Fix EC shard count logic
This fixes the calculation of the amount of EC shards a node holds.
Previously a global counter was increased, but also used inside the
loop to apply disk usage deltas. This led to wrong absolute numbers.
The fix is to apply only deltas of single EC shards per iteration.
2021-03-05 12:50:58 +01:00

143 lines
3.7 KiB
Go

package topology
import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
dn.RLock()
for _, c := range dn.children {
disk := c.(*Disk)
ret = append(ret, disk.GetEcShards()...)
}
dn.RUnlock()
return ret
}
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// prepare the new ec shard map
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
for _, ecShards := range actualShards {
actualEcShardMap[ecShards.VolumeId] = ecShards
}
existingEcShards := dn.GetEcShards()
// find out the newShards and deletedShards
var newShardCount, deletedShardCount int
for _, ecShards := range existingEcShards {
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
vid := ecShards.VolumeId
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
// dn registered ec shards not found in the new set of ec shards
deletedShards = append(deletedShards, ecShards)
deletedShardCount += ecShards.ShardIdCount()
} else {
// found, but maybe the actual shard could be missing
a := actualEcShards.Minus(ecShards)
if a.ShardIdCount() > 0 {
newShards = append(newShards, a)
newShardCount += a.ShardIdCount()
}
d := ecShards.Minus(actualEcShards)
if d.ShardIdCount() > 0 {
deletedShards = append(deletedShards, d)
deletedShardCount += d.ShardIdCount()
}
}
deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
for _, ecShards := range actualShards {
if dn.hasEcShards(ecShards.VolumeId) {
continue
}
newShards = append(newShards, ecShards)
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
if len(newShards) > 0 || len(deletedShards) > 0 {
// if changed, set to the new ec shard map
dn.doUpdateEcShards(actualShards)
}
return
}
func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {
disk := c.(*Disk)
_, found = disk.ecShards[volumeId]
if found {
return
}
}
return
}
func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
dn.Lock()
for _, c := range dn.children {
disk := c.(*Disk)
disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
}
for _, shard := range actualShards {
disk := dn.getOrCreateDisk(shard.DiskType)
disk.ecShards[shard.VolumeId] = shard
}
dn.Unlock()
}
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
for _, newShard := range newShards {
dn.AddOrUpdateEcShard(newShard)
}
for _, deletedShard := range deletedShards {
dn.DeleteEcShard(deletedShard)
}
}
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
disk := dn.getOrCreateDisk(s.DiskType)
disk.AddOrUpdateEcShard(s)
}
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
disk := dn.getOrCreateDisk(s.DiskType)
disk.DeleteEcShard(s)
}
func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {
disk := c.(*Disk)
if disk.HasVolumesById(volumeId) {
return true
}
}
return false
}