Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-01-19 02:48:24 +00:00
avoid possible nil disk info

commit 36f95e50a9 (parent 43101ccea0)
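This commit hardens the shell's EC balancing, EC shard collection, and replica-fixing paths against a missing hard-drive entry in a node's `DiskInfos` map. In Go, indexing a map with an absent key returns the zero value, so `DiskInfos[string(types.HardDriveType)]` silently yields a nil `*DiskInfo` on a node that reports no hard-drive disk, and the following field access `diskInfo.EcShardInfos` panics with a nil pointer dereference. Every hunk below applies the same comma-ok guard. A minimal runnable sketch of the failure mode and the fix, using a stand-in `DiskInfo` type and a literal `"hdd"` key rather than the real `master_pb.DiskInfo` and `string(types.HardDriveType)`:

```go
package main

import "fmt"

// Stand-in for master_pb.DiskInfo; only the field touched below is modeled.
type DiskInfo struct {
	EcShardInfos []string
}

func main() {
	diskInfos := map[string]*DiskInfo{} // this node reports no "hdd" disk

	// Before the fix: a missing key yields a nil *DiskInfo, so the field
	// access panics with "invalid memory address or nil pointer dereference".
	//   diskInfo := diskInfos["hdd"]
	//   _ = diskInfo.EcShardInfos // panic

	// After the fix: the comma-ok form detects the absent entry first.
	if diskInfo, found := diskInfos["hdd"]; found {
		fmt.Println(len(diskInfo.EcShardInfos))
	} else {
		fmt.Println("no hdd disk info on this node; skipping")
	}
}
```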
@@ -388,7 +388,10 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 	}
 
 	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
-		diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		if !found {
+			return
+		}
 		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
 		}
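Note that the guard's bare `return` works because `count` is a named result: a node with no hard-drive disk info now contributes a zero shard count instead of crashing the closure. `groupByCount` itself is not part of this diff; judging only from the call site, it folds the nodes into an id-to-count map, roughly like this hypothetical in-package sketch:

```go
// Hypothetical reconstruction of groupByCount's shape, inferred from the
// call site above, not copied from the SeaweedFS source. EcNode is the
// shell package's own type.
func groupByCount(nodes []*EcNode, f func(*EcNode) (id string, count int)) map[string]int {
	countsByID := make(map[string]int)
	for _, node := range nodes {
		id, count := f(node) // count stays 0 for nodes the callback skipped
		countsByID[id] += count
	}
	return countsByID
}
```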
@@ -413,28 +416,30 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
 
 			emptyNodeIds := make(map[uint32]bool)
-			emptyDiskInfo := emptyNode.info.DiskInfos[string(types.HardDriveType)]
-			for _, shards := range emptyDiskInfo.EcShardInfos {
-				emptyNodeIds[shards.Id] = true
-			}
-			fullDiskInfo := fullNode.info.DiskInfos[string(types.HardDriveType)]
-			for _, shards := range fullDiskInfo.EcShardInfos {
-				if _, found := emptyNodeIds[shards.Id]; !found {
-					for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
-
-						fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
-
-						err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
-						if err != nil {
-							return err
-						}
-
-						ecNodeIdToShardCount[emptyNode.info.Id]++
-						ecNodeIdToShardCount[fullNode.info.Id]--
-						hasMove = true
-						break
-					}
-					break
-				}
-			}
+			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+				for _, shards := range emptyDiskInfo.EcShardInfos {
+					emptyNodeIds[shards.Id] = true
+				}
+			}
+			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+				for _, shards := range fullDiskInfo.EcShardInfos {
+					if _, found := emptyNodeIds[shards.Id]; !found {
+						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+
+							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+
+							err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+							if err != nil {
+								return err
+							}
+
+							ecNodeIdToShardCount[emptyNode.info.Id]++
+							ecNodeIdToShardCount[fullNode.info.Id]--
+							hasMove = true
+							break
+						}
+						break
+					}
+				}
+			}
 		}
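With both lookups guarded, a node that reports no hard-drive disk entry is simply treated as holding zero EC shards: the balancer skips over it instead of panicking mid-pass, and the `hasMove`/`break` bookkeeping still moves at most one shard per pass through this block.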
@@ -515,7 +520,10 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
 	for _, ecNode := range allEcNodes {
-		diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		if !found {
+			continue
+		}
 		for _, shardInfo := range diskInfo.EcShardInfos {
 			vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
 		}
@@ -288,10 +288,11 @@ func ceilDivide(total, n int) int {
 
 func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
 
-	diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
-	for _, shardInfo := range diskInfo.EcShardInfos {
-		if needle.VolumeId(shardInfo.Id) == vid {
-			return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+		for _, shardInfo := range diskInfo.EcShardInfos {
+			if needle.VolumeId(shardInfo.Id) == vid {
+				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+			}
 		}
 	}
 
@@ -301,18 +302,19 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
 func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
 
 	foundVolume := false
-	diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
-	for _, shardInfo := range diskInfo.EcShardInfos {
-		if needle.VolumeId(shardInfo.Id) == vid {
-			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
-			newShardBits := oldShardBits
-			for _, shardId := range shardIds {
-				newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
-			}
-			shardInfo.EcIndexBits = uint32(newShardBits)
-			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
-			foundVolume = true
-			break
+	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+		for _, shardInfo := range diskInfo.EcShardInfos {
+			if needle.VolumeId(shardInfo.Id) == vid {
+				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+				newShardBits := oldShardBits
+				for _, shardId := range shardIds {
+					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+				}
+				shardInfo.EcIndexBits = uint32(newShardBits)
+				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+				foundVolume = true
+				break
+			}
 		}
 	}
 
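The slot bookkeeping here relies on `ShardBits` being a per-volume bitmask of shard ids, so `freeEcSlot` shrinks by exactly the number of newly set bits. A minimal runnable sketch of that arithmetic, assuming `erasure_coding.ShardBits` is a `uint32` bitmask as its `AddShardId`/`ShardIdCount` usage suggests:

```go
package main

import "fmt"

// Assumed shape of erasure_coding.ShardBits: one bit per shard id.
type ShardBits uint32

func (b ShardBits) AddShardId(id uint32) ShardBits { return b | (1 << id) }

func (b ShardBits) ShardIdCount() (count int) {
	for ; b > 0; count++ {
		b &= b - 1 // clear the lowest set bit
	}
	return
}

func main() {
	old := ShardBits(0b0011)     // shards 0 and 1 present
	updated := old.AddShardId(5) // add shard 5
	delta := updated.ShardIdCount() - old.ShardIdCount()
	fmt.Println(delta) // 1: freeEcSlot decreases by one newly added shard
}
```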
@@ -335,16 +337,17 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
 
 func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
 
-	diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
-	for _, shardInfo := range diskInfo.EcShardInfos {
-		if needle.VolumeId(shardInfo.Id) == vid {
-			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
-			newShardBits := oldShardBits
-			for _, shardId := range shardIds {
-				newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
-			}
-			shardInfo.EcIndexBits = uint32(newShardBits)
-			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+		for _, shardInfo := range diskInfo.EcShardInfos {
+			if needle.VolumeId(shardInfo.Id) == vid {
+				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+				newShardBits := oldShardBits
+				for _, shardId := range shardIds {
+					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+				}
+				shardInfo.EcIndexBits = uint32(newShardBits)
+				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+			}
 		}
 	}
 
@@ -226,10 +226,11 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
 func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
 
 	eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
-		diskInfo := dn.DiskInfos[string(types.HardDriveType)]
-		for _, v := range diskInfo.EcShardInfos {
-			if v.Collection == selectedCollection && v.Id == uint32(vid) {
-				ecShardInfos = append(ecShardInfos, v)
+		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+			for _, v := range diskInfo.EcShardInfos {
+				if v.Collection == selectedCollection && v.Id == uint32(vid) {
+					ecShardInfos = append(ecShardInfos, v)
+				}
 			}
 		}
 	})
@@ -241,10 +242,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
 
 	vidMap := make(map[uint32]bool)
 	eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
-		diskInfo := dn.DiskInfos[string(types.HardDriveType)]
-		for _, v := range diskInfo.EcShardInfos {
-			if v.Collection == selectedCollection {
-				vidMap[v.Id] = true
+		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+			for _, v := range diskInfo.EcShardInfos {
+				if v.Collection == selectedCollection {
+					vidMap[v.Id] = true
+				}
 			}
 		}
 	})
@@ -260,10 +262,11 @@ func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeI
 
 	nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
 	eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
-		diskInfo := dn.DiskInfos[string(types.HardDriveType)]
-		for _, v := range diskInfo.EcShardInfos {
-			if v.Id == uint32(vid) {
-				nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+			for _, v := range diskInfo.EcShardInfos {
+				if v.Id == uint32(vid) {
+					nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+				}
 			}
 		}
 	})
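The three collectors above repeat the same guarded iteration over a data node's hard-drive EC shards. One way to avoid the duplication, not what this commit does but a hypothetical refactor inside the same shell package where these identifiers are in scope, is to centralize the lookup in a small helper:

```go
// Hypothetical helper, not part of this commit: returns the hard-drive
// DiskInfo's EC shard list, or nil when the node has no such disk entry.
// Ranging over a nil slice is a no-op in Go, so callers need no found-check.
func hddEcShardInfos(dn *master_pb.DataNodeInfo) []*master_pb.VolumeEcShardInformationMessage {
	diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]
	if !found {
		return nil
	}
	return diskInfo.EcShardInfos
}
```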
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"io"
 	"path/filepath"
 	"sort"
@@ -167,7 +168,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
 	keepDataNodesSorted(allLocations, replica.info.DiskType)
 	for _, dst := range allLocations {
 		// check whether data nodes satisfy the constraints
-		if dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+		fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
+		if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
 			// check collection name pattern
 			if *c.collectionPattern != "" {
 				matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
@@ -218,8 +220,9 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
 }
 
 func keepDataNodesSorted(dataNodes []location, diskType string) {
+	fn := capacityByFreeVolumeCount(types.ToDiskType(diskType))
 	sort.Slice(dataNodes, func(i, j int) bool {
-		return dataNodes[i].dataNode.DiskInfos[diskType].FreeVolumeCount > dataNodes[j].dataNode.DiskInfos[diskType].FreeVolumeCount
+		return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
 	})
 }
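Both replication hunks swap a direct `DiskInfos[diskType].FreeVolumeCount` read, which dereferences nil when the disk type is absent, for `capacityByFreeVolumeCount`. That helper's definition is not part of the hunks shown here; a sketch of the shape implied by the call sites:

```go
// Sketch of the capacity function implied by the call sites above; the
// real definition lives outside the hunks shown in this commit. The
// essential point is the comma-ok guard: a node with no matching disk
// entry counts as zero free volume slots instead of panicking.
func capacityByFreeVolumeCount(diskType types.DiskType) func(*master_pb.DataNodeInfo) int {
	return func(info *master_pb.DataNodeInfo) int {
		diskInfo, found := info.DiskInfos[string(diskType)]
		if !found {
			return 0
		}
		return int(diskInfo.FreeVolumeCount)
	}
}
```

Routing both the sort comparator and the placement check through the same function keeps the missing-disk behavior consistent in `keepDataNodesSorted` and `fixUnderReplicatedVolumes`.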