file-store/pkg/erasureencode/encode.go

105 lines
2.8 KiB
Go

package erasureencode
import (
"fmt"
"io"
"os"
"github.com/klauspost/reedsolomon"
// chunkmeta "git.keganmyers.com/terribleplan/file-store/pkg/chunk/meta"
filemeta "git.keganmyers.com/terribleplan/file-store/pkg/file/meta"
// "git.keganmyers.com/terribleplan/file-store/pkg/util"
)
func EncodeFile(file *os.File, oututs []io.Writer, stride int32, shards, parity uint16) (*filemeta.Meta, error) {
stats, err := file.Stat()
if err != nil {
return nil, err
}
size := stats.Size()
meta := &filemeta.Meta{
Params: filemeta.Params{
Size: size,
Stride: stride,
Shards: shards,
Parity: parity,
},
}
if err := Encode(file, oututs, meta); err != nil {
return nil, err
}
return meta, nil
}
func Encode(file io.Reader, outputs []io.Writer, meta *filemeta.Meta) error {
if int(meta.Params.Shards)+int(meta.Params.Parity) != len(outputs) {
return fmt.Errorf("expected the number of shards+parity to equal the number of output files provided")
}
enc, err := reedsolomon.New(int(meta.Params.Shards), int(meta.Params.Parity), reedsolomon.WithAutoGoroutines(int(meta.Params.Stride)))
if err != nil {
return err
}
outputChunkCount := meta.Params.Shards + meta.Params.Parity
lastShardChunk := meta.Params.Shards - 1
data := make([][]byte, outputChunkCount)
data[0] = []byte{}
written := false // track whether the current stripe has been written
for i, csm := range meta.Params.Plan(0, meta.Params.Size) {
written = false
// prepare data slices, shard size only meaningfuly changes at stripe boundary
if csm.Chunk == 0 || i == 0 {
// if int32(len(data[0])) != csm.Size {
for i := uint16(0); i < outputChunkCount; i++ {
data[i] = make([]byte, csm.Size)
}
// }
}
// read the individual shard
if _, err := io.ReadFull(file, data[csm.Chunk][0:csm.Size]); err != nil {
return err
}
meta.ShardHashes = append(meta.ShardHashes, sha256sum(data[csm.Chunk]))
// if we are on the last chunk calculate the parity and write things out
if csm.Chunk == lastShardChunk {
if err := writeChunks(data, outputs, enc, meta, meta.Params.Shards, outputChunkCount); err != nil {
return err
}
written = true
}
}
if !written {
if err := writeChunks(data, outputs, enc, meta, meta.Params.Shards, outputChunkCount); err != nil {
return err
}
written = true
}
meta.ShardMerkle = merkleSha256(meta.ShardHashes)
meta.ParityMerkle = merkleSha256(meta.ParityHashes)
return nil
}
func writeChunks(data [][]byte, files []io.Writer, enc reedsolomon.Encoder, meta *filemeta.Meta, shards, totalShards uint16) error {
if err := enc.Encode(data); err != nil {
return err
}
for i := shards; i < totalShards; i++ {
meta.ParityHashes = append(meta.ParityHashes, sha256sum(data[i]))
}
for i := 0; i < len(data); i++ {
if _, err := files[i].Write(data[i]); err != nil {
return err
}
}
return nil
}