2019-05-06 04:17:23 +00:00
package shell
import (
"context"
"flag"
"fmt"
2020-09-11 07:29:25 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
2019-05-06 04:17:23 +00:00
"io"
"os"
"sort"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func init ( ) {
2019-06-05 08:30:24 +00:00
Commands = append ( Commands , & commandVolumeBalance { } )
2019-05-06 04:17:23 +00:00
}
// commandVolumeBalance implements the "volume.balance" shell command.
// It carries no state of its own.
type commandVolumeBalance struct{}
// Name returns the command name, "volume.balance".
func (c *commandVolumeBalance) Name() string {
	const commandName = "volume.balance"
	return commandName
}
// Help returns the usage text for volume.balance together with a pseudo-code
// outline of the balancing algorithm.
//
// The -collection keyword shown in the usage line is "ALL_COLLECTIONS", which
// is the value Do actually compares against, and the pseudo-code moves a
// volume from the fullest server (B) to the emptier one (A).
func (c *commandVolumeBalance) Help() string {
	return `balance all volumes among volume servers

	volume.balance [-collection ALL_COLLECTIONS|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>]

	Algorithm:

	For each type of volume server (different max volume count limit){
		for each collection {
			balanceWritableVolumes()
			balanceReadOnlyVolumes()
		}
	}

	func balanceWritableVolumes(){
		idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes
		for hasMovedOneVolume {
			sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes to localVolumeMax
			pick the volume server B with the highest localWritableVolumeRatio y
			for any the volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax {
				if y > localWritableVolumeRatio {
					if B has a writable volume id v that A does not have, and satisfy v replication requirements {
						move writable volume v from B to A
					}
				}
			}
		}
	}
	func balanceReadOnlyVolumes(){
		//similar to balanceWritableVolumes
	}

`
}
2019-06-05 08:30:24 +00:00
func ( c * commandVolumeBalance ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2019-05-06 04:17:23 +00:00
2020-04-23 20:37:31 +00:00
if err = commandEnv . confirmIsLocked ( ) ; err != nil {
return
}
2019-05-06 04:17:23 +00:00
balanceCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
2019-06-03 09:26:31 +00:00
collection := balanceCommand . String ( "collection" , "EACH_COLLECTION" , "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection" )
2019-05-06 20:30:12 +00:00
dc := balanceCommand . String ( "dataCenter" , "" , "only apply the balancing for this dataCenter" )
2019-06-03 09:26:31 +00:00
applyBalancing := balanceCommand . Bool ( "force" , false , "apply the balancing plan." )
2019-05-06 04:17:23 +00:00
if err = balanceCommand . Parse ( args ) ; err != nil {
return nil
}
var resp * master_pb . VolumeListResponse
2020-02-26 05:50:12 +00:00
err = commandEnv . MasterClient . WithClient ( func ( client master_pb . SeaweedClient ) error {
resp , err = client . VolumeList ( context . Background ( ) , & master_pb . VolumeListRequest { } )
2019-05-06 04:17:23 +00:00
return err
} )
if err != nil {
return err
}
2020-09-12 11:06:26 +00:00
volumeServers := collectVolumeServersByDc ( resp . TopologyInfo , * dc )
2020-09-11 07:29:25 +00:00
volumeReplicas , _ := collectVolumeReplicaLocations ( resp )
2019-12-24 01:58:47 +00:00
2020-09-12 11:06:26 +00:00
if * collection == "EACH_COLLECTION" {
collections , err := ListCollectionNames ( commandEnv , true , false )
if err != nil {
return err
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
for _ , c := range collections {
if err = balanceVolumeServers ( commandEnv , volumeReplicas , volumeServers , resp . VolumeSizeLimitMb * 1024 * 1024 , c , * applyBalancing ) ; err != nil {
2019-05-06 05:28:14 +00:00
return err
}
2019-05-06 04:58:46 +00:00
}
2020-09-12 11:06:26 +00:00
} else if * collection == "ALL_COLLECTIONS" {
if err = balanceVolumeServers ( commandEnv , volumeReplicas , volumeServers , resp . VolumeSizeLimitMb * 1024 * 1024 , "ALL_COLLECTIONS" , * applyBalancing ) ; err != nil {
return err
}
} else {
if err = balanceVolumeServers ( commandEnv , volumeReplicas , volumeServers , resp . VolumeSizeLimitMb * 1024 * 1024 , * collection , * applyBalancing ) ; err != nil {
return err
}
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
2019-05-06 04:17:23 +00:00
return nil
}
2020-09-11 07:29:25 +00:00
func balanceVolumeServers ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , volumeSizeLimit uint64 , collection string , applyBalancing bool ) error {
2019-05-06 04:17:23 +00:00
// balance writable volumes
for _ , n := range nodes {
2019-05-06 05:28:14 +00:00
n . selectVolumes ( func ( v * master_pb . VolumeInformationMessage ) bool {
2019-12-24 01:58:47 +00:00
if collection != "ALL_COLLECTIONS" {
2019-05-06 04:17:23 +00:00
if v . Collection != collection {
return false
}
}
return ! v . ReadOnly && v . Size < volumeSizeLimit
} )
}
2020-09-11 07:29:25 +00:00
if err := balanceSelectedVolume ( commandEnv , volumeReplicas , nodes , sortWritableVolumes , applyBalancing ) ; err != nil {
2019-05-06 04:58:46 +00:00
return err
}
2019-05-06 04:17:23 +00:00
// balance readable volumes
for _ , n := range nodes {
2019-05-06 05:28:14 +00:00
n . selectVolumes ( func ( v * master_pb . VolumeInformationMessage ) bool {
2019-12-24 01:58:47 +00:00
if collection != "ALL_COLLECTIONS" {
2019-05-06 04:17:23 +00:00
if v . Collection != collection {
return false
}
}
return v . ReadOnly || v . Size >= volumeSizeLimit
} )
}
2020-09-11 07:29:25 +00:00
if err := balanceSelectedVolume ( commandEnv , volumeReplicas , nodes , sortReadOnlyVolumes , applyBalancing ) ; err != nil {
2019-05-06 04:58:46 +00:00
return err
}
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
func collectVolumeServersByDc ( t * master_pb . TopologyInfo , selectedDataCenter string ) ( nodes [ ] * Node ) {
2019-05-06 04:17:23 +00:00
for _ , dc := range t . DataCenterInfos {
2019-05-06 20:30:12 +00:00
if selectedDataCenter != "" && dc . Id != selectedDataCenter {
continue
}
2019-05-06 04:17:23 +00:00
for _ , r := range dc . RackInfos {
for _ , dn := range r . DataNodeInfos {
2020-09-12 11:06:26 +00:00
nodes = append ( nodes , & Node {
2020-02-22 05:23:25 +00:00
info : dn ,
dc : dc . Id ,
rack : r . Id ,
} )
2019-05-06 04:17:23 +00:00
}
}
}
return
}
type Node struct {
info * master_pb . DataNodeInfo
selectedVolumes map [ uint32 ] * master_pb . VolumeInformationMessage
2020-02-22 05:23:25 +00:00
dc string
rack string
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
// localVolumeRatio returns the number of currently selected volumes divided
// by the node's MaxVolumeCount.
func (n *Node) localVolumeRatio() float64 {
	selected := len(n.selectedVolumes)
	capacity := int(n.info.MaxVolumeCount)
	return divide(selected, capacity)
}
func ( n * Node ) localVolumeNextRatio ( ) float64 {
2020-09-12 11:08:03 +00:00
return divide ( len ( n . selectedVolumes ) + 1 , int ( n . info . MaxVolumeCount ) )
2020-09-12 11:06:26 +00:00
}
// selectVolumes replaces n.selectedVolumes with a fresh map containing every
// volume in n.info.VolumeInfos for which fn returns true, keyed by volume id.
func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
	// Reset first so the previous selection is dropped before fn is consulted.
	n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
	for _, volume := range n.info.VolumeInfos {
		if !fn(volume) {
			continue
		}
		n.selectedVolumes[volume.Id] = volume
	}
}
2019-05-06 04:17:23 +00:00
// sortWritableVolumes orders volumes in place by ascending Size.
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
	smallerFirst := func(a, b int) bool {
		return volumes[a].Size < volumes[b].Size
	}
	sort.Slice(volumes, smallerFirst)
}
// sortReadOnlyVolumes orders volumes in place by ascending Id.
func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
	lowerIDFirst := func(a, b int) bool {
		return volumes[a].Id < volumes[b].Id
	}
	sort.Slice(volumes, lowerIDFirst)
}
2020-09-11 07:29:25 +00:00
func balanceSelectedVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , sortCandidatesFn func ( volumes [ ] * master_pb . VolumeInformationMessage ) , applyBalancing bool ) ( err error ) {
2020-09-12 11:06:26 +00:00
selectedVolumeCount , volumeMaxCount := 0 , 0
2019-05-06 04:17:23 +00:00
for _ , dn := range nodes {
selectedVolumeCount += len ( dn . selectedVolumes )
2020-09-12 11:06:26 +00:00
volumeMaxCount += int ( dn . info . MaxVolumeCount )
2019-05-06 04:17:23 +00:00
}
2020-09-12 11:06:26 +00:00
idealVolumeRatio := divide ( selectedVolumeCount , volumeMaxCount )
2019-05-06 04:17:23 +00:00
2020-09-11 06:05:00 +00:00
hasMoved := true
2019-05-06 04:17:23 +00:00
2020-09-11 06:05:00 +00:00
for hasMoved {
hasMoved = false
2019-05-06 04:17:23 +00:00
sort . Slice ( nodes , func ( i , j int ) bool {
2020-09-12 11:06:26 +00:00
return nodes [ i ] . localVolumeRatio ( ) < nodes [ j ] . localVolumeRatio ( )
2019-05-06 04:17:23 +00:00
} )
2020-09-11 06:05:00 +00:00
fullNode := nodes [ len ( nodes ) - 1 ]
var candidateVolumes [ ] * master_pb . VolumeInformationMessage
for _ , v := range fullNode . selectedVolumes {
candidateVolumes = append ( candidateVolumes , v )
}
sortCandidatesFn ( candidateVolumes )
for i := 0 ; i < len ( nodes ) - 1 ; i ++ {
emptyNode := nodes [ i ]
2020-09-12 11:06:26 +00:00
if ! ( fullNode . localVolumeRatio ( ) > idealVolumeRatio && emptyNode . localVolumeNextRatio ( ) <= idealVolumeRatio ) {
2020-09-11 06:05:00 +00:00
// no more volume servers with empty slots
break
2019-05-06 04:17:23 +00:00
}
2020-09-11 07:29:25 +00:00
hasMoved , err = attemptToMoveOneVolume ( commandEnv , volumeReplicas , fullNode , candidateVolumes , emptyNode , applyBalancing )
2020-09-11 06:05:00 +00:00
if err != nil {
return
}
if hasMoved {
// moved one volume
break
2019-05-06 04:17:23 +00:00
}
}
}
2019-05-06 04:58:46 +00:00
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-11 07:29:25 +00:00
func attemptToMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolumes [ ] * master_pb . VolumeInformationMessage , emptyNode * Node , applyBalancing bool ) ( hasMoved bool , err error ) {
2020-09-11 06:05:00 +00:00
for _ , v := range candidateVolumes {
2020-09-15 06:47:11 +00:00
hasMoved , err = maybeMoveOneVolume ( commandEnv , volumeReplicas , fullNode , v , emptyNode , applyBalancing )
if err != nil {
return
2020-09-11 06:05:00 +00:00
}
2020-09-15 06:47:11 +00:00
if hasMoved {
break
}
}
return
}
func maybeMoveOneVolume ( commandEnv * CommandEnv , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , candidateVolume * master_pb . VolumeInformationMessage , emptyNode * Node , applyChange bool ) ( hasMoved bool , err error ) {
if candidateVolume . ReplicaPlacement > 0 {
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( candidateVolume . ReplicaPlacement ) )
if ! isGoodMove ( replicaPlacement , volumeReplicas [ candidateVolume . Id ] , fullNode , emptyNode ) {
return false , nil
}
}
if _ , found := emptyNode . selectedVolumes [ candidateVolume . Id ] ; ! found {
if err = moveVolume ( commandEnv , candidateVolume , fullNode , emptyNode , applyChange ) ; err == nil {
adjustAfterMove ( candidateVolume , volumeReplicas , fullNode , emptyNode )
return true , nil
} else {
return
2020-09-11 06:05:00 +00:00
}
}
return
}
2020-09-15 06:47:11 +00:00
func moveVolume ( commandEnv * CommandEnv , v * master_pb . VolumeInformationMessage , fullNode * Node , emptyNode * Node , applyChange bool ) error {
2019-05-06 04:17:23 +00:00
collectionPrefix := v . Collection + "_"
if v . Collection == "" {
collectionPrefix = ""
}
fmt . Fprintf ( os . Stdout , "moving volume %s%d %s => %s\n" , collectionPrefix , v . Id , fullNode . info . Id , emptyNode . info . Id )
2020-09-15 06:47:11 +00:00
if applyChange {
2020-02-26 05:50:12 +00:00
return LiveMoveVolume ( commandEnv . option . GrpcDialOption , needle . VolumeId ( v . Id ) , fullNode . info . Id , emptyNode . info . Id , 5 * time . Second )
2019-05-06 04:17:23 +00:00
}
2019-05-06 04:58:46 +00:00
return nil
2019-05-06 04:17:23 +00:00
}
2020-09-11 07:29:25 +00:00
func isGoodMove ( placement * super_block . ReplicaPlacement , existingReplicas [ ] * VolumeReplica , sourceNode , targetNode * Node ) bool {
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == targetNode . info . Id &&
replica . location . rack == targetNode . rack &&
replica . location . dc == targetNode . dc {
// never move to existing nodes
return false
}
}
dcs , racks := make ( map [ string ] bool ) , make ( map [ string ] int )
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id != sourceNode . info . Id {
dcs [ replica . location . DataCenter ( ) ] = true
racks [ replica . location . Rack ( ) ] ++
}
}
dcs [ targetNode . dc ] = true
racks [ fmt . Sprintf ( "%s %s" , targetNode . dc , targetNode . rack ) ] ++
if len ( dcs ) > placement . DiffDataCenterCount + 1 {
return false
}
2020-09-11 08:21:17 +00:00
if len ( racks ) > placement . DiffRackCount + placement . DiffDataCenterCount + 1 {
2020-09-11 07:29:25 +00:00
return false
}
for _ , sameRackCount := range racks {
if sameRackCount > placement . SameRackCount + 1 {
return false
}
}
return true
}
2020-09-12 08:01:19 +00:00
func adjustAfterMove ( v * master_pb . VolumeInformationMessage , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , fullNode * Node , emptyNode * Node ) {
delete ( fullNode . selectedVolumes , v . Id )
2020-09-15 06:47:11 +00:00
if emptyNode . selectedVolumes != nil {
emptyNode . selectedVolumes [ v . Id ] = v
}
2020-09-12 08:01:19 +00:00
existingReplicas := volumeReplicas [ v . Id ]
for _ , replica := range existingReplicas {
if replica . location . dataNode . Id == fullNode . info . Id &&
replica . location . rack == fullNode . rack &&
replica . location . dc == fullNode . dc {
replica . location . dc = emptyNode . dc
replica . location . rack = emptyNode . rack
replica . location . dataNode = emptyNode . info
return
}
}
}