From a72cef3c429202aff5c04ed2c7b9296b2351174f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 31 May 2019 15:48:40 -0700 Subject: [PATCH] encode by collection --- weed/shell/command_ec_encode.go | 66 +++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 817529478..e69d1bfc7 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -59,26 +59,47 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } ctx := context.Background() + vid := needle.VolumeId(*volumeId) + // volumeId is provided + if vid != 0 { + return doEcEncode(ctx, commandEnv, *collection, vid) + } + + // apply to all volumes in the collection + volumeIds, err := collectVolumeByCollection(ctx, commandEnv, *collection) + if err != nil { + return err + } + for _, vid := range volumeIds { + if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.masterClient.GetLocations(uint32(*volumeId)) + locations := commandEnv.masterClient.GetLocations(uint32(vid)) if len(locations) == 0 { - return fmt.Errorf("volume %d not found", *volumeId) + return fmt.Errorf("volume %d not found", vid) } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url) + err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { - return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } // balance the ec shards to current cluster - err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations) + err = balanceEcShards(ctx, commandEnv, vid, collection, locations) if err != nil { - return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } - return err + return nil } func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { @@ -325,3 +346,34 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN return } + +func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, selectedCollection string) (vids []needle.VolumeId, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + vidMap := make(map[uint32]bool) + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } + } + } + } + } + + for vid, _ := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +}