// decode.go — erasure-coded file decoding for file-store/pkg/erasureencode.
package erasureencode
import (
"bytes"
"fmt"
"io"
)
// ReadPlanner produces the ordered list of shard reads to perform for one
// decode pass over the given metadata.
type ReadPlanner func(meta *EEMeta) []ChunkShardMeta
// ReadHandler consumes one shard read: data holds the shard bytes, plan
// describes the read that produced them, and readNum is the read's index
// within the full plan.
type ReadHandler func(data []byte, plan ChunkShardMeta, readNum int) error
func decodeFn(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta, getPlan ReadPlanner, handleRead ReadHandler) error {
raw := []byte{}
rawLen := int32(0)
fullPlan := getPlan(meta)
// we only need to seek once as the rest of the reads should be linear
for _, plan := range fullPlan[0:min(int64(meta.Params.Shards), int64(len(fullPlan)))] {
if _, err := inputs[plan.Chunk].Seek(plan.ChunkOffset, io.SeekStart); err != nil {
2022-08-24 02:54:01 +00:00
return err
}
}
for i, plan := range fullPlan {
if rawLen != plan.Size {
raw = make([]byte, plan.Size)
rawLen = plan.Size
}
if _, err := io.ReadFull(inputs[plan.Chunk], raw); err != nil {
return err
}
if err := handleRead(raw, plan, i); err != nil {
return err
}
}
return nil
}
// Decode reconstructs the original file from the erasure-coded inputs,
// planning reads over the full unpadded size and writing every shard's
// bytes straight to file without validation.
func Decode(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
	planReads := func(m *EEMeta) []ChunkShardMeta {
		return m.Params.Plan(0, m.Params.Size)
	}
	writeShard := func(data []byte, _ ChunkShardMeta, _ int) error {
		_, err := file.Write(data)
		return err
	}
	return decodeFn(inputs, file, meta, planReads, writeShard)
}
func DecodeAndValidate(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
shards := int64(meta.Params.Shards)
2022-08-24 02:54:01 +00:00
// get set up to read meta including the padding
validateParams := *meta
if meta.Params.Size%shards > 0 {
validateParams.Size = (meta.Params.Size / shards) * (shards + 1)
2022-08-24 02:54:01 +00:00
}
return decodeFn(inputs, file, meta, func(_ *EEMeta) []ChunkShardMeta {
return validateParams.Plan(0, validateParams.Size)
}, func(data []byte, read ChunkShardMeta, i int) error {
actual := sha256sum(data)
if !bytes.Equal(actual, meta.ShardHashes[i]) {
return fmt.Errorf("shard hash mismatch")
}
dataLen := int64(len(data))
2022-08-24 02:54:01 +00:00
writeData := data
if read.GlobalOffset > meta.Params.Size {
2022-08-24 02:54:01 +00:00
writeData = nil
} else if read.GlobalOffset+dataLen > meta.Params.Size {
writeData = data[0 : read.GlobalOffset-meta.Params.Size]
2022-08-24 02:54:01 +00:00
}
if writeData != nil {
if _, err := file.Write(writeData); err != nil {
return err
}
}
return nil
})
raw := make([]byte, meta.Params.Stride)
rawLen := int32(len(raw))
for _, plan := range meta.Params.Plan(0, meta.Params.Size) {
if rawLen != plan.Size {
raw = make([]byte, plan.Size)
rawLen = plan.Size
}
// We can be particularly lazy and ignore Offset since we are reading the full file from the chunks
if _, err := io.ReadFull(inputs[plan.Chunk], raw); err != nil {
return err
}
if _, err := file.Write(raw); err != nil {
return err
}
}
return nil
}