seaweedfs/weed/shell/command_ec_encode.go

262 lines
7.8 KiB
Go
Raw Normal View History

2019-05-24 18:52:23 +00:00
package shell
import (
"context"
"flag"
"fmt"
"io"
2019-05-25 09:02:44 +00:00
"sync"
"time"
2019-05-24 18:52:23 +00:00
"github.com/chrislusf/seaweedfs/weed/operation"
2019-05-25 09:02:44 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
2019-05-24 18:52:23 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
2019-05-25 09:02:44 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
2019-05-24 18:52:23 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2019-05-25 09:02:44 +00:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
2019-05-24 18:52:23 +00:00
"google.golang.org/grpc"
)
func init() {
commands = append(commands, &commandEcEncode{})
}
// commandEcEncode is the receiver type for the "ec.encode" shell command.
// It has no fields; all inputs arrive through the arguments of Do.
type commandEcEncode struct{}
// Name returns the command name, "ec.encode".
func (c *commandEcEncode) Name() string {
	const name = "ec.encode"
	return name
}
// Help returns the multi-line usage text of the ec.encode command: what the
// command does step by step and how many volume server losses the 10.4
// erasure coding layout tolerates for different cluster sizes.
func (c *commandEcEncode) Help() string {
	const helpText = `apply erasure coding to a volume
This command will:
1. freeze one volume
2. apply erasure coding to the volume
3. move the encoded shards to multiple volume servers
The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
to lose 4 volume servers.
If the number of volumes are not high, the worst case is that you only have 4 volume servers,
and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
If you only have less than 4 volume servers, with erasure coding, at least you can afford to
have 4 corrupted shard files.
`
	return helpText
}
func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
2019-05-24 18:52:23 +00:00
if err = encodeCommand.Parse(args); err != nil {
return nil
}
ctx := context.Background()
2019-05-31 22:48:40 +00:00
vid := needle.VolumeId(*volumeId)
2019-05-24 18:52:23 +00:00
2019-05-31 22:48:40 +00:00
// volumeId is provided
if vid != 0 {
return doEcEncode(ctx, commandEnv, *collection, vid)
}
// apply to all volumes in the collection
volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
2019-05-31 22:48:40 +00:00
if err != nil {
return err
}
2019-06-03 09:26:31 +00:00
fmt.Printf("ec encode volumes: %v\n", volumeIds)
2019-05-31 22:48:40 +00:00
for _, vid := range volumeIds {
if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
return err
}
}
return nil
}
func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
2019-05-25 09:02:44 +00:00
// find volume location
2019-05-31 22:48:40 +00:00
locations := commandEnv.masterClient.GetLocations(uint32(vid))
2019-05-24 18:52:23 +00:00
if len(locations) == 0 {
2019-05-31 22:48:40 +00:00
return fmt.Errorf("volume %d not found", vid)
2019-05-24 18:52:23 +00:00
}
2019-05-25 09:02:44 +00:00
// generate ec shards
2019-05-31 22:48:40 +00:00
err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
2019-05-25 09:02:44 +00:00
if err != nil {
2019-05-31 22:48:40 +00:00
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
2019-05-25 09:02:44 +00:00
}
// balance the ec shards to current cluster
2019-06-03 09:26:31 +00:00
err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
2019-05-25 09:02:44 +00:00
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
2019-05-25 09:02:44 +00:00
}
2019-05-24 18:52:23 +00:00
2019-05-31 22:48:40 +00:00
return nil
2019-05-24 18:52:23 +00:00
}
func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
2019-05-24 18:52:23 +00:00
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
2019-05-24 18:52:23 +00:00
})
return genErr
})
return err
}
2019-05-25 09:02:44 +00:00
2019-06-03 09:26:31 +00:00
func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
2019-05-25 09:02:44 +00:00
2019-05-30 08:38:59 +00:00
allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
2019-05-25 09:02:44 +00:00
if err != nil {
return err
}
if totalFreeEcSlots < erasure_coding.TotalShardsCount {
return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
}
2019-05-30 08:38:59 +00:00
allocatedDataNodes := allEcNodes
if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
2019-05-25 09:02:44 +00:00
}
// calculate how many shards to allocate for these servers
2019-05-30 08:38:59 +00:00
allocated := balancedEcDistribution(allocatedDataNodes)
2019-05-25 09:02:44 +00:00
// ask the data nodes to copy from the source volume server
2019-05-30 08:38:59 +00:00
copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
2019-05-25 09:02:44 +00:00
if err != nil {
2019-06-03 09:26:31 +00:00
return err
}
// unmount the to be deleted shards
err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return err
2019-05-25 09:02:44 +00:00
}
// ask the source volume server to clean up copied ec shards
2019-06-03 09:26:31 +00:00
err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
2019-06-03 09:26:31 +00:00
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
2019-05-25 09:02:44 +00:00
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
}
2019-05-25 09:02:44 +00:00
return err
}
func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
2019-05-30 08:38:59 +00:00
targetServers []*EcNode, allocated []int,
volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
2019-05-25 09:02:44 +00:00
// parallelize
shardIdChan := make(chan []uint32, len(targetServers))
2019-05-25 09:02:44 +00:00
var wg sync.WaitGroup
startFromShardId := uint32(0)
2019-05-25 09:02:44 +00:00
for i, server := range targetServers {
if allocated[i] <= 0 {
continue
}
wg.Add(1)
2019-05-30 08:38:59 +00:00
go func(server *EcNode, startFromShardId uint32, shardCount int) {
2019-05-25 09:02:44 +00:00
defer wg.Done()
2019-06-03 09:26:31 +00:00
copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
2019-05-25 09:02:44 +00:00
if copyErr != nil {
err = copyErr
} else {
shardIdChan <- copiedShardIds
2019-05-30 08:38:59 +00:00
server.freeEcSlot -= len(copiedShardIds)
2019-05-25 09:02:44 +00:00
}
}(server, startFromShardId, allocated[i])
2019-05-30 08:38:59 +00:00
startFromShardId += uint32(allocated[i])
2019-05-25 09:02:44 +00:00
}
wg.Wait()
close(shardIdChan)
2019-05-25 09:02:44 +00:00
if err != nil {
return nil, err
}
for shardIds := range shardIdChan {
actuallyCopied = append(actuallyCopied, shardIds...)
}
return
2019-05-25 09:02:44 +00:00
}
2019-05-30 08:38:59 +00:00
func balancedEcDistribution(servers []*EcNode) (allocated []int) {
allocated = make([]int, len(servers))
2019-05-25 09:02:44 +00:00
allocatedCount := 0
for allocatedCount < erasure_coding.TotalShardsCount {
2019-06-03 09:26:31 +00:00
for i, server := range servers {
if server.freeEcSlot-allocated[i] > 0 {
2019-05-25 09:02:44 +00:00
allocated[i] += 1
allocatedCount += 1
}
if allocatedCount >= erasure_coding.TotalShardsCount {
break
}
}
}
return allocated
}
func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
2019-05-31 22:48:40 +00:00
var resp *master_pb.VolumeListResponse
err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
if err != nil {
return
}
2019-06-03 09:26:31 +00:00
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
2019-06-03 09:26:31 +00:00
fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
2019-05-31 22:48:40 +00:00
vidMap := make(map[uint32]bool)
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
for _, v := range dn.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
2019-05-31 22:48:40 +00:00
vidMap[v.Id] = true
}
}
}
}
}
for vid, _ := range vidMap {
vids = append(vids, needle.VolumeId(vid))
}
return
}