package topology

import (
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
	dn.RLock()
	for _, c := range dn.children {
		disk := c.(*Disk)
		ret = append(ret, disk.GetEcShards()...)
	}
	dn.RUnlock()
	return ret
}

func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
	// prepare the new ec shard map
	actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
	for _, ecShards := range actualShards {
		actualEcShardMap[ecShards.VolumeId] = ecShards
	}

	existingEcShards := dn.GetEcShards()

	// find out the newShards and deletedShards
	var newShardCount, deletedShardCount int
	for _, ecShards := range existingEcShards {

		disk := dn.getOrCreateDisk(ecShards.DiskType)
		deltaDiskUsages := newDiskUsages()
		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))

		vid := ecShards.VolumeId
		if actualEcShards, ok := actualEcShardMap[vid]; !ok {
			// dn registered ec shards not found in the new set of ec shards
			deletedShards = append(deletedShards, ecShards)
			deletedShardCount += ecShards.ShardIdCount()
		} else {
			// found, but maybe the actual shard could be missing
			a := actualEcShards.Minus(ecShards)
			if a.ShardIdCount() > 0 {
				newShards = append(newShards, a)
				newShardCount += a.ShardIdCount()
			}
			d := ecShards.Minus(actualEcShards)
			if d.ShardIdCount() > 0 {
				deletedShards = append(deletedShards, d)
				deletedShardCount += d.ShardIdCount()
			}
		}

		deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
		disk.UpAdjustDiskUsageDelta(deltaDiskUsages)

	}

	for _, ecShards := range actualShards {
		if dn.hasEcShards(ecShards.VolumeId) {
			continue
		}

		newShards = append(newShards, ecShards)

		disk := dn.getOrCreateDisk(ecShards.DiskType)
		deltaDiskUsages := newDiskUsages()
		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
		deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
		disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
	}

	if len(newShards) > 0 || len(deletedShards) > 0 {
		// if changed, set to the new ec shard map
		dn.doUpdateEcShards(actualShards)
	}

	return
}

func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
	dn.RLock()
	defer dn.RUnlock()
	for _, c := range dn.children {
		disk := c.(*Disk)
		_, found = disk.ecShards[volumeId]
		if found {
			return
		}
	}
	return
}

func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
	dn.Lock()
	for _, c := range dn.children {
		disk := c.(*Disk)
		disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
	}
	for _, shard := range actualShards {
		disk := dn.getOrCreateDisk(shard.DiskType)
		disk.ecShards[shard.VolumeId] = shard
	}
	dn.Unlock()
}

func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {

	for _, newShard := range newShards {
		dn.AddOrUpdateEcShard(newShard)
	}

	for _, deletedShard := range deletedShards {
		dn.DeleteEcShard(deletedShard)
	}

}

func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
	disk := dn.getOrCreateDisk(s.DiskType)
	disk.AddOrUpdateEcShard(s)
}

func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
	disk := dn.getOrCreateDisk(s.DiskType)
	disk.DeleteEcShard(s)
}

func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {

	dn.RLock()
	defer dn.RUnlock()
	for _, c := range dn.children {
		disk := c.(*Disk)
		if disk.HasVolumesById(volumeId) {
			return true
		}
	}
	return false

}