From 48f448ee0930f2784e90662d3de92e0ed2454de9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Aug 2021 03:08:29 -0700 Subject: [PATCH] parallelize tier move --- weed/shell/command_volume_tier_move.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index bf623b899..6a28b7970 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -133,9 +133,27 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer break } - if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil { - return err + c.activeServersCond.L.Lock() + _, isSourceActive := c.activeServers[sourceVolumeServer] + _, isDestActive := c.activeServers[dst.dataNode.Id] + for isSourceActive || isDestActive { + c.activeServersCond.Wait() + _, isSourceActive = c.activeServers[sourceVolumeServer] + _, isDestActive = c.activeServers[dst.dataNode.Id] } + c.activeServers[sourceVolumeServer] = struct{}{} + c.activeServers[dst.dataNode.Id] = struct{}{} + c.activeServersCond.L.Unlock() + + go func(dst location) { + if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil { + fmt.Fprintf(writer, "move volume %d %s => %s: %v", vid, sourceVolumeServer, dst.dataNode.Id, err) + } + delete(c.activeServers, sourceVolumeServer) + delete(c.activeServers, dst.dataNode.Id) + c.activeServersCond.Signal() + }(dst) + } }