2019-05-06 04:17:23 +00:00
package shell
import (
"flag"
"fmt"
2021-09-13 05:47:52 +00:00
"github.com/chrislusf/seaweedfs/weed/pb"
2020-09-11 07:29:25 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
2021-02-16 10:47:02 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/types"
2019-05-06 04:17:23 +00:00
"io"
"os"
"sort"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func init ( ) {
2019-06-05 08:30:24 +00:00
Commands = append ( Commands , & commandVolumeBalance { } )
2019-05-06 04:17:23 +00:00
}
type commandVolumeBalance struct {
}
func ( c * commandVolumeBalance ) Name ( ) string {
return "volume.balance"
}
func ( c * commandVolumeBalance ) Help ( ) string {
return ` balance all volumes among volume servers
2019-11-25 06:17:43 +00:00
volume . balance [ - collection ALL | EACH_COLLECTION | < collection_name > ] [ - force ] [ - dataCenter = < data_center_name > ]
2019-05-06 04:17:23 +00:00
Algorithm :
2019-05-07 21:02:01 +00:00
2019-05-06 04:17:23 +00:00
For each type of volume server ( different max volume count limit ) {
2019-05-06 20:30:12 +00:00
for each collection {
balanceWritableVolumes ( )
balanceReadOnlyVolumes ( )
}
2019-05-06 04:17:23 +00:00
}
func balanceWritableVolumes ( ) {
2020-09-12 08:01:19 +00:00
idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes
2019-06-11 04:32:56 +00:00
for hasMovedOneVolume {
2020-09-12 08:01:19 +00:00
sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes to localVolumeMax
pick the volume server B with the highest localWritableVolumeRatio y
for any the volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax {
if y > localWritableVolumeRatio {
2020-09-11 06:05:00 +00:00
if B has a writable volume id v that A does not have , and satisfy v replication requirements {
move writable volume v from A to B
}
2019-05-06 04:17:23 +00:00
}
}
}
}
func balanceReadOnlyVolumes ( ) {
//similar to balanceWritableVolumes
}
`
}
2019-06-05 08:30:24 +00:00
func ( c * commandVolumeBalance ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2019-05-06 04:17:23 +00:00
2020-04-23 20:37:31 +00:00
if err = commandEnv . confirmIsLocked ( ) ; err != nil {
return
}
2019-05-06 04:17:23 +00:00
balanceCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
2019-06-03 09:26:31 +00:00
collection := balanceCommand . String ( "collection" , "EACH_COLLECTION" , "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection" )
2019-05-06 20:30:12 +00:00
dc := balanceCommand . String ( "dataCenter" , "" , "only apply the balancing for this dataCenter" )
2019-06-03 09:26:31 +00:00
applyBalancing := balanceCommand . Bool ( "force" , false , "apply the balancing plan." )
2019-05-06 04:17:23 +00:00
if err = balanceCommand . Parse ( args ) ; err != nil {
return nil
}
2021-02-22 08:28:42 +00:00
// collect topology information
topologyInfo , volumeSizeLimitMb , err := collectTopologyInfo ( commandEnv )
2019-05-06 04:17:23 +00:00
if err != nil {
return err
}
2021-02-22 08:28:42 +00:00
volumeServers := collectVolumeServersByDc ( topologyInfo , * dc )
volumeReplicas , _ := collectVolumeReplicaLocations ( topologyInfo )
diskTypes := collectVolumeDiskTypes ( topologyInfo )
2019-12-24 01:58:47 +00:00
2020-09-12 11:06:26 +00:00
if * collection == "EACH_COLLECTION" {
collections , err := ListCollectionNames ( commandEnv , true , false )
if err != nil {
return err
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
for _ , c := range collections {
2021-02-22 08:28:42 +00:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , c , * applyBalancing ) ; err != nil {
2019-05-06 05:28:14 +00:00
return err
}
2019-05-06 04:58:46 +00:00
}
2020-09-12 11:06:26 +00:00
} else if * collection == "ALL_COLLECTIONS" {
2021-02-22 08:28:42 +00:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , "ALL_COLLECTIONS" , * applyBalancing ) ; err != nil {
2020-09-12 11:06:26 +00:00
return err
}
} else {
2021-02-22 08:28:42 +00:00
if err = balanceVolumeServers ( commandEnv , diskTypes , volumeReplicas , volumeServers , volumeSizeLimitMb * 1024 * 1024 , * collection , * applyBalancing ) ; err != nil {
2020-09-12 11:06:26 +00:00
return err
}
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
2019-05-06 04:17:23 +00:00
return nil
}
2021-02-16 10:47:02 +00:00
func balanceVolumeServers ( commandEnv * CommandEnv , diskTypes [ ] types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , volumeSizeLimit uint64 , collection string , applyBalancing bool ) error {
2019-05-06 04:17:23 +00:00
2021-02-14 07:25:16 +00:00
for _ , diskType := range diskTypes {
2021-02-14 06:34:12 +00:00
if err := balanceVolumeServersByDiskType ( commandEnv , diskType , volumeReplicas , nodes , volumeSizeLimit , collection , applyBalancing ) ; err != nil {
return err
}
}
return nil
}
2021-02-16 10:47:02 +00:00
func balanceVolumeServersByDiskType ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , volumeSizeLimit uint64 , collection string , applyBalancing bool ) error {
2021-02-14 06:34:12 +00:00
2019-05-06 04:17:23 +00:00
for _ , n := range nodes {
2019-05-06 05:28:14 +00:00
n . selectVolumes ( func ( v * master_pb . VolumeInformationMessage ) bool {
2019-12-24 01:58:47 +00:00
if collection != "ALL_COLLECTIONS" {
2019-05-06 04:17:23 +00:00
if v . Collection != collection {
return false
}
}
2021-08-10 18:37:12 +00:00
return v . DiskType == string ( diskType )
2019-05-06 04:17:23 +00:00
} )
}
2021-08-10 18:10:09 +00:00
if err := balanceSelectedVolume ( commandEnv , diskType , volumeReplicas , nodes , capacityByMaxVolumeCount ( diskType ) , sortWritableVolumes , applyBalancing ) ; err != nil {
2020-12-13 11:40:33 +00:00
return err
}
2019-05-06 04:58:46 +00:00
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
func collectVolumeServersByDc ( t * master_pb . TopologyInfo , selectedDataCenter string ) ( nodes [ ] * Node ) {
2019-05-06 04:17:23 +00:00
for _ , dc := range t . DataCenterInfos {
2019-05-06 20:30:12 +00:00
if selectedDataCenter != "" && dc . Id != selectedDataCenter {
continue
}
2019-05-06 04:17:23 +00:00
for _ , r := range dc . RackInfos {
for _ , dn := range r . DataNodeInfos {
2020-09-12 11:06:26 +00:00
nodes = append ( nodes , & Node {
2020-02-22 05:23:25 +00:00
info : dn ,
dc : dc . Id ,
rack : r . Id ,
} )
2019-05-06 04:17:23 +00:00
}
}
}
return
}
2021-02-16 10:47:02 +00:00
func collectVolumeDiskTypes ( t * master_pb . TopologyInfo ) ( diskTypes [ ] types . DiskType ) {
2021-02-14 07:25:16 +00:00
knownTypes := make ( map [ string ] bool )
for _ , dc := range t . DataCenterInfos {
for _ , r := range dc . RackInfos {
for _ , dn := range r . DataNodeInfos {
2021-02-16 10:47:02 +00:00
for diskType , _ := range dn . DiskInfos {
if _ , found := knownTypes [ diskType ] ; ! found {
knownTypes [ diskType ] = true
2021-02-14 07:25:16 +00:00
}
}
}
}
}
for diskType , _ := range knownTypes {
2021-02-16 10:47:02 +00:00
diskTypes = append ( diskTypes , types . ToDiskType ( diskType ) )
2021-02-14 07:25:16 +00:00
}
return
}
2019-05-06 04:17:23 +00:00
type Node struct {
info * master_pb . DataNodeInfo
selectedVolumes map [ uint32 ] * master_pb . VolumeInformationMessage
2020-02-22 05:23:25 +00:00
dc string
rack string
2019-05-06 04:17:23 +00:00
}
2020-12-13 11:40:33 +00:00
type CapacityFunc func ( * master_pb . DataNodeInfo ) int
2021-02-16 10:47:02 +00:00
func capacityByMaxVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) int {
2021-03-15 03:49:56 +00:00
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
2021-02-16 12:16:46 +00:00
if ! found {
return 0
}
2021-02-16 10:47:02 +00:00
return int ( diskInfo . MaxVolumeCount )
}
2020-12-13 11:40:33 +00:00
}
2021-02-16 12:27:16 +00:00
func capacityByFreeVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) int {
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
if ! found {
return 0
}
return int ( diskInfo . MaxVolumeCount - diskInfo . VolumeCount )
}
}
2020-12-13 11:40:33 +00:00
func ( n * Node ) localVolumeRatio ( capacityFunc CapacityFunc ) float64 {
return divide ( len ( n . selectedVolumes ) , capacityFunc ( n . info ) )
2020-09-12 11:06:26 +00:00
}
2020-12-13 11:40:33 +00:00
func ( n * Node ) localVolumeNextRatio ( capacityFunc CapacityFunc ) float64 {
return divide ( len ( n . selectedVolumes ) + 1 , capacityFunc ( n . info ) )
2020-09-12 11:06:26 +00:00
}
func ( n * Node ) selectVolumes ( fn func ( v * master_pb . VolumeInformationMessage ) bool ) {
n . selectedVolumes = make ( map [ uint32 ] * master_pb . VolumeInformationMessage )
2021-02-16 10:47:02 +00:00
for _ , diskInfo := range n . info . DiskInfos {
for _ , v := range diskInfo . VolumeInfos {
if fn ( v ) {
n . selectedVolumes [ v . Id ] = v
}
2020-09-12 11:06:26 +00:00
}
}
}
2019-05-06 04:17:23 +00:00
func sortWritableVolumes ( volumes [ ] * master_pb . VolumeInformationMessage ) {
sort . Slice ( volumes , func ( i , j int ) bool {
return volumes [ i ] . Size < volumes [ j ] . Size
} )
}
func sortReadOnlyVolumes ( volumes [ ] * master_pb . VolumeInformationMessage ) {
sort . Slice ( volumes , func ( i , j int ) bool {
return volumes [ i ] . Id < volumes [ j ] . Id
} )
}
2021-08-10 18:10:09 +00:00
func balanceSelectedVolume ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , capacityFunc CapacityFunc , sortCandidatesFn func ( volumes [ ] * master_pb . VolumeInformationMessage ) , applyBalancing bool ) ( err error ) {
2020-09-12 11:06:26 +00:00
selectedVolumeCount , volumeMaxCount := 0 , 0
2020-12-18 18:39:30 +00:00
var nodesWithCapacity [ ] * Node
2019-05-06 04:17:23 +00:00
for _ , dn := range nodes {
selectedVolumeCount += len ( dn . selectedVolumes )
2020-12-18 18:39:30 +00:00
capacity := capacityFunc ( dn . info )
if capacity > 0 {
nodesWithCapacity = append ( nodesWithCapacity , dn )
}
volumeMaxCount += capacity
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
idealVolumeRatio := divide ( selectedVolumeCount , volumeMaxCount )
2019-05-06 04:17:23 +00:00
2020-09-11 06:05:00 +00:00
hasMoved := true
2019-05-06 04:17:23 +00:00
2020-12-14 08:11:52 +00:00
// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
2020-09-11 06:05:00 +00:00
for hasMoved {
hasMoved = false
2020-12-18 18:39:30 +00:00
sort . Slice ( nodesWithCapacity , func ( i , j int ) bool {
return nodesWithCapacity [ i ] . localVolumeRatio ( capacityFunc ) < nodesWithCapacity [ j ] . localVolumeRatio ( capacityFunc )
2019-05-06 04:17:23 +00:00
} )
2020-12-18 18:39:30 +00:00
fullNode := nodesWithCapacity [ len ( nodesWithCapacity ) - 1 ]
2020-09-11 06:05:00 +00:00
var candidateVolumes [ ] * master_pb . VolumeInformationMessage
for _ , v := range fullNode . selectedVolumes {
candidateVolumes = append ( candidateVolumes , v )
}
sortCandidatesFn ( candidateVolumes )
2020-12-18 18:39:30 +00:00
for i := 0 ; i < len ( nodesWithCapacity ) - 1 ; i ++ {
emptyNode := nodesWithCapacity [ i ]
2020-12-13 11:40:33 +00:00
if ! ( fullNode . localVolumeRatio ( capacityFunc ) > idealVolumeRatio && emptyNode . localVolumeNextRatio ( capacityFunc ) <= idealVolumeRatio ) {
2020-09-11 06:05:00 +00:00
// no more volume servers with empty slots
break
2019-05-06 04:17:23 +00:00
}
2021-08-10 19:33:29 +00:00
fmt . Fprintf ( os . Stdout , "%s %.2f %.2f:%.2f\t" , diskType . ReadableString ( ) , idealVolumeRatio , fullNode . localVolumeRatio ( capacityFunc ) , emptyNode . localVolumeNextRatio ( capacityFunc ) )
2020-09-11 07:29:25 +00:00
hasMoved , err = attemptToMoveOneVolume ( commandEnv , volumeReplicas , fullNode , candidateVolumes , emptyNode , applyBalancing )
2020-09-11 06:05:00 +00:00
if err != nil {
return
}
if hasMoved {
// moved one volume
break
2019-05-06 04:17:23 +00:00
}
}
}
2019-05-06 04:58:46 +00:00
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-11 07:29:25 +00:00
func attemptToMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolumes [ ] * master_pb . VolumeInformationMessage , emptyNode * Node , applyBalancing bool ) ( hasMoved bool , err error ) {
2020-09-11 06:05:00 +00:00
for _ , v := range candidateVolumes {
2020-09-15 06:47:11 +00:00
hasMoved , err = maybeMoveOneVolume ( commandEnv , volumeReplicas , fullNode , v , emptyNode , applyBalancing )
if err != nil {
return
2020-09-11 06:05:00 +00:00
}
2020-09-15 06:47:11 +00:00
if hasMoved {
break
}
}
return
}
func maybeMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolume * master_pb . VolumeInformationMessage , emptyNode * Node , applyChange bool ) ( hasMoved bool , err error ) {
if candidateVolume . ReplicaPlacement > 0 {
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( candidateVolume . ReplicaPlacement ) )
if ! isGoodMove ( replicaPlacement , volumeReplicas [ candidateVolume . Id ] , fullNode , emptyNode ) {
return false , nil
}
}
if _ , found := emptyNode . selectedVolumes [ candidateVolume . Id ] ; ! found {
if err = moveVolume ( commandEnv , candidateVolume , fullNode , emptyNode , applyChange ) ; err == nil {
adjustAfterMove ( candidateVolume , volumeReplicas , fullNode , emptyNode )
return true , nil
} else {
return
2020-09-11 06:05:00 +00:00
}
}
return
}
2020-09-15 06:47:11 +00:00
func moveVolume ( commandEnv * CommandEnv , v * master_pb . VolumeInformationMessage , fullNode * Node , emptyNode * Node , applyChange bool ) error {
2019-05-06 04:17:23 +00:00
collectionPrefix := v . Collection + "_"
if v . Collection == "" {
collectionPrefix = ""
}
2020-12-14 08:11:52 +00:00
fmt . Fprintf ( os . Stdout , " moving %s volume %s%d %s => %s\n" , v . DiskType , collectionPrefix , v . Id , fullNode . info . Id , emptyNode . info . Id )
2020-09-15 06:47:11 +00:00
if applyChange {
2021-09-13 05:47:52 +00:00
return LiveMoveVolume ( commandEnv . option . GrpcDialOption , os . Stderr , needle . VolumeId ( v . Id ) , pb . NewServerAddressFromDataNode ( fullNode . info ) , pb . NewServerAddressFromDataNode ( emptyNode . info ) , 5 * time . Second , v . DiskType , false )
2019-05-06 04:17:23 +00:00
}
2019-05-06 04:58:46 +00:00
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-11 07:29:25 +00:00
func isGoodMove ( placement * super_block . ReplicaPlacement , existingReplicas [ ] * VolumeReplica , sourceNode , targetNode * Node ) bool {
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == targetNode . info . Id &&
replica . location . rack == targetNode . rack &&
replica . location . dc == targetNode . dc {
// never move to existing nodes
return false
}
}
dcs , racks := make ( map [ string ] bool ) , make ( map [ string ] int )
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id != sourceNode . info . Id {
dcs [ replica . location . DataCenter ( ) ] = true
racks [ replica . location . Rack ( ) ] ++
}
}
dcs [ targetNode . dc ] = true
racks [ fmt . Sprintf ( "%s %s" , targetNode . dc , targetNode . rack ) ] ++
2020-11-20 09:12:25 +00:00
if len ( dcs ) != placement . DiffDataCenterCount + 1 {
2020-09-11 07:29:25 +00:00
return false
}
2020-11-20 09:12:25 +00:00
if len ( racks ) != placement . DiffRackCount + placement . DiffDataCenterCount + 1 {
2020-09-11 07:29:25 +00:00
return false
}
for _ , sameRackCount := range racks {
2020-11-20 09:12:25 +00:00
if sameRackCount != placement . SameRackCount + 1 {
2020-09-11 07:29:25 +00:00
return false
}
}
return true
}
2020-09-12 08:01:19 +00:00
func adjustAfterMove ( v * master_pb . VolumeInformationMessage , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , emptyNode * Node ) {
delete ( fullNode . selectedVolumes , v . Id )
2020-09-15 06:47:11 +00:00
if emptyNode . selectedVolumes != nil {
emptyNode . selectedVolumes [ v . Id ] = v
}
2020-09-12 08:01:19 +00:00
existingReplicas := volumeReplicas [ v . Id ]
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == fullNode . info . Id &&
replica . location . rack == fullNode . rack &&
replica . location . dc == fullNode . dc {
2021-03-15 04:29:55 +00:00
loc := newLocation ( emptyNode . dc , emptyNode . rack , emptyNode . info )
replica . location = & loc
2020-09-12 08:01:19 +00:00
return
}
}
}