From 9d9162ca356932fb8c9ec0322f3a771bbbcc47f5 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 7 Jun 2019 00:25:01 -0700
Subject: [PATCH] ec.balance: collect dc rack info

---
 weed/shell/command_ec_balance.go | 22 +++++++++++++---------
 weed/shell/command_ec_common.go  | 10 +++++++---
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 4edf94711..15750b61f 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -145,25 +145,29 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, applyBalancing
 func doBalanceEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error {
 	// collect all ec nodes with at least one free slot
 	var possibleDestinationEcNodes []*EcNode
+	possibleDataCenters := make(map[string]int)
+	possibleRacks := make(map[string]int)
 	for _, ecNode := range allEcNodes {
 		if ecNode.freeEcSlot > 0 {
 			possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode)
+			possibleDataCenters[ecNode.dc] += ecNode.freeEcSlot
+			possibleRacks[ecNode.dc+"/"+ecNode.rack] += ecNode.freeEcSlot
 		}
 	}
 	// calculate average number of shards an ec node should have for one volume
 	averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes))))
-	fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode)
+	fmt.Printf("vid %d averageShards Per EcNode:%d\n", vid, averageShardsPerEcNode)
 	// check whether this volume has ecNodes that are over average
-	isOverLimit := false
+	isOverPerNodeAverage := false
 	for _, ecNode := range locations {
 		shardBits := findEcVolumeShards(ecNode, vid)
 		if shardBits.ShardIdCount() > averageShardsPerEcNode {
-			isOverLimit = true
-			fmt.Printf("vid %d %s has %d shards, isOverLimit %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverLimit)
+			isOverPerNodeAverage = true
+			fmt.Printf("vid %d %s has %d shards, isOverPerNodeAverage %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverPerNodeAverage)
 			break
 		}
 	}
-	if isOverLimit {
+	if isOverPerNodeAverage {
 		if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
 			return err
 		}
@@ -275,11 +279,11 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
 	return 0
 }
 
-func addEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32){
+func addEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32) {
 
 	for _, shardInfo := range ecNode.info.EcShardInfos {
 		if needle.VolumeId(shardInfo.Id) == vid {
-			for _, shardId := range shardIds{
+			for _, shardId := range shardIds {
 				shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).AddShardId(erasure_coding.ShardId(shardId)))
 			}
 		}
@@ -287,11 +291,11 @@ func addEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32){
 	}
 
 }
 
-func deleteEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32){
+func deleteEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32) {
 	for _, shardInfo := range ecNode.info.EcShardInfos {
 		if needle.VolumeId(shardInfo.Id) == vid {
-			for _, shardId := range shardIds{
+			for _, shardId := range shardIds {
 				shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).RemoveShardId(erasure_coding.ShardId(shardId)))
 			}
 		}
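Note: below is a minimal, self-contained sketch of the per-data-center and per-rack free-slot tally added above. The trimmed-down ecNode struct and the sample values are invented for illustration; in the patch the dc and rack strings come from the topology walk in command_ec_common.go.

package main

import "fmt"

type ecNode struct {
	dc, rack   string
	freeEcSlot int
}

func main() {
	nodes := []ecNode{
		{dc: "dc1", rack: "rack1", freeEcSlot: 5},
		{dc: "dc1", rack: "rack2", freeEcSlot: 3},
		{dc: "dc2", rack: "rack1", freeEcSlot: 0}, // no free slot: skipped
	}

	possibleDataCenters := make(map[string]int)
	possibleRacks := make(map[string]int)
	for _, n := range nodes {
		if n.freeEcSlot > 0 {
			possibleDataCenters[n.dc] += n.freeEcSlot
			// racks are keyed as "dc/rack" so same-named racks in
			// different data centers stay distinct
			possibleRacks[n.dc+"/"+n.rack] += n.freeEcSlot
		}
	}

	fmt.Println(possibleDataCenters) // map[dc1:8]
	fmt.Println(possibleRacks)       // map[dc1/rack1:5 dc1/rack2:3]
}

The averageShardsPerEcNode computation above rounds up: with SeaweedFS's default 10+4 erasure coding layout (erasure_coding.TotalShardsCount = 14) and, say, 4 candidate nodes, int(math.Ceil(14.0/4)) = 4 shards per node.
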
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 041715908..7787f4e9f 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -98,11 +98,11 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
 	return
 }
 
-func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc, rack string, dn *master_pb.DataNodeInfo)) {
 	for _, dc := range topo.DataCenterInfos {
 		for _, rack := range dc.RackInfos {
 			for _, dn := range rack.DataNodeInfos {
-				fn(dn)
+				fn(dc.Id, rack.Id, dn)
 			}
 		}
 	}
@@ -128,6 +128,8 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
 
 type EcNode struct {
 	info       *master_pb.DataNodeInfo
+	dc         string
+	rack       string
 	freeEcSlot int
 }
 
@@ -144,10 +146,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcN
 	}
 
 	// find out all volume servers with one slot left.
-	eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
+	eachDataNode(resp.TopologyInfo, func(dc, rack string, dn *master_pb.DataNodeInfo) {
 		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
 			ecNodes = append(ecNodes, &EcNode{
 				info:       dn,
+				dc:         dc,
+				rack:       rack,
 				freeEcSlot: int(freeEcSlots),
 			})
 			totalFreeEcSlots += freeEcSlots
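
Note: below is a standalone sketch of the reworked eachDataNode traversal, using stand-in structs in place of the real master_pb protobuf types; the field names only mirror the accessors used in the patch, and the sample topology is invented.

package main

import "fmt"

type dataNodeInfo struct{ Id string }
type rackInfo struct {
	Id            string
	DataNodeInfos []*dataNodeInfo
}
type dataCenterInfo struct {
	Id        string
	RackInfos []*rackInfo
}
type topologyInfo struct{ DataCenterInfos []*dataCenterInfo }

// eachDataNode now hands the enclosing dc and rack ids to the callback,
// so callers can record each node's placement during the walk.
func eachDataNode(topo *topologyInfo, fn func(dc, rack string, dn *dataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(dc.Id, rack.Id, dn)
			}
		}
	}
}

func main() {
	topo := &topologyInfo{DataCenterInfos: []*dataCenterInfo{
		{Id: "dc1", RackInfos: []*rackInfo{
			{Id: "rack1", DataNodeInfos: []*dataNodeInfo{{Id: "127.0.0.1:8080"}}},
		}},
	}}
	eachDataNode(topo, func(dc, rack string, dn *dataNodeInfo) {
		fmt.Printf("%s/%s -> %s\n", dc, rack, dn.Id)
	})
}

Passing dc.Id and rack.Id into the callback lets collectEcNodes fill the new EcNode.dc and EcNode.rack fields in a single pass, instead of re-walking the topology to recover each node's location later.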