Merge pull request #2618 from divanikus/master

This commit is contained in:
Chris Lu 2022-01-28 03:34:52 -08:00 committed by GitHub
commit b3f0f170b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -20,10 +20,17 @@ func init() {
Commands = append(Commands, &commandVolumeTierMove{})
}
type volumeTierMoveJob struct {
src pb.ServerAddress
vid needle.VolumeId
}
type commandVolumeTierMove struct {
activeServers map[pb.ServerAddress]struct{}
activeServersLock sync.Mutex
activeServersCond *sync.Cond
activeServers sync.Map
queues map[pb.ServerAddress]chan volumeTierMoveJob
//activeServers map[pb.ServerAddress]struct{}
//activeServersLock sync.Mutex
//activeServersCond *sync.Cond
}
func (c *commandVolumeTierMove) Name() string {
@ -43,15 +50,13 @@ func (c *commandVolumeTierMove) Help() string {
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
c.activeServers = make(map[pb.ServerAddress]struct{})
c.activeServersCond = sync.NewCond(new(sync.Mutex))
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
source := tierCommand.String("fromDiskType", "", "the source disk type")
target := tierCommand.String("toDiskType", "", "the target disk type")
limitWorkers := tierCommand.Int("limitWorkers", 0, "limit the number of active copying workers")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
if err = tierCommand.Parse(args); err != nil {
return nil
@ -82,15 +87,87 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
fmt.Printf("tier move volumes: %v\n", volumeIds)
_, allLocations := collectVolumeReplicaLocations(topologyInfo)
for _, vid := range volumeIds {
if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
keepDataNodesSorted(allLocations, toDiskType)
if len(allLocations) > 0 && *limitWorkers > 0 && *limitWorkers < len(allLocations) {
allLocations = allLocations[:*limitWorkers]
}
wg := sync.WaitGroup{}
bufferLen := len(allLocations)
c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob)
for _, dst := range allLocations {
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
c.queues[destServerAddress] = make(chan volumeTierMoveJob, bufferLen)
wg.Add(1)
go func (dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
for job := range jobs {
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
if !applyChanges {
continue
}
locations, found := commandEnv.MasterClient.GetLocations(uint32(job.vid))
if !found {
fmt.Printf("volume %d not found", job.vid)
continue
}
unlock := c.Lock(job.src)
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
}
unlock()
}
wg.Done()
}(dst, c.queues[destServerAddress], *applyChange)
}
for _, vid := range volumeIds {
if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
allLocations = rotateDataNodes(allLocations)
}
for key, _ := range c.queues {
close(c.queues[key])
}
wg.Wait()
return nil
}
func (c *commandVolumeTierMove) Lock(key pb.ServerAddress) func() {
value, _ := c.activeServers.LoadOrStore(key, &sync.Mutex{})
mtx := value.(*sync.Mutex)
mtx.Lock()
return func() { mtx.Unlock() }
}
func filterLocationsByDiskType(dataNodes []location, diskType types.DiskType) (ret []location) {
for _, loc := range dataNodes {
_, found := loc.dataNode.DiskInfos[string(diskType)]
if found {
ret = append(ret, loc)
}
}
return
}
func rotateDataNodes(dataNodes []location) []location {
if len(dataNodes) > 0 {
return append(dataNodes[1:], dataNodes[0])
} else {
return dataNodes
}
}
func isOneOf(server string, locations []wdclient.Location) bool {
for _, loc := range locations {
if server == loc.Url {
@ -100,7 +177,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
return false
}
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@ -109,9 +186,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
// find one server with the most empty volume slots with target disk type
hasFoundTarget := false
keepDataNodesSorted(allLocations, toDiskType)
fn := capacityByFreeVolumeCount(toDiskType)
wg := sync.WaitGroup{}
for _, dst := range allLocations {
if fn(dst.dataNode) > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
@ -127,44 +202,16 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
if sourceVolumeServer == "" {
continue
}
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.ReadableString())
hasFoundTarget = true
if !applyChanges {
// adjust volume count
dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
break
}
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
c.activeServersCond.L.Lock()
_, isSourceActive := c.activeServers[sourceVolumeServer]
_, isDestActive := c.activeServers[destServerAddress]
for isSourceActive || isDestActive {
c.activeServersCond.Wait()
_, isSourceActive = c.activeServers[sourceVolumeServer]
_, isDestActive = c.activeServers[destServerAddress]
}
c.activeServers[sourceVolumeServer] = struct{}{}
c.activeServers[destServerAddress] = struct{}{}
c.activeServersCond.L.Unlock()
wg.Add(1)
go func(dst location) {
if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
}
delete(c.activeServers, sourceVolumeServer)
delete(c.activeServers, destServerAddress)
c.activeServersCond.Signal()
wg.Done()
}(dst)
c.queues[destServerAddress] <- volumeTierMoveJob{sourceVolumeServer, vid}
}
}
wg.Wait()
if !hasFoundTarget {
fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
}