diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go new file mode 100644 index 000000000..80a0ccf5c --- /dev/null +++ b/weed/shell/command_ec_encode.go @@ -0,0 +1,78 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + commands = append(commands, &commandEcEncode{}) +} + +type commandEcEncode struct { +} + +func (c *commandEcEncode) Name() string { + return "ec.encode" +} + +func (c *commandEcEncode) Help() string { + return `apply erasure coding to a volume + + This command will: + 1. freeze one volume + 2. apply erasure coding to the volume + 3. move the encoded shards to multiple volume servers + + The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford + to lose 4 volume servers. + + If the number of volumes are not high, the worst case is that you only have 4 volume servers, + and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server. + + If you only have less than 4 volume servers, with erasure coding, at least you can afford to + have 4 corrupted shard files. + +` +} + +func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { + + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := encodeCommand.Int("vid", 0, "the volume id") + if err = encodeCommand.Parse(args); err != nil { + return nil + } + + ctx := context.Background() + + locations := commandEnv.masterClient.GetLocations(uint32(*volumeId)) + + if len(locations) == 0 { + return fmt.Errorf("volume %d not found", *volumeId) + } + + err = generateEcSlices(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url) + + return err +} + +func generateEcSlices(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error { + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcGenerateSlices(ctx, &volume_server_pb.VolumeEcGenerateSlicesRequest{ + VolumeId: uint32(volumeId), + }) + return genErr + }) + + return err + +} diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index ab6775f99..171f862c9 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -88,6 +88,9 @@ func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { for _, vi := range t.VolumeInfos { s = s.plus(writeVolumeInformationMessage(writer, vi)) } + for _, ecShardInfo := range t.EcShardInfos { + fmt.Fprintf(writer, " ec %+v \n", ecShardInfo) + } fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) return s } diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 54e757678..dbfe5858b 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -94,7 +94,7 @@ func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize i func openEcFiles(baseFileName string, forRead bool) (files []*os.File, err error) { for i := 0; i < DataShardsCount+ParityShardsCount; i++ { - fname := baseFileName + ToExt(i+1) + fname := baseFileName + ToExt(i) openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY if forRead { openOption = os.O_RDONLY diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 645686a92..cefc95173 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -1,6 +1,8 @@ package erasure_coding import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) @@ -61,3 +63,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret []*master_p } return } + +func (ecInfo *EcVolumeInfo) String() string { + return fmt.Sprintf("id:%d shard:%v collection:%v", ecInfo.VolumeId, ecInfo.ShardIds(), ecInfo.Collection) +} diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 8c8a0b67b..379440684 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -30,6 +30,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf ecVolumeInfo = erasure_coding.NewEcVolumeInfo(shardInfo.Collection, needle.VolumeId(shardInfo.Id)) shards = append(shards, ecVolumeInfo) } + prevVolumeId = shardInfo.Id ecVolumeInfo.AddShardId(erasure_coding.ShardId(shardInfo.EcIndex)) } // find out the delta volumes