2019-03-23 18:34:09 +00:00
package shell
import (
"context"
2020-09-20 16:27:34 +00:00
"flag"
2019-03-23 18:34:09 +00:00
"fmt"
2021-09-13 05:47:52 +00:00
"github.com/chrislusf/seaweedfs/weed/pb"
2020-09-07 23:00:10 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2021-02-16 13:13:48 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/types"
2019-03-23 18:34:09 +00:00
"io"
2021-01-27 06:30:37 +00:00
"path/filepath"
2019-03-23 18:34:09 +00:00
"sort"
2021-10-01 13:51:22 +00:00
"strconv"
"time"
2019-12-23 20:48:20 +00:00
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
2019-03-23 18:34:09 +00:00
)
func init ( ) {
2019-06-05 08:30:24 +00:00
Commands = append ( Commands , & commandVolumeFixReplication { } )
2019-03-23 18:34:09 +00:00
}
// commandVolumeFixReplication implements the volume.fix.replication shell
// command.
type commandVolumeFixReplication struct {
	// collectionPattern points at the -collectionPattern flag value bound in
	// Do; an empty pattern disables collection filtering.
	collectionPattern *string
}
// Name returns the shell command name.
func (c *commandVolumeFixReplication) Name() string {
	const commandName = "volume.fix.replication"
	return commandName
}
func ( c * commandVolumeFixReplication ) Help ( ) string {
2021-08-30 05:19:25 +00:00
return ` add or remove replicas to volumes that are missing replicas or over - replicated
2019-03-23 18:34:09 +00:00
2020-09-07 23:00:10 +00:00
This command finds all over - replicated volumes . If found , it will purge the oldest copies and stop .
This command also finds all under - replicated volumes , and finds volume servers with free slots .
2019-03-23 18:54:26 +00:00
If the free slots satisfy the replication requirement , the volume content is copied over and mounted .
2021-01-27 06:30:37 +00:00
volume . fix . replication - n # do not take action
volume . fix . replication # actually deleting or copying the volume files and mount the volume
volume . fix . replication - collectionPattern = important * # fix any collections with prefix "important"
2019-03-23 18:54:26 +00:00
2019-03-23 19:57:35 +00:00
Note :
2021-01-27 08:48:31 +00:00
* each time this will only add back one replica for each volume id that is under replicated .
If there are multiple replicas are missing , e . g . replica count is > 2 , you may need to run this multiple times .
2020-09-07 18:31:33 +00:00
* do not run this too quickly within seconds , since the new volume replica may take a few seconds
2019-03-23 19:57:35 +00:00
to register itself to the master .
2019-03-23 18:34:09 +00:00
`
}
2019-06-05 08:30:24 +00:00
func ( c * commandVolumeFixReplication ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2019-03-23 18:34:09 +00:00
2020-09-20 16:27:34 +00:00
volFixReplicationCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
2021-01-27 06:30:37 +00:00
c . collectionPattern = volFixReplicationCommand . String ( "collectionPattern" , "" , "match with wildcard characters '*' and '?'" )
2020-09-20 16:27:34 +00:00
skipChange := volFixReplicationCommand . Bool ( "n" , false , "skip the changes" )
2021-07-16 19:13:46 +00:00
retryCount := volFixReplicationCommand . Int ( "retry" , 0 , "how many times to retry" )
2021-09-30 19:17:54 +00:00
volumesPerStep := volFixReplicationCommand . Int ( "volumesPerStep" , 0 , "how many volumes to fix in one cycle" )
2021-09-30 15:24:24 +00:00
2020-09-20 16:27:34 +00:00
if err = volFixReplicationCommand . Parse ( args ) ; err != nil {
return nil
2019-03-23 18:34:09 +00:00
}
2021-09-14 05:13:34 +00:00
if err = commandEnv . confirmIsLocked ( ) ; err != nil {
return
}
2020-09-20 16:27:34 +00:00
takeAction := ! * skipChange
2021-09-30 15:24:24 +00:00
underReplicatedVolumeIdsCount := 1
for underReplicatedVolumeIdsCount > 0 {
2021-10-01 13:51:22 +00:00
fixedVolumeReplicas := map [ string ] int { }
2021-09-30 15:24:24 +00:00
// collect topology information
topologyInfo , _ , err := collectTopologyInfo ( commandEnv )
if err != nil {
return err
}
2019-03-23 18:34:09 +00:00
2021-09-30 15:24:24 +00:00
// find all volumes that needs replication
// collect all data nodes
volumeReplicas , allLocations := collectVolumeReplicaLocations ( topologyInfo )
2020-09-09 18:21:23 +00:00
2021-09-30 15:24:24 +00:00
if len ( allLocations ) == 0 {
return fmt . Errorf ( "no data nodes at all" )
2019-03-23 18:34:09 +00:00
}
2021-09-30 15:24:24 +00:00
// find all under replicated volumes
var underReplicatedVolumeIds , overReplicatedVolumeIds [ ] uint32
for vid , replicas := range volumeReplicas {
replica := replicas [ 0 ]
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
if replicaPlacement . GetCopyCount ( ) > len ( replicas ) {
underReplicatedVolumeIds = append ( underReplicatedVolumeIds , vid )
} else if replicaPlacement . GetCopyCount ( ) < len ( replicas ) {
overReplicatedVolumeIds = append ( overReplicatedVolumeIds , vid )
fmt . Fprintf ( writer , "volume %d replication %s, but over replicated %+d\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
}
}
2020-09-07 23:00:10 +00:00
2021-09-30 15:24:24 +00:00
if len ( overReplicatedVolumeIds ) > 0 {
if err := c . fixOverReplicatedVolumes ( commandEnv , writer , takeAction , overReplicatedVolumeIds , volumeReplicas , allLocations ) ; err != nil {
return err
}
}
2019-03-23 18:34:09 +00:00
2021-09-30 15:24:24 +00:00
underReplicatedVolumeIdsCount = len ( underReplicatedVolumeIds )
if underReplicatedVolumeIdsCount > 0 {
// find the most under populated data nodes
2021-10-01 19:10:11 +00:00
fixedVolumeReplicas , err = c . fixUnderReplicatedVolumes ( commandEnv , writer , takeAction , underReplicatedVolumeIds , volumeReplicas , allLocations , * retryCount , * volumesPerStep )
2021-10-01 13:51:22 +00:00
if err != nil {
2021-09-30 15:24:24 +00:00
return err
}
}
2020-09-07 18:31:33 +00:00
2021-09-30 15:24:24 +00:00
if * skipChange {
break
}
2021-10-01 13:51:22 +00:00
// check that the topology has been updated
if len ( fixedVolumeReplicas ) > 0 {
fixedVolumes := make ( [ ] string , 0 , len ( fixedVolumeReplicas ) )
for k , _ := range fixedVolumeReplicas {
fixedVolumes = append ( fixedVolumes , k )
}
2021-10-01 19:10:11 +00:00
volumeIdLocations , err := lookupVolumeIds ( commandEnv , fixedVolumes )
2021-10-01 13:51:22 +00:00
if err != nil {
return err
}
for _ , volumeIdLocation := range volumeIdLocations {
volumeId := volumeIdLocation . VolumeOrFileId
volumeIdLocationCount := len ( volumeIdLocation . Locations )
i := 0
for fixedVolumeReplicas [ volumeId ] >= volumeIdLocationCount {
fmt . Fprintf ( writer , "the number of locations for volume %s has not increased yet, let's wait\n" , volumeId )
time . Sleep ( time . Duration ( i + 1 ) * time . Second * 7 )
2021-10-01 19:10:11 +00:00
volumeLocIds , err := lookupVolumeIds ( commandEnv , [ ] string { volumeId } )
2021-10-01 13:51:22 +00:00
if err != nil {
return err
}
volumeIdLocationCount = len ( volumeLocIds [ 0 ] . Locations )
if * retryCount > i {
return fmt . Errorf ( "replicas volume %s mismatch in topology" , volumeId )
}
i += 1
}
}
}
2021-09-30 15:24:24 +00:00
}
return nil
2020-09-07 18:31:33 +00:00
}
2021-02-22 08:28:42 +00:00
func collectVolumeReplicaLocations ( topologyInfo * master_pb . TopologyInfo ) ( map [ uint32 ] [ ] * VolumeReplica , [ ] location ) {
2020-09-11 07:29:25 +00:00
volumeReplicas := make ( map [ uint32 ] [ ] * VolumeReplica )
var allLocations [ ] location
2021-02-22 08:28:42 +00:00
eachDataNode ( topologyInfo , func ( dc string , rack RackId , dn * master_pb . DataNodeInfo ) {
2020-09-11 07:29:25 +00:00
loc := newLocation ( dc , string ( rack ) , dn )
2021-02-16 10:47:02 +00:00
for _ , diskInfo := range dn . DiskInfos {
for _ , v := range diskInfo . VolumeInfos {
volumeReplicas [ v . Id ] = append ( volumeReplicas [ v . Id ] , & VolumeReplica {
location : & loc ,
info : v ,
} )
}
2020-09-11 07:29:25 +00:00
}
allLocations = append ( allLocations , loc )
} )
return volumeReplicas , allLocations
}
2020-09-07 23:00:10 +00:00
func ( c * commandVolumeFixReplication ) fixOverReplicatedVolumes ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , overReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location ) error {
for _ , vid := range overReplicatedVolumeIds {
replicas := volumeReplicas [ vid ]
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replicas [ 0 ] . info . ReplicaPlacement ) )
replica := pickOneReplicaToDelete ( replicas , replicaPlacement )
2021-01-27 06:30:37 +00:00
// check collection name pattern
if * c . collectionPattern != "" {
matched , err := filepath . Match ( * c . collectionPattern , replica . info . Collection )
if err != nil {
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
}
if ! matched {
break
}
}
2020-09-07 23:00:10 +00:00
fmt . Fprintf ( writer , "deleting volume %d from %s ...\n" , replica . info . Id , replica . location . dataNode . Id )
if ! takeAction {
break
}
2021-09-13 05:47:52 +00:00
if err := deleteVolume ( commandEnv . option . GrpcDialOption , needle . VolumeId ( replica . info . Id ) , pb . NewServerAddressFromDataNode ( replica . location . dataNode ) ) ; err != nil {
2020-09-07 23:00:10 +00:00
return fmt . Errorf ( "deleting volume %d from %s : %v" , replica . info . Id , replica . location . dataNode . Id , err )
}
}
return nil
}
2021-10-01 19:10:11 +00:00
func ( c * commandVolumeFixReplication ) fixUnderReplicatedVolumes ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , underReplicatedVolumeIds [ ] uint32 , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , allLocations [ ] location , retryCount int , volumesPerStep int ) ( fixedVolumes map [ string ] int , err error ) {
2021-10-01 13:51:22 +00:00
fixedVolumes = map [ string ] int { }
2021-09-30 19:17:54 +00:00
if len ( underReplicatedVolumeIds ) > volumesPerStep && volumesPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds [ 0 : volumesPerStep ]
2021-09-30 15:24:24 +00:00
}
2020-09-07 19:35:02 +00:00
for _ , vid := range underReplicatedVolumeIds {
2021-07-21 21:38:12 +00:00
for i := 0 ; i < retryCount + 1 ; i ++ {
2021-07-16 19:13:46 +00:00
if err = c . fixOneUnderReplicatedVolume ( commandEnv , writer , takeAction , volumeReplicas , vid , allLocations ) ; err == nil {
2021-10-01 13:51:22 +00:00
if takeAction {
fixedVolumes [ strconv . FormatUint ( uint64 ( vid ) , 10 ) ] = len ( volumeReplicas [ vid ] )
}
2021-08-03 16:32:55 +00:00
break
2021-07-16 19:13:46 +00:00
}
2021-07-16 18:46:04 +00:00
}
}
2021-10-01 19:10:11 +00:00
return fixedVolumes , nil
2021-07-16 18:46:04 +00:00
}
2019-03-23 18:34:09 +00:00
2021-07-16 18:46:04 +00:00
func ( c * commandVolumeFixReplication ) fixOneUnderReplicatedVolume ( commandEnv * CommandEnv , writer io . Writer , takeAction bool , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , vid uint32 , allLocations [ ] location ) error {
replicas := volumeReplicas [ vid ]
replica := pickOneReplicaToCopyFrom ( replicas )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
foundNewLocation := false
hasSkippedCollection := false
keepDataNodesSorted ( allLocations , types . ToDiskType ( replica . info . DiskType ) )
fn := capacityByFreeVolumeCount ( types . ToDiskType ( replica . info . DiskType ) )
for _ , dst := range allLocations {
// check whether data nodes satisfy the constraints
if fn ( dst . dataNode ) > 0 && satisfyReplicaPlacement ( replicaPlacement , replicas , dst ) {
// check collection name pattern
if * c . collectionPattern != "" {
matched , err := filepath . Match ( * c . collectionPattern , replica . info . Collection )
if err != nil {
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
}
if ! matched {
hasSkippedCollection = true
2019-03-23 18:34:09 +00:00
break
}
2021-07-16 18:46:04 +00:00
}
2019-03-23 18:34:09 +00:00
2021-07-16 18:46:04 +00:00
// ask the volume server to replicate the volume
foundNewLocation = true
fmt . Fprintf ( writer , "replicating volume %d %s from %s to dataNode %s ...\n" , replica . info . Id , replicaPlacement , replica . location . dataNode . Id , dst . dataNode . Id )
2019-03-23 18:34:09 +00:00
2021-07-16 18:46:04 +00:00
if ! takeAction {
2021-08-08 22:12:39 +00:00
// adjust free volume count
dst . dataNode . DiskInfos [ replica . info . DiskType ] . FreeVolumeCount --
2021-07-16 18:46:04 +00:00
break
}
2021-09-13 05:47:52 +00:00
err := operation . WithVolumeServerClient ( pb . NewServerAddressFromDataNode ( dst . dataNode ) , commandEnv . option . GrpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
2021-10-24 09:52:56 +00:00
stream , replicateErr := volumeServerClient . VolumeCopy ( context . Background ( ) , & volume_server_pb . VolumeCopyRequest {
2021-07-16 18:46:04 +00:00
VolumeId : replica . info . Id ,
2021-09-13 05:47:52 +00:00
SourceDataNode : string ( pb . NewServerAddressFromDataNode ( replica . location . dataNode ) ) ,
2021-07-16 18:46:04 +00:00
} )
if replicateErr != nil {
return fmt . Errorf ( "copying from %s => %s : %v" , replica . location . dataNode . Id , dst . dataNode . Id , replicateErr )
2019-03-23 18:34:09 +00:00
}
2021-10-24 09:52:56 +00:00
for {
resp , recvErr := stream . Recv ( )
if recvErr != nil {
if recvErr == io . EOF {
break
} else {
return recvErr
}
}
if resp . ProcessedBytes > 0 {
fmt . Fprintf ( writer , "volume %d processed %d bytes\n" , replica . info . Id , resp . ProcessedBytes )
}
}
2021-07-16 18:46:04 +00:00
return nil
} )
2019-03-23 18:34:09 +00:00
2021-07-16 18:46:04 +00:00
if err != nil {
return err
2019-03-23 18:34:09 +00:00
}
2021-02-16 10:47:02 +00:00
2021-07-16 18:46:04 +00:00
// adjust free volume count
dst . dataNode . DiskInfos [ replica . info . DiskType ] . FreeVolumeCount --
break
2019-03-23 18:34:09 +00:00
}
2021-07-16 18:46:04 +00:00
}
2019-03-23 18:34:09 +00:00
2021-07-16 18:46:04 +00:00
if ! foundNewLocation && ! hasSkippedCollection {
fmt . Fprintf ( writer , "failed to place volume %d replica as %s, existing:%+v\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
2019-03-23 18:34:09 +00:00
}
return nil
}
2021-02-22 09:30:07 +00:00
func keepDataNodesSorted ( dataNodes [ ] location , diskType types . DiskType ) {
fn := capacityByFreeVolumeCount ( diskType )
2019-03-23 18:34:09 +00:00
sort . Slice ( dataNodes , func ( i , j int ) bool {
2021-02-16 13:13:48 +00:00
return fn ( dataNodes [ i ] . dataNode ) > fn ( dataNodes [ j ] . dataNode )
2019-03-23 18:34:09 +00:00
} )
}
2020-04-02 09:16:16 +00:00
/ *
if on an existing data node {
return false
}
if different from existing dcs {
if lack on different dcs {
return true
} else {
return false
}
}
if not on primary dc {
return false
}
if different from existing racks {
if lack on different racks {
return true
} else {
return false
}
}
if not on primary rack {
return false
}
if lacks on same rack {
return true
} else {
return false
}
* /
2020-09-07 19:35:02 +00:00
func satisfyReplicaPlacement ( replicaPlacement * super_block . ReplicaPlacement , replicas [ ] * VolumeReplica , possibleLocation location ) bool {
2019-03-23 18:34:09 +00:00
2020-09-07 23:00:10 +00:00
existingDataCenters , _ , existingDataNodes := countReplicas ( replicas )
if _ , found := existingDataNodes [ possibleLocation . String ( ) ] ; found {
// avoid duplicated volume on the same data node
2020-04-02 09:16:16 +00:00
return false
}
primaryDataCenters , _ := findTopKeys ( existingDataCenters )
// ensure data center count is within limit
if _ , found := existingDataCenters [ possibleLocation . DataCenter ( ) ] ; ! found {
// different from existing dcs
if len ( existingDataCenters ) < replicaPlacement . DiffDataCenterCount + 1 {
// lack on different dcs
return true
} else {
// adding this would go over the different dcs limit
return false
}
}
// now this is same as one of the existing data center
if ! isAmong ( possibleLocation . DataCenter ( ) , primaryDataCenters ) {
// not on one of the primary dcs
return false
2019-03-23 18:34:09 +00:00
}
2020-04-02 09:16:16 +00:00
// now this is one of the primary dcs
2020-09-07 23:00:10 +00:00
primaryDcRacks := make ( map [ string ] int )
2020-09-07 19:35:02 +00:00
for _ , replica := range replicas {
if replica . location . DataCenter ( ) != possibleLocation . DataCenter ( ) {
2020-04-02 09:16:16 +00:00
continue
}
2020-09-07 23:00:10 +00:00
primaryDcRacks [ replica . location . Rack ( ) ] += 1
2020-04-02 09:16:16 +00:00
}
2020-09-07 23:00:10 +00:00
primaryRacks , _ := findTopKeys ( primaryDcRacks )
sameRackCount := primaryDcRacks [ possibleLocation . Rack ( ) ]
2020-04-02 09:16:16 +00:00
// ensure rack count is within limit
2020-09-07 23:00:10 +00:00
if _ , found := primaryDcRacks [ possibleLocation . Rack ( ) ] ; ! found {
2020-04-02 09:16:16 +00:00
// different from existing racks
2020-09-07 23:00:10 +00:00
if len ( primaryDcRacks ) < replicaPlacement . DiffRackCount + 1 {
2020-04-02 09:16:16 +00:00
// lack on different racks
return true
} else {
// adding this would go over the different racks limit
return false
}
}
// now this is same as one of the existing racks
if ! isAmong ( possibleLocation . Rack ( ) , primaryRacks ) {
// not on the primary rack
return false
2019-03-23 18:34:09 +00:00
}
2020-04-02 09:16:16 +00:00
// now this is on the primary rack
// different from existing data nodes
if sameRackCount < replicaPlacement . SameRackCount + 1 {
// lack on same rack
return true
} else {
// adding this would go over the same data node limit
return false
}
}
// findTopKeys returns the keys of m whose count equals the largest count,
// together with that count. The result starts from zero:
//   - if every count is negative, no keys are returned and max is 0;
//   - if the largest count is 0, the keys with count 0 are returned.
//
// The order of topKeys is unspecified.
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for _, count := range m {
		if count > max {
			max = count
		}
	}
	for key, count := range m {
		if count == max {
			topKeys = append(topKeys, key)
		}
	}
	return topKeys, max
}
// isAmong reports whether key is an element of keys.
func isAmong(key string, keys []string) (found bool) {
	for i := 0; i < len(keys) && !found; i++ {
		found = keys[i] == key
	}
	return found
}
2020-09-07 19:35:02 +00:00
type VolumeReplica struct {
location * location
info * master_pb . VolumeInformationMessage
}
2019-03-23 18:34:09 +00:00
type location struct {
dc string
rack string
dataNode * master_pb . DataNodeInfo
}
func newLocation ( dc , rack string , dataNode * master_pb . DataNodeInfo ) location {
return location {
dc : dc ,
rack : rack ,
dataNode : dataNode ,
}
}
func ( l location ) String ( ) string {
return fmt . Sprintf ( "%s %s %s" , l . dc , l . rack , l . dataNode . Id )
}
func ( l location ) Rack ( ) string {
return fmt . Sprintf ( "%s %s" , l . dc , l . rack )
}
func ( l location ) DataCenter ( ) string {
return l . dc
}
2020-09-07 23:00:10 +00:00
func pickOneReplicaToCopyFrom ( replicas [ ] * VolumeReplica ) * VolumeReplica {
mostRecent := replicas [ 0 ]
for _ , replica := range replicas {
if replica . info . ModifiedAtSecond > mostRecent . info . ModifiedAtSecond {
mostRecent = replica
}
}
return mostRecent
}
// countReplicas tallies replicas in three ways:
//   - per data center, keyed by location.DataCenter;
//   - per rack, keyed by location.Rack, which is dc-qualified;
//   - per data node, keyed by location.String.
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc, diffRack, diffNode = map[string]int{}, map[string]int{}, map[string]int{}
	for _, r := range replicas {
		loc := r.location
		diffDc[loc.DataCenter()]++
		diffRack[loc.Rack()]++
		diffNode[loc.String()]++
	}
	return diffDc, diffRack, diffNode
}
func pickOneReplicaToDelete ( replicas [ ] * VolumeReplica , replicaPlacement * super_block . ReplicaPlacement ) * VolumeReplica {
2020-11-10 20:26:05 +00:00
sort . Slice ( replicas , func ( i , j int ) bool {
a , b := replicas [ i ] , replicas [ j ]
2021-08-10 19:30:41 +00:00
if a . info . Size != b . info . Size {
return a . info . Size < b . info . Size
2020-09-07 23:00:10 +00:00
}
2020-11-10 20:26:05 +00:00
if a . info . ModifiedAtSecond != b . info . ModifiedAtSecond {
return a . info . ModifiedAtSecond < b . info . ModifiedAtSecond
}
2021-08-10 19:30:41 +00:00
if a . info . CompactRevision != b . info . CompactRevision {
return a . info . CompactRevision < b . info . CompactRevision
2020-11-10 20:26:05 +00:00
}
return false
} )
return replicas [ 0 ]
2020-09-07 23:00:10 +00:00
}