diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index bdd90e196..4a0903428 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -20,10 +20,17 @@ func init() {
 	Commands = append(Commands, &commandVolumeTierMove{})
 }
 
+type volumeTierMoveJob struct {
+	src pb.ServerAddress
+	vid needle.VolumeId
+}
+
 type commandVolumeTierMove struct {
-	activeServers     map[pb.ServerAddress]struct{}
-	activeServersLock sync.Mutex
-	activeServersCond *sync.Cond
+	activeServers sync.Map
+	queues        map[pb.ServerAddress]chan volumeTierMoveJob
+	//activeServers     map[pb.ServerAddress]struct{}
+	//activeServersLock sync.Mutex
+	//activeServersCond *sync.Cond
 }
 
 func (c *commandVolumeTierMove) Name() string {
@@ -43,15 +50,13 @@ func (c *commandVolumeTierMove) Help() string {
 
 func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 
-	c.activeServers = make(map[pb.ServerAddress]struct{})
-	c.activeServersCond = sync.NewCond(new(sync.Mutex))
-
 	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
 	fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
 	quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
 	source := tierCommand.String("fromDiskType", "", "the source disk type")
 	target := tierCommand.String("toDiskType", "", "the target disk type")
+	limitWorkers := tierCommand.Int("limitWorkers", 0, "limit the number of active copying workers")
 	applyChange := tierCommand.Bool("force", false, "actually apply the changes")
 	if err = tierCommand.Parse(args); err != nil {
 		return nil
@@ -82,15 +87,87 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	fmt.Printf("tier move volumes: %v\n", volumeIds)
 
 	_, allLocations := collectVolumeReplicaLocations(topologyInfo)
-	for _, vid := range volumeIds {
-		if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
-			fmt.Printf("tier move volume %d: %v\n", vid, err)
-		}
+	allLocations = filterLocationsByDiskType(allLocations, toDiskType)
+	keepDataNodesSorted(allLocations, toDiskType)
+
+	if len(allLocations) > 0 && *limitWorkers > 0 && *limitWorkers < len(allLocations) {
+		allLocations = allLocations[:*limitWorkers]
 	}
+
+	wg := sync.WaitGroup{}
+	bufferLen := len(allLocations)
+	c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob)
+
+	for _, dst := range allLocations {
+		destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
+		c.queues[destServerAddress] = make(chan volumeTierMoveJob, bufferLen)
+
+		wg.Add(1)
+		go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
+			for job := range jobs {
+				fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
+				if !applyChanges {
+					continue
+				}
+
+				locations, found := commandEnv.MasterClient.GetLocations(uint32(job.vid))
+				if !found {
+					fmt.Printf("volume %d not found", job.vid)
+					continue
+				}
+
+				unlock := c.Lock(job.src)
+
+				if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil {
+					fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
+				}
+				unlock()
+			}
+			wg.Done()
+		}(dst, c.queues[destServerAddress], *applyChange)
+	}
+
+	for _, vid := range volumeIds {
+		if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations); err != nil {
+			fmt.Printf("tier move volume %d: %v\n", vid, err)
+		}
+		allLocations = rotateDataNodes(allLocations)
+	}
+	for key, _ := range c.queues {
+		close(c.queues[key])
+	}
+
+	wg.Wait()
 
 	return nil
 }
 
+func (c *commandVolumeTierMove) Lock(key pb.ServerAddress) func() {
+	value, _ := c.activeServers.LoadOrStore(key, &sync.Mutex{})
+	mtx := value.(*sync.Mutex)
+	mtx.Lock()
+
+	return func() { mtx.Unlock() }
+}
+
+func filterLocationsByDiskType(dataNodes []location, diskType types.DiskType) (ret []location) {
+	for _, loc := range dataNodes {
+		_, found := loc.dataNode.DiskInfos[string(diskType)]
+		if found {
+			ret = append(ret, loc)
+		}
+	}
+	return
+}
+
+func rotateDataNodes(dataNodes []location) []location {
+	if len(dataNodes) > 0 {
+		return append(dataNodes[1:], dataNodes[0])
+	} else {
+		return dataNodes
+	}
+}
+
 func isOneOf(server string, locations []wdclient.Location) bool {
 	for _, loc := range locations {
 		if server == loc.Url {
@@ -100,7 +177,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
 	return false
 }
 
-func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
+func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {
@@ -109,9 +186,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
 
 	// find one server with the most empty volume slots with target disk type
 	hasFoundTarget := false
-	keepDataNodesSorted(allLocations, toDiskType)
 	fn := capacityByFreeVolumeCount(toDiskType)
-	wg := sync.WaitGroup{}
 	for _, dst := range allLocations {
 		if fn(dst.dataNode) > 0 && !hasFoundTarget {
 			// ask the volume server to replicate the volume
@@ -127,44 +202,16 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
 			if sourceVolumeServer == "" {
 				continue
 			}
-			fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.ReadableString())
 			hasFoundTarget = true
 
-			if !applyChanges {
-				// adjust volume count
-				dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
-				break
-			}
+			// adjust volume count
+			dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
 
 			destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
-			c.activeServersCond.L.Lock()
-			_, isSourceActive := c.activeServers[sourceVolumeServer]
-			_, isDestActive := c.activeServers[destServerAddress]
-			for isSourceActive || isDestActive {
-				c.activeServersCond.Wait()
-				_, isSourceActive = c.activeServers[sourceVolumeServer]
-				_, isDestActive = c.activeServers[destServerAddress]
-			}
-			c.activeServers[sourceVolumeServer] = struct{}{}
-			c.activeServers[destServerAddress] = struct{}{}
-			c.activeServersCond.L.Unlock()
-
-			wg.Add(1)
-			go func(dst location) {
-				if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil {
-					fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
-				}
-				delete(c.activeServers, sourceVolumeServer)
-				delete(c.activeServers, destServerAddress)
-				c.activeServersCond.Signal()
-				wg.Done()
-			}(dst)
-
+			c.queues[destServerAddress] <- volumeTierMoveJob{sourceVolumeServer, vid}
 		}
 	}
 
-	wg.Wait()
-
 	if !hasFoundTarget {
 		fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
 	}
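
A minimal standalone sketch of the concurrency pattern this patch introduces, separate from the patch itself: one buffered job channel per destination server, one worker goroutine per channel, and a sync.Map of lazily created per-server mutexes so at most one copy uses a given source server at a time. All names here (job, lockServer, serverLocks, the example addresses) are illustrative only, not SeaweedFS APIs.

package main

import (
	"fmt"
	"sync"
)

type job struct {
	src string // source server address
	vid int    // volume id
}

var serverLocks sync.Map // map[string]*sync.Mutex

// lockServer mirrors commandVolumeTierMove.Lock: lazily create a mutex
// per server and return an unlock function.
func lockServer(key string) func() {
	value, _ := serverLocks.LoadOrStore(key, &sync.Mutex{})
	mtx := value.(*sync.Mutex)
	mtx.Lock()
	return func() { mtx.Unlock() }
}

func main() {
	dests := []string{"dst1:8080", "dst2:8080"}
	queues := make(map[string]chan job)
	var wg sync.WaitGroup

	// One worker per destination drains that destination's queue.
	for _, dst := range dests {
		queues[dst] = make(chan job, len(dests))
		wg.Add(1)
		go func(dst string, jobs <-chan job) {
			defer wg.Done()
			for j := range jobs {
				unlock := lockServer(j.src) // serialize work per source server
				fmt.Printf("moving volume %d from %s to %s\n", j.vid, j.src, dst)
				unlock()
			}
		}(dst, queues[dst])
	}

	// The dispatcher spreads volumes across destinations, roughly as the
	// patch does by rotating allLocations between volumes.
	for i, vid := range []int{1, 2, 3, 4} {
		dst := dests[i%len(dests)]
		queues[dst] <- job{src: "src:8080", vid: vid}
	}

	// Closing the queues lets the workers exit once drained.
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}

The design choice is the same as in the diff: destinations are the unit of parallelism (optionally capped by -limitWorkers), while the per-server mutexes keep any single source from serving more than one copy concurrently.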