package topology import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { dn.RLock() for _, c := range dn.children { disk := c.(*Disk) ret = append(ret, disk.GetEcShards()...) } dn.RUnlock() return ret } func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { // prepare the new ec shard map actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) for _, ecShards := range actualShards { actualEcShardMap[ecShards.VolumeId] = ecShards } existingEcShards := dn.GetEcShards() // found out the newShards and deletedShards var newShardCount, deletedShardCount int for _, ecShards := range existingEcShards { disk := dn.getOrCreateDisk(ecShards.DiskType) deltaDiskUsages := newDiskUsages() deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) vid := ecShards.VolumeId if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) deletedShardCount += ecShards.ShardIdCount() } else { // found, but maybe the actual shard could be missing a := actualEcShards.Minus(ecShards) if a.ShardIdCount() > 0 { newShards = append(newShards, a) newShardCount += a.ShardIdCount() } d := ecShards.Minus(actualEcShards) if d.ShardIdCount() > 0 { deletedShards = append(deletedShards, d) deletedShardCount += d.ShardIdCount() } } deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount) disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } for _, ecShards := range actualShards { disk := dn.getOrCreateDisk(ecShards.DiskType) deltaDiskUsages := newDiskUsages() deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) if !dn.hasEcShards(ecShards.VolumeId) { newShards = append(newShards, ecShards) newShardCount += ecShards.ShardIdCount() } deltaDiskUsage.ecShardCount = int64(newShardCount) disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } if len(newShards) > 0 || len(deletedShards) > 0 { // if changed, set to the new ec shard map dn.doUpdateEcShards(actualShards) } return } func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) { dn.RLock() defer dn.RUnlock() for _, c := range dn.children { disk := c.(*Disk) _, found = disk.ecShards[volumeId] if found { return } } return } func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) { dn.Lock() for _, c := range dn.children { disk := c.(*Disk) disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) } for _, shard := range actualShards { disk := dn.getOrCreateDisk(shard.DiskType) disk.ecShards[shard.VolumeId] = shard } dn.Unlock() } func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { for _, newShard := range newShards { dn.AddOrUpdateEcShard(newShard) } for _, deletedShard := range deletedShards { dn.DeleteEcShard(deletedShard) } } func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { disk := dn.getOrCreateDisk(s.DiskType) disk.AddOrUpdateEcShard(s) } func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { disk := dn.getOrCreateDisk(s.DiskType) disk.DeleteEcShard(s) } func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) { dn.RLock() defer dn.RUnlock() for _, c := range dn.children { disk := c.(*Disk) if disk.HasVolumesById(volumeId) { return true } } return false }