able to purge extra ec shard copies

This commit is contained in:
Chris Lu 2019-06-03 20:25:02 -07:00
parent 11cffb3168
commit b05456fe07
2 changed files with 79 additions and 30 deletions

View file

@ -126,9 +126,21 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
found = true
baseFilename = path.Join(location.Directory, baseFilename)
for _, shardId := range req.ShardIds { for _, shardId := range req.ShardIds {
os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
} }
break
}
}
if !found {
return nil, nil
}
// check whether to delete the ecx file also // check whether to delete the ecx file also
hasEcxFile := false hasEcxFile := false

View file

@ -129,6 +129,20 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
for vid, locations := range vidLocations { for vid, locations := range vidLocations {
if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil {
return err
}
if err := doBalanceEcShards(ctx, commandEnv, collection, vid, locations, allEcNodes, applyBalancing); err != nil {
return err
}
}
return nil
}
func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error {
// collect all ec nodes with at least one free slot // collect all ec nodes with at least one free slot
var possibleDestinationEcNodes []*EcNode var possibleDestinationEcNodes []*EcNode
for _, ecNode := range allEcNodes { for _, ecNode := range allEcNodes {
@ -136,12 +150,9 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode) possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode)
} }
} }
// calculate average number of shards an ec node should have for one volume // calculate average number of shards an ec node should have for one volume
averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes)))) averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes))))
fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode) fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode)
// check whether this volume has ecNodes that are over average // check whether this volume has ecNodes that are over average
isOverLimit := false isOverLimit := false
for _, ecNode := range locations { for _, ecNode := range locations {
@ -152,17 +163,43 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
break break
} }
} }
if isOverLimit { if isOverLimit {
if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil { if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
return err return err
} }
} }
return nil
}
func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
// check whether this volume has ecNodes that are over average
shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
}
for shardId, ecNodes := range shardToLocations {
if len(ecNodes) <= 1 {
continue
}
sortEcNodes(ecNodes)
fmt.Printf("ec shard %d.%d has %d copies, removing from %+v\n", vid, shardId, len(ecNodes), ecNodes[1:])
if !applyBalancing {
continue
}
for _, ecNode := range ecNodes[1:] {
duplicatedShardIds := []uint32{uint32(shardId)}
if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
}
} }
return nil return nil
} }