seaweedfs/weed/topology/data_node_ec.go

144 lines
3.8 KiB
Go
Raw Normal View History

package topology
import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
2019-05-24 05:51:18 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2021-02-16 10:47:02 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
dn.RLock()
2021-02-16 10:47:02 +00:00
for _, c := range dn.children {
disk := c.(*Disk)
ret = append(ret, disk.GetEcShards()...)
}
dn.RUnlock()
return ret
}
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
2019-05-24 05:51:18 +00:00
// prepare the new ec shard map
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
for _, ecShards := range actualShards {
2019-05-24 06:34:29 +00:00
actualEcShardMap[ecShards.VolumeId] = ecShards
2019-05-24 05:51:18 +00:00
}
2021-02-16 10:47:02 +00:00
existingEcShards := dn.GetEcShards()
2019-05-24 05:51:18 +00:00
// found out the newShards and deletedShards
2019-06-05 08:58:37 +00:00
var newShardCount, deletedShardCount int
2021-02-16 10:47:02 +00:00
for _, ecShards := range existingEcShards {
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
2021-02-16 11:03:00 +00:00
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
2021-02-16 10:47:02 +00:00
vid := ecShards.VolumeId
2019-05-24 05:51:18 +00:00
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
// dn registered ec shards not found in the new set of ec shards
deletedShards = append(deletedShards, ecShards)
2019-06-05 08:58:37 +00:00
deletedShardCount += ecShards.ShardIdCount()
2019-05-24 05:51:18 +00:00
} else {
// found, but maybe the actual shard could be missing
a := actualEcShards.Minus(ecShards)
2019-05-25 09:02:44 +00:00
if a.ShardIdCount() > 0 {
2019-05-24 05:51:18 +00:00
newShards = append(newShards, a)
2019-06-05 08:58:37 +00:00
newShardCount += a.ShardIdCount()
2019-05-24 05:51:18 +00:00
}
d := ecShards.Minus(actualEcShards)
2019-05-25 09:02:44 +00:00
if d.ShardIdCount() > 0 {
2019-05-24 05:51:18 +00:00
deletedShards = append(deletedShards, d)
2019-06-05 08:58:37 +00:00
deletedShardCount += d.ShardIdCount()
2019-05-24 05:51:18 +00:00
}
}
2021-02-16 10:47:02 +00:00
deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
2019-05-24 05:51:18 +00:00
}
for _, ecShards := range actualShards {
2021-02-16 10:47:02 +00:00
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
2021-02-16 11:03:00 +00:00
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
2021-02-16 10:47:02 +00:00
2021-02-16 13:59:43 +00:00
if !dn.hasEcShards(ecShards.VolumeId) {
2019-05-24 05:51:18 +00:00
newShards = append(newShards, ecShards)
2019-06-05 08:58:37 +00:00
newShardCount += ecShards.ShardIdCount()
2019-05-24 05:51:18 +00:00
}
2021-02-16 10:47:02 +00:00
deltaDiskUsage.ecShardCount = int64(newShardCount)
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
2019-05-24 05:51:18 +00:00
}
2019-05-24 06:34:29 +00:00
if len(newShards) > 0 || len(deletedShards) > 0 {
2019-05-24 05:51:18 +00:00
// if changed, set to the new ec shard map
2021-02-16 10:47:02 +00:00
dn.doUpdateEcShards(actualShards)
2019-05-24 05:51:18 +00:00
}
return
}
2021-02-16 10:47:02 +00:00
func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {
disk := c.(*Disk)
_, found = disk.ecShards[volumeId]
if found {
return
}
}
return
}
func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
dn.Lock()
for _, c := range dn.children {
disk := c.(*Disk)
disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
}
for _, shard := range actualShards {
disk := dn.getOrCreateDisk(shard.DiskType)
disk.ecShards[shard.VolumeId] = shard
}
dn.Unlock()
}
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
for _, newShard := range newShards {
dn.AddOrUpdateEcShard(newShard)
}
for _, deletedShard := range deletedShards {
dn.DeleteEcShard(deletedShard)
}
}
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
2021-02-16 10:47:02 +00:00
disk := dn.getOrCreateDisk(s.DiskType)
disk.AddOrUpdateEcShard(s)
}
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
2021-02-16 10:47:02 +00:00
disk := dn.getOrCreateDisk(s.DiskType)
disk.DeleteEcShard(s)
}
2021-02-16 10:47:02 +00:00
func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
dn.RLock()
2021-02-16 10:47:02 +00:00
defer dn.RUnlock()
for _, c := range dn.children {
disk := c.(*Disk)
if disk.HasVolumesById(volumeId) {
return true
}
}
2021-02-16 10:47:02 +00:00
return false
}