Add initial implementation of store server
This commit is contained in:
parent
db256918db
commit
327ee10d39
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -2,3 +2,4 @@ fls-*
|
|||
*.exe
|
||||
meta.json
|
||||
shard.*
|
||||
pkg/proto/*
|
||||
|
|
9
Makefile
9
Makefile
|
@ -1,8 +1,13 @@
|
|||
.PHONY: all build test
|
||||
.PHONY: all build protoc test
|
||||
|
||||
all: build
|
||||
|
||||
build:
|
||||
protoc:
|
||||
rm -rf pkg/proto
|
||||
mkdir pkg/proto
|
||||
PATH="$(PWD)/local/bin" protoc -I=proto --go_out=. --go-grpc_out=. proto/FiLeStore.proto
|
||||
|
||||
build: protoc
|
||||
GOOS=windows GOARCH=amd64 go build -o fls-windows-amd64.exe .
|
||||
GOOS=linux GOARCH=amd64 go build -o fls-linux-amd64 .
|
||||
|
||||
|
|
20
README.md
20
README.md
|
@ -21,6 +21,13 @@ Do not use it to store any data you care about.
|
|||
- Balancing of chunks
|
||||
- Filesystem features
|
||||
- FUSE mount of network filesystem
|
||||
- Properly organize code, unify logic
|
||||
|
||||
## IN-PROGRESS
|
||||
|
||||
- Networking features
|
||||
- Chunk storage
|
||||
- Basic functionality (mostly working)
|
||||
|
||||
## DONE
|
||||
|
||||
|
@ -34,3 +41,16 @@ Files are striped (with a configurable stripe width, 10MiB by default) across a
|
|||
# Why?
|
||||
|
||||
For fun. To solve a specific problem I have with existing options for distributed replicated file systems. The primary goal of this project is reliable file storage. Some are overly complex. Some are difficult to administer. Some scale poorly. Some don't have adequate data integrity features. Some require full file replication. Hopefully all of these shortcomings and more will be addressed for this specific problem space.
|
||||
|
||||
# Notes
|
||||
|
||||
## Deps
|
||||
|
||||
- protoc
|
||||
|
||||
### go exe deps
|
||||
|
||||
```
|
||||
GOBIN=`pwd`/local/bin go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
|
||||
GOBIN=`pwd`/local/bin go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
|
||||
```
|
||||
|
|
97
cmd/scmd.go
Normal file
97
cmd/scmd.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/storeserver"
|
||||
)
|
||||
|
||||
var (
|
||||
scmd_Chunks []int32
|
||||
scmd_FileId string
|
||||
)
|
||||
|
||||
func init() {
|
||||
root.AddCommand(scmd)
|
||||
|
||||
scmd_put.Flags().Int32SliceVar(&scmd_Chunks, "chunks", []int32{}, "Which chunk files to put onto the remote server (default: none)")
|
||||
scmd_put.Flags().StringVar(&scmd_FileId, "file-id", "", "The File ID to use for the stored file (default: detected from input file)")
|
||||
scmd.AddCommand(scmd_put)
|
||||
|
||||
scmd.AddCommand(scmd_listFiles)
|
||||
}
|
||||
|
||||
var scmd = &cobra.Command{
|
||||
Use: "scmd",
|
||||
Short: "Interact with store servers",
|
||||
}
|
||||
|
||||
var scmd_put = &cobra.Command{
|
||||
Use: "put [encoded-folder] [ssrv-address]",
|
||||
Short: "Put chunk files onto remote storage",
|
||||
Long: `Put chunk files onto remote storage.
|
||||
|
||||
The File ID (from either the basename of encoded-folder or --file-id) must be a UUID, which is how the file will be identified on remote storage.`,
|
||||
Args: cobra.ExactArgs(2),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
// todo: sanity check of encoded-folder
|
||||
encodedFolder, err := filepath.Abs(args[0])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if scmd_FileId == "" {
|
||||
scmd_FileId = filepath.Base(encodedFolder)
|
||||
}
|
||||
if _, err := uuid.Parse(scmd_FileId); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
file, err := os.Open(path.Join(encodedFolder, "meta.json"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
data, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
file.Close()
|
||||
client, err := storeserver.NewClient(args[1])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := client.WriteFile(scmd_FileId, data); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for _, chunkNumber := range scmd_Chunks {
|
||||
file, err := os.Open(path.Join(encodedFolder, fmt.Sprintf("shard.%04d", chunkNumber)))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
client.WriteChunk(scmd_FileId, uint16(chunkNumber), file)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
var scmd_listFiles = &cobra.Command{
|
||||
Use: "list-files [ssrv-address]",
|
||||
Short: "List files in remote storage",
|
||||
Args: cobra.ExactArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
client, err := storeserver.NewClient(args[0])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := client.ListFiles(func(fileId string) error {
|
||||
fmt.Printf("%s\n", fileId)
|
||||
return nil
|
||||
}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
},
|
||||
}
|
38
cmd/ssrv.go
Normal file
38
cmd/ssrv.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
// "path/filepath"
|
||||
// "fmt"
|
||||
"net"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/storeserver"
|
||||
)
|
||||
|
||||
var (
|
||||
scmd_Listen string
|
||||
)
|
||||
|
||||
func init() {
|
||||
ssrv.Flags().StringVar(&scmd_Listen, "listen", "127.0.0.1:9182", "The address on which to listen for connections (default: 127.0.0.1:9182)")
|
||||
root.AddCommand(ssrv)
|
||||
}
|
||||
|
||||
var ssrv = &cobra.Command{
|
||||
Use: "ssrv [storage-dir]",
|
||||
Short: "Run a store server",
|
||||
Args: cobra.ExactArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
sock, err := net.Listen("tcp", scmd_Listen)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var opts []grpc.ServerOption
|
||||
// ...
|
||||
grpcServer := grpc.NewServer(opts...)
|
||||
storeserver.NewProto(storeserver.NewLocal(args[0]), grpcServer)
|
||||
grpcServer.Serve(sock)
|
||||
},
|
||||
}
|
10
go.mod
10
go.mod
|
@ -3,13 +3,21 @@ module git.keganmyers.com/terribleplan/file-store
|
|||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/klauspost/reedsolomon v1.10.0
|
||||
github.com/mattn/go-sqlite3 v1.14.15
|
||||
github.com/spf13/cobra v1.5.0
|
||||
google.golang.org/grpc v1.49.0
|
||||
google.golang.org/protobuf v1.28.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.0.14 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.15 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
|
||||
golang.org/x/text v0.3.3 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
|
||||
)
|
||||
|
|
85
go.sum
85
go.sum
|
@ -1,4 +1,31 @@
|
|||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/klauspost/cpuid/v2 v2.0.14 h1:QRqdp6bb9M9S5yyKeYteXKuoKE4p0tGlra81fKOpWH8=
|
||||
|
@ -7,10 +34,68 @@ github.com/klauspost/reedsolomon v1.10.0 h1:MonMtg979rxSHjwtsla5dZLhreS0Lu42AyQ2
|
|||
github.com/klauspost/reedsolomon v1.10.0/go.mod h1:qHMIzMkuZUWqIh8mS/GruPdo3u0qwX2jk/LH440ON7Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
|
||||
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
|
||||
github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
|
||||
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
|
||||
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
|
|
4
local/.gitignore
vendored
Normal file
4
local/.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
*
|
||||
!.gitignore
|
||||
!bin
|
||||
!include
|
2
local/bin/.gitignore
vendored
Normal file
2
local/bin/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
*
|
||||
!.gitignore
|
2
local/include/.gitignore
vendored
Normal file
2
local/include/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
*
|
||||
!.gitignore
|
12
pkg/chunk/meta/main.go
Normal file
12
pkg/chunk/meta/main.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
package meta
|
||||
|
||||
import ()
|
||||
|
||||
type ShardMeta struct {
|
||||
Shard uint16
|
||||
ShardOffset int64
|
||||
Chunk uint16
|
||||
ChunkOffset int64
|
||||
GlobalOffset int64
|
||||
Size int32
|
||||
}
|
|
@ -4,18 +4,22 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
chunkmeta "git.keganmyers.com/terribleplan/file-store/pkg/chunk/meta"
|
||||
filemeta "git.keganmyers.com/terribleplan/file-store/pkg/file/meta"
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/util"
|
||||
)
|
||||
|
||||
type ReadPlanner func(meta *EEMeta) []ChunkShardMeta
|
||||
type ReadHandler func(data []byte, plan ChunkShardMeta, readNum int) error
|
||||
type ReadPlanner func(meta *filemeta.Meta) []chunkmeta.ShardMeta
|
||||
type ReadHandler func(data []byte, plan chunkmeta.ShardMeta, readNum int) error
|
||||
|
||||
func decodeFn(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta, getPlan ReadPlanner, handleRead ReadHandler) error {
|
||||
func decodeFn(inputs []io.ReadSeeker, file io.Writer, meta *filemeta.Meta, getPlan ReadPlanner, handleRead ReadHandler) error {
|
||||
raw := []byte{}
|
||||
rawLen := int32(0)
|
||||
fullPlan := getPlan(meta)
|
||||
|
||||
// we only need to seek once as the rest of the reads should be linear
|
||||
for _, plan := range fullPlan[0:min(int64(meta.Params.Shards), int64(len(fullPlan)))] {
|
||||
for _, plan := range fullPlan[0:util.Min(int64(meta.Params.Shards), int64(len(fullPlan)))] {
|
||||
if _, err := inputs[plan.Chunk].Seek(plan.ChunkOffset, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -37,10 +41,10 @@ func decodeFn(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta, getPlan Read
|
|||
return nil
|
||||
}
|
||||
|
||||
func Decode(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
|
||||
return decodeFn(inputs, file, meta, func(meta *EEMeta) []ChunkShardMeta {
|
||||
func Decode(inputs []io.ReadSeeker, file io.Writer, meta *filemeta.Meta) error {
|
||||
return decodeFn(inputs, file, meta, func(meta *filemeta.Meta) []chunkmeta.ShardMeta {
|
||||
return meta.Params.Plan(0, meta.Params.Size)
|
||||
}, func(data []byte, _ ChunkShardMeta, _ int) error {
|
||||
}, func(data []byte, _ chunkmeta.ShardMeta, _ int) error {
|
||||
if _, err := file.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -48,7 +52,7 @@ func Decode(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
|
|||
})
|
||||
}
|
||||
|
||||
func DecodeAndValidate(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
|
||||
func DecodeAndValidate(inputs []io.ReadSeeker, file io.Writer, meta *filemeta.Meta) error {
|
||||
shards := int64(meta.Params.Shards)
|
||||
|
||||
// get set up to read meta including the padding
|
||||
|
@ -57,9 +61,9 @@ func DecodeAndValidate(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) err
|
|||
validateParams.Size = (meta.Params.Size / shards) * (shards + 1)
|
||||
}
|
||||
|
||||
return decodeFn(inputs, file, meta, func(_ *EEMeta) []ChunkShardMeta {
|
||||
return decodeFn(inputs, file, meta, func(_ *filemeta.Meta) []chunkmeta.ShardMeta {
|
||||
return validateParams.Plan(0, validateParams.Size)
|
||||
}, func(data []byte, read ChunkShardMeta, i int) error {
|
||||
}, func(data []byte, read chunkmeta.ShardMeta, i int) error {
|
||||
actual := sha256sum(data)
|
||||
if !bytes.Equal(actual, meta.ShardHashes[i]) {
|
||||
return fmt.Errorf("shard hash mismatch")
|
||||
|
|
|
@ -6,17 +6,21 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/klauspost/reedsolomon"
|
||||
|
||||
// chunkmeta "git.keganmyers.com/terribleplan/file-store/pkg/chunk/meta"
|
||||
filemeta "git.keganmyers.com/terribleplan/file-store/pkg/file/meta"
|
||||
// "git.keganmyers.com/terribleplan/file-store/pkg/util"
|
||||
)
|
||||
|
||||
func EncodeFile(file *os.File, oututs []io.Writer, stride int32, shards, parity uint16) (*EEMeta, error) {
|
||||
func EncodeFile(file *os.File, oututs []io.Writer, stride int32, shards, parity uint16) (*filemeta.Meta, error) {
|
||||
stats, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size := stats.Size()
|
||||
|
||||
meta := &EEMeta{
|
||||
Params: Params{
|
||||
meta := &filemeta.Meta{
|
||||
Params: filemeta.Params{
|
||||
Size: size,
|
||||
Stride: stride,
|
||||
Shards: shards,
|
||||
|
@ -31,7 +35,7 @@ func EncodeFile(file *os.File, oututs []io.Writer, stride int32, shards, parity
|
|||
return meta, nil
|
||||
}
|
||||
|
||||
func Encode(file io.Reader, outputs []io.Writer, meta *EEMeta) error {
|
||||
func Encode(file io.Reader, outputs []io.Writer, meta *filemeta.Meta) error {
|
||||
if int(meta.Params.Shards)+int(meta.Params.Parity) != len(outputs) {
|
||||
return fmt.Errorf("expected the number of shards+parity to equal the number of output files provided")
|
||||
}
|
||||
|
@ -84,7 +88,7 @@ func Encode(file io.Reader, outputs []io.Writer, meta *EEMeta) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func writeChunks(data [][]byte, files []io.Writer, enc reedsolomon.Encoder, meta *EEMeta, shards, totalShards uint16) error {
|
||||
func writeChunks(data [][]byte, files []io.Writer, enc reedsolomon.Encoder, meta *filemeta.Meta, shards, totalShards uint16) error {
|
||||
if err := enc.Encode(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,97 +1 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type EEMeta struct {
|
||||
Params
|
||||
Name string `json:"name"`
|
||||
ShardMerkle FileHash `json:"shardMerkle"`
|
||||
ShardHashes HashList `json:"shardHashes"`
|
||||
ParityMerkle FileHash `json:"parityMerkle"`
|
||||
ParityHashes HashList `json:"parityHashes"`
|
||||
}
|
||||
|
||||
type FileHash []byte
|
||||
|
||||
func (hash FileHash) MarshalJSON() ([]byte, error) {
|
||||
text, err := hash.MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(string(text))
|
||||
}
|
||||
|
||||
func (hash FileHash) MarshalText() ([]byte, error) {
|
||||
return []byte(base64.StdEncoding.EncodeToString(hash)), nil
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalJSON(data []byte) error {
|
||||
s := ""
|
||||
json.Unmarshal(data, &s)
|
||||
return hash.UnmarshalText([]byte(s))
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalText(text []byte) error {
|
||||
raw, err := base64.StdEncoding.DecodeString(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(raw) != sha256.Size {
|
||||
return fmt.Errorf("expected hash to be %d bytes (got %d)", sha256.Size, len(raw))
|
||||
}
|
||||
*hash = raw
|
||||
return nil
|
||||
}
|
||||
|
||||
type HashList [][]byte
|
||||
|
||||
func (hashes HashList) MarshalJSON() ([]byte, error) {
|
||||
text, err := hashes.MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(string(text))
|
||||
}
|
||||
|
||||
func (hashes HashList) MarshalText() ([]byte, error) {
|
||||
// fmt.Printf("marshal hash list...\n")
|
||||
hashesLen := len(hashes)
|
||||
hashLen := 0
|
||||
dataLen := 0
|
||||
if hashesLen != 0 {
|
||||
hashLen = len(hashes[0])
|
||||
dataLen = hashesLen * hashLen
|
||||
}
|
||||
data := make([]byte, dataLen)
|
||||
for i, hash := range hashes {
|
||||
copy(data[i*hashLen:], hash)
|
||||
}
|
||||
return []byte(base64.StdEncoding.EncodeToString(data)), nil
|
||||
}
|
||||
|
||||
func (hashes *HashList) UnmarshalJSON(data []byte) error {
|
||||
s := ""
|
||||
json.Unmarshal(data, &s)
|
||||
return hashes.UnmarshalText([]byte(s))
|
||||
}
|
||||
|
||||
func (hashes *HashList) UnmarshalText(text []byte) error {
|
||||
raw, err := base64.StdEncoding.DecodeString(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(raw)%sha256.Size != 0 {
|
||||
return fmt.Errorf("expected hash list to be multiple of %d bytes", sha256.Size)
|
||||
}
|
||||
split := make([][]byte, len(raw)/sha256.Size)
|
||||
for i := 0; i < len(raw); i += sha256.Size {
|
||||
split[i/sha256.Size] = raw[i : i+sha256.Size]
|
||||
}
|
||||
*hashes = split
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,71 +0,0 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Params struct {
|
||||
Size int64 `json:"s,string"`
|
||||
Stride int32 `json:"t"`
|
||||
Shards uint16 `json:"h"`
|
||||
Parity uint16 `json:"p"`
|
||||
}
|
||||
|
||||
type ChunkShardMeta struct {
|
||||
Shard uint16
|
||||
ShardOffset int64
|
||||
Chunk uint16
|
||||
ChunkOffset int64
|
||||
GlobalOffset int64
|
||||
Size int32
|
||||
}
|
||||
|
||||
func (params Params) Plan(offset, size int64) []ChunkShardMeta {
|
||||
outputs := []ChunkShardMeta{}
|
||||
end := offset + size
|
||||
if (end - 1) > params.Size {
|
||||
panic(fmt.Errorf("attempted read beyond end of file"))
|
||||
}
|
||||
// constant
|
||||
shards := int64(params.Shards)
|
||||
baseStride := int64(params.Stride)
|
||||
baseStripeWidth := baseStride * shards
|
||||
|
||||
oddStripeOffset := (params.Size / baseStripeWidth) * baseStripeWidth
|
||||
oddChunkOffset := oddStripeOffset / shards
|
||||
oddSize := (params.Size - oddStripeOffset)
|
||||
oddStride := oddSize / shards
|
||||
if oddSize%shards > 0 {
|
||||
oddStride += 1
|
||||
}
|
||||
|
||||
for offset < end {
|
||||
output := ChunkShardMeta{GlobalOffset: offset}
|
||||
// var chunk uint64 // which chunk the shard is in
|
||||
// var chunkOffset uint64 // the location within the chunk that the shard begins
|
||||
// var shardOffset uint64 // the location within the shard at which the data begins
|
||||
// var readSize uint64 // the number of bytes to read from the shard
|
||||
|
||||
if offset >= oddStripeOffset {
|
||||
localOffset := offset - oddStripeOffset // the location relative to the odd data at which the desired data begins
|
||||
output.Chunk = uint16(localOffset / oddStride)
|
||||
output.Shard = uint16(oddStripeOffset/baseStride) + output.Chunk
|
||||
output.ShardOffset = localOffset % oddStride
|
||||
output.ChunkOffset = oddChunkOffset + output.ShardOffset
|
||||
output.Size = int32(min(end-offset, oddStride-output.ShardOffset))
|
||||
} else {
|
||||
shard := offset / baseStride
|
||||
output.Shard = uint16(offset / baseStride) // which shard the data is in
|
||||
output.Chunk = uint16(shard % shards)
|
||||
output.ShardOffset = offset % baseStride
|
||||
output.ChunkOffset = ((shard / shards) * baseStride) + output.ShardOffset
|
||||
output.Size = int32(min(end-offset, baseStride-output.ShardOffset))
|
||||
}
|
||||
if output.Size <= 0 {
|
||||
panic(fmt.Errorf("invalid read size"))
|
||||
}
|
||||
outputs = append(outputs, output)
|
||||
offset += int64(output.Size)
|
||||
}
|
||||
return outputs
|
||||
}
|
|
@ -1,153 +0,0 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPlan_EvenStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 18,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf("Expected \n%#v to equal \n%#v at %d", actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_EvenStride_Short2(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 16,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_EvenStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 17,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_OddStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 13,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Shard: 7, ShardOffset: 0, Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_OddStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 14,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Shard: 7, ShardOffset: 0, Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,16 +4,6 @@ import (
|
|||
"crypto/sha256"
|
||||
)
|
||||
|
||||
func min(input ...int64) int64 {
|
||||
min := input[0]
|
||||
for i := 1; i < len(input); i++ {
|
||||
if input[i] < min {
|
||||
min = input[i]
|
||||
}
|
||||
}
|
||||
return min
|
||||
}
|
||||
|
||||
func sha256sum(input []byte) []byte {
|
||||
v := sha256.Sum256(input)
|
||||
return v[:]
|
||||
|
|
97
pkg/file/meta/main.go
Normal file
97
pkg/file/meta/main.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Meta struct {
|
||||
Params
|
||||
Name string `json:"name"`
|
||||
ShardMerkle FileHash `json:"shardMerkle"`
|
||||
ShardHashes HashList `json:"shardHashes"`
|
||||
ParityMerkle FileHash `json:"parityMerkle"`
|
||||
ParityHashes HashList `json:"parityHashes"`
|
||||
}
|
||||
|
||||
type FileHash []byte
|
||||
|
||||
func (hash FileHash) MarshalJSON() ([]byte, error) {
|
||||
text, err := hash.MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(string(text))
|
||||
}
|
||||
|
||||
func (hash FileHash) MarshalText() ([]byte, error) {
|
||||
return []byte(base64.StdEncoding.EncodeToString(hash)), nil
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalJSON(data []byte) error {
|
||||
s := ""
|
||||
json.Unmarshal(data, &s)
|
||||
return hash.UnmarshalText([]byte(s))
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalText(text []byte) error {
|
||||
raw, err := base64.StdEncoding.DecodeString(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(raw) != sha256.Size {
|
||||
return fmt.Errorf("expected hash to be %d bytes (got %d)", sha256.Size, len(raw))
|
||||
}
|
||||
*hash = raw
|
||||
return nil
|
||||
}
|
||||
|
||||
type HashList [][]byte
|
||||
|
||||
func (hashes HashList) MarshalJSON() ([]byte, error) {
|
||||
text, err := hashes.MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(string(text))
|
||||
}
|
||||
|
||||
func (hashes HashList) MarshalText() ([]byte, error) {
|
||||
// fmt.Printf("marshal hash list...\n")
|
||||
hashesLen := len(hashes)
|
||||
hashLen := 0
|
||||
dataLen := 0
|
||||
if hashesLen != 0 {
|
||||
hashLen = len(hashes[0])
|
||||
dataLen = hashesLen * hashLen
|
||||
}
|
||||
data := make([]byte, dataLen)
|
||||
for i, hash := range hashes {
|
||||
copy(data[i*hashLen:], hash)
|
||||
}
|
||||
return []byte(base64.StdEncoding.EncodeToString(data)), nil
|
||||
}
|
||||
|
||||
func (hashes *HashList) UnmarshalJSON(data []byte) error {
|
||||
s := ""
|
||||
json.Unmarshal(data, &s)
|
||||
return hashes.UnmarshalText([]byte(s))
|
||||
}
|
||||
|
||||
func (hashes *HashList) UnmarshalText(text []byte) error {
|
||||
raw, err := base64.StdEncoding.DecodeString(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(raw)%sha256.Size != 0 {
|
||||
return fmt.Errorf("expected hash list to be multiple of %d bytes", sha256.Size)
|
||||
}
|
||||
split := make([][]byte, len(raw)/sha256.Size)
|
||||
for i := 0; i < len(raw); i += sha256.Size {
|
||||
split[i/sha256.Size] = raw[i : i+sha256.Size]
|
||||
}
|
||||
*hashes = split
|
||||
return nil
|
||||
}
|
65
pkg/file/meta/params.go
Normal file
65
pkg/file/meta/params.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
chunkmeta "git.keganmyers.com/terribleplan/file-store/pkg/chunk/meta"
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/util"
|
||||
)
|
||||
|
||||
type Params struct {
|
||||
Size int64 `json:"s,string"`
|
||||
Stride int32 `json:"t"`
|
||||
Shards uint16 `json:"h"`
|
||||
Parity uint16 `json:"p"`
|
||||
}
|
||||
|
||||
func (params Params) Plan(offset, size int64) []chunkmeta.ShardMeta {
|
||||
outputs := []chunkmeta.ShardMeta{}
|
||||
end := offset + size
|
||||
if (end - 1) > params.Size {
|
||||
panic(fmt.Errorf("attempted read beyond end of file"))
|
||||
}
|
||||
// constant
|
||||
shards := int64(params.Shards)
|
||||
baseStride := int64(params.Stride)
|
||||
baseStripeWidth := baseStride * shards
|
||||
|
||||
oddStripeOffset := (params.Size / baseStripeWidth) * baseStripeWidth
|
||||
oddChunkOffset := oddStripeOffset / shards
|
||||
oddSize := (params.Size - oddStripeOffset)
|
||||
oddStride := oddSize / shards
|
||||
if oddSize%shards > 0 {
|
||||
oddStride += 1
|
||||
}
|
||||
|
||||
for offset < end {
|
||||
output := chunkmeta.ShardMeta{GlobalOffset: offset}
|
||||
// var chunk uint64 // which chunk the shard is in
|
||||
// var chunkOffset uint64 // the location within the chunk that the shard begins
|
||||
// var shardOffset uint64 // the location within the shard at which the data begins
|
||||
// var readSize uint64 // the number of bytes to read from the shard
|
||||
|
||||
if offset >= oddStripeOffset {
|
||||
localOffset := offset - oddStripeOffset // the location relative to the odd data at which the desired data begins
|
||||
output.Chunk = uint16(localOffset / oddStride)
|
||||
output.Shard = uint16(oddStripeOffset/baseStride) + output.Chunk
|
||||
output.ShardOffset = localOffset % oddStride
|
||||
output.ChunkOffset = oddChunkOffset + output.ShardOffset
|
||||
output.Size = int32(util.Min(end-offset, oddStride-output.ShardOffset))
|
||||
} else {
|
||||
shard := offset / baseStride
|
||||
output.Shard = uint16(offset / baseStride) // which shard the data is in
|
||||
output.Chunk = uint16(shard % shards)
|
||||
output.ShardOffset = offset % baseStride
|
||||
output.ChunkOffset = ((shard / shards) * baseStride) + output.ShardOffset
|
||||
output.Size = int32(util.Min(end-offset, baseStride-output.ShardOffset))
|
||||
}
|
||||
if output.Size <= 0 {
|
||||
panic(fmt.Errorf("invalid read size"))
|
||||
}
|
||||
outputs = append(outputs, output)
|
||||
offset += int64(output.Size)
|
||||
}
|
||||
return outputs
|
||||
}
|
153
pkg/file/meta/params_test.go
Normal file
153
pkg/file/meta/params_test.go
Normal file
|
@ -0,0 +1,153 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPlan_EvenStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 18,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf("Expected \n%#v to equal \n%#v at %d", actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_EvenStride_Short2(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 16,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_EvenStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 17,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_OddStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 13,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Shard: 7, ShardOffset: 0, Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_OddStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 14,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Shard: 3, ShardOffset: 1, Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Shard: 4, ShardOffset: 0, Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Shard: 5, ShardOffset: 0, Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Shard: 6, ShardOffset: 0, Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Shard: 7, ShardOffset: 0, Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
21
pkg/file/meta/read.go
Normal file
21
pkg/file/meta/read.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
)
|
||||
|
||||
func ReadBytes(input io.ReadCloser) ([]byte, error) {
|
||||
defer input.Close()
|
||||
meta := &Meta{}
|
||||
buf := &bytes.Buffer{}
|
||||
if _, err := io.Copy(buf, input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bufBytes := buf.Bytes()
|
||||
if err := json.Unmarshal(bufBytes, meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bufBytes, nil
|
||||
}
|
104
pkg/ifs/dirfs.go
Normal file
104
pkg/ifs/dirfs.go
Normal file
|
@ -0,0 +1,104 @@
|
|||
package ifs
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type FS interface {
|
||||
Mkdir(name string, perm fs.FileMode) error
|
||||
MkdirAll(name string, perm fs.FileMode) error
|
||||
Open(name string) (fs.File, error)
|
||||
OpenFile(name string, flag int, perm fs.FileMode) (File, error)
|
||||
Remove(name string) error
|
||||
RemoveAll(name string) error
|
||||
}
|
||||
|
||||
func DirFS(dir string) FS {
|
||||
return dirFS(dir)
|
||||
}
|
||||
|
||||
type File interface {
|
||||
Stat() (fs.FileInfo, error)
|
||||
Read([]byte) (int, error)
|
||||
Write([]byte) (int, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type dirFS string
|
||||
|
||||
func (dir dirFS) validPath(name, op string) error {
|
||||
if !fs.ValidPath(name) || runtime.GOOS == "windows" && containsAny(name, `\:`) {
|
||||
return &os.PathError{Op: op, Path: name, Err: os.ErrInvalid}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dir dirFS) Mkdir(name string, perm fs.FileMode) error {
|
||||
if err := dir.validPath(name, "mkdir"); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Mkdir(string(dir)+"/"+name, perm)
|
||||
}
|
||||
|
||||
func (dir dirFS) MkdirAll(name string, perm fs.FileMode) error {
|
||||
if err := dir.validPath(name, "mkdirAll"); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.MkdirAll(string(dir)+"/"+name, perm)
|
||||
}
|
||||
|
||||
func (dir dirFS) Open(name string) (fs.File, error) {
|
||||
if err := dir.validPath(name, "open"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.Open(string(dir) + "/" + name)
|
||||
}
|
||||
|
||||
func (dir dirFS) OpenFile(name string, flag int, perm fs.FileMode) (File, error) {
|
||||
if err := dir.validPath(name, "openFile"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f, err := os.OpenFile(string(dir)+"/"+name, flag, perm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (dir dirFS) Remove(name string) error {
|
||||
if err := dir.validPath(name, "remove"); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Remove(string(dir) + "/" + name)
|
||||
}
|
||||
|
||||
func (dir dirFS) RemoveAll(name string) error {
|
||||
if err := dir.validPath(name, "removeAll"); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.RemoveAll(string(dir) + "/" + name)
|
||||
}
|
||||
|
||||
func (dir dirFS) Stat(name string) (fs.FileInfo, error) {
|
||||
if err := dir.validPath(name, "stat"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f, err := os.Stat(string(dir) + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func containsAny(s, chars string) bool {
|
||||
for i := 0; i < len(s); i++ {
|
||||
for j := 0; j < len(chars); j++ {
|
||||
if s[i] == chars[j] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
95
pkg/ifs/walk_dir.go
Normal file
95
pkg/ifs/walk_dir.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
package ifs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"path"
|
||||
)
|
||||
|
||||
var SkipDir = fs.SkipDir
|
||||
|
||||
func WalkDir(fsys FS, root string, fn fs.WalkDirFunc) error {
|
||||
info, err := fs.Stat(fsys, root)
|
||||
if err != nil {
|
||||
err = fn(root, nil, err)
|
||||
} else {
|
||||
err = walkDir(fsys, root, &statDirEntry{info}, fn)
|
||||
}
|
||||
if err == SkipDir {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type ReadDirFn func(fs.DirEntry) error
|
||||
|
||||
func ReadDir(fsys fs.FS, name string, fn ReadDirFn) error {
|
||||
file, err := fsys.Open(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
dir, ok := file.(fs.ReadDirFile)
|
||||
if !ok {
|
||||
return &fs.PathError{Op: "readdir", Path: name, Err: errors.New("not implemented")}
|
||||
}
|
||||
|
||||
for {
|
||||
list, err := dir.ReadDir(100)
|
||||
if err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
for _, dirent := range list {
|
||||
if err := fn(dirent); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func walkDir(fsys fs.FS, name string, d fs.DirEntry, walkDirFn fs.WalkDirFunc) error {
|
||||
if err := walkDirFn(name, d, nil); err != nil || !d.IsDir() {
|
||||
if err == SkipDir && d.IsDir() {
|
||||
// Successfully skipped directory.
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ReadDir(fsys, name, func(d1 fs.DirEntry) error {
|
||||
name1 := path.Join(name, d1.Name())
|
||||
if err := walkDir(fsys, name1, d1, walkDirFn); err != nil {
|
||||
if err == SkipDir {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
// Second call, to report ReadDir error.
|
||||
err = walkDirFn(name, d, err)
|
||||
if err != nil {
|
||||
if err == SkipDir && d.IsDir() {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type statDirEntry struct {
|
||||
info fs.FileInfo
|
||||
}
|
||||
|
||||
func (d *statDirEntry) Name() string { return d.info.Name() }
|
||||
func (d *statDirEntry) IsDir() bool { return d.info.IsDir() }
|
||||
func (d *statDirEntry) Type() fs.FileMode { return d.info.Mode().Type() }
|
||||
func (d *statDirEntry) Info() (fs.FileInfo, error) { return d.info, nil }
|
|
@ -8,6 +8,7 @@ import (
|
|||
"path/filepath"
|
||||
|
||||
eenc "git.keganmyers.com/terribleplan/file-store/pkg/erasureencode"
|
||||
filemeta "git.keganmyers.com/terribleplan/file-store/pkg/file/meta"
|
||||
)
|
||||
|
||||
func StoreFile(inputPath, outputPath string, size int32, shards, parity uint16, name string) error {
|
||||
|
@ -98,7 +99,7 @@ func ReadFile(inputPath, outputPath string, overwrite bool) error {
|
|||
}
|
||||
defer metaFile.Close()
|
||||
|
||||
meta := &eenc.EEMeta{}
|
||||
meta := &filemeta.Meta{}
|
||||
dec := json.NewDecoder(metaFile)
|
||||
if err := dec.Decode(meta); err != nil {
|
||||
panic(err)
|
||||
|
|
128
pkg/storeserver/client.go
Normal file
128
pkg/storeserver/client.go
Normal file
|
@ -0,0 +1,128 @@
|
|||
package storeserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/proto"
|
||||
)
|
||||
|
||||
var _ = (StoreServer)((*StoreServerClient)(nil))
|
||||
|
||||
func NewClient(address string) (*StoreServerClient, error) {
|
||||
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &StoreServerClient{gc: conn, pc: proto.NewStoreServerClient(conn)}, nil
|
||||
}
|
||||
|
||||
type StoreServerClient struct {
|
||||
gc *grpc.ClientConn
|
||||
pc proto.StoreServerClient
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) Close() error {
|
||||
return c.gc.Close()
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) ListFiles(fn ListFilesFn) error {
|
||||
stream, err := c.pc.ListFiles(context.Background(), &proto.Empty{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
|
||||
if file, err := stream.Recv(); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else if err := fn(file.FileId); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) ReadFile(fileId string) ([]byte, error) {
|
||||
if err := ValidateFileId(fileId); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sfm, err := c.pc.ReadFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sfm.MetaJson, nil
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) DeleteFile(fileId string) error {
|
||||
if err := ValidateFileId(fileId); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := c.pc.DeleteFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) WriteFile(fileId string, metaJson []byte) error {
|
||||
if err := ValidateFileId(fileId); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := c.pc.WriteFile(context.Background(), &proto.StoreFileMeta{FileIdentifier: &proto.FileIdentifier{FileId: fileId}, MetaJson: metaJson})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type chunkWriter struct {
|
||||
stream proto.StoreServer_WriteChunkClient
|
||||
scd *proto.StoreChunkData
|
||||
}
|
||||
|
||||
func (cw *chunkWriter) Write(p []byte) (int, error) {
|
||||
cw.scd.Data.Data = p
|
||||
cw.scd.DataIdentifier.Size = int64(len(p))
|
||||
if err := cw.stream.Send(cw.scd); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cw.scd.DataIdentifier.Offset += cw.scd.DataIdentifier.Size
|
||||
return int(cw.scd.DataIdentifier.Size), nil
|
||||
}
|
||||
|
||||
func (c *StoreServerClient) WriteChunk(fileId string, chunkId uint16, data io.ReadCloser) error {
|
||||
defer data.Close()
|
||||
if err := ValidateFileId(fileId); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream, err := c.pc.WriteChunk(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w := &chunkWriter{stream: stream, scd: &proto.StoreChunkData{
|
||||
DataIdentifier: &proto.ChunkDataIdentifier{
|
||||
Chunk: &proto.ChunkIdentifier{
|
||||
FileIdentifier: &proto.FileIdentifier{
|
||||
FileId: fileId,
|
||||
},
|
||||
ChunkId: uint32(chunkId),
|
||||
},
|
||||
},
|
||||
Data: &proto.ChunkData{},
|
||||
}}
|
||||
if _, err := io.Copy(w, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := stream.CloseAndRecv(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
138
pkg/storeserver/local.go
Normal file
138
pkg/storeserver/local.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package storeserver
|
||||
|
||||
import (
|
||||
// "context"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
filemeta "git.keganmyers.com/terribleplan/file-store/pkg/file/meta"
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/ifs"
|
||||
// "git.keganmyers.com/terribleplan/file-store/pkg/proto"
|
||||
)
|
||||
|
||||
var _ = (StoreServer)((*LocalStoreServer)(nil))
|
||||
|
||||
func NewLocal(dir string) StoreServer {
|
||||
f := ifs.DirFS(dir)
|
||||
return &LocalStoreServer{f: f}
|
||||
}
|
||||
|
||||
type LocalStoreServer struct {
|
||||
f ifs.FS
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) fileIdPath(fileId string) (string, error) {
|
||||
if err := ValidateFileId(fileId); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return path.Join(fileId[0:2], fileId[2:4], fileId[4:6], fileId[6:8], fileId), nil
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) fileMetaPath(fileId string) (string, error) {
|
||||
fileIdPath, err := s.fileIdPath(fileId)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return path.Join(fileIdPath, "meta.json"), nil
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) ListFiles(fn ListFilesFn) error {
|
||||
return ifs.WalkDir(s.f, "/", func(path string, d fs.DirEntry, err error) error {
|
||||
if d.IsDir() && len(path) > 48 {
|
||||
return ifs.SkipDir
|
||||
}
|
||||
|
||||
if !d.IsDir() && len(path) == 48 && d.Name() == "meta.json" {
|
||||
fileId := path[strings.LastIndex(path, "/"):]
|
||||
if _, err := uuid.Parse(fileId); err == nil {
|
||||
fn(fileId)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) ReadFile(fileId string) ([]byte, error) {
|
||||
filePath, err := s.fileMetaPath(fileId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
file, err := s.f.Open(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
meta, err := filemeta.ReadBytes(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) WriteFile(fileId string, meta []byte) error {
|
||||
dirPath, err := s.fileIdPath(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.f.MkdirAll(dirPath, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metaPath, err := s.fileMetaPath(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
file, err := s.f.OpenFile(metaPath, os.O_WRONLY|os.O_EXCL|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
if err == os.ErrExist {
|
||||
existing, err := s.ReadFile(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if bytes.Equal(meta, existing) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
if _, err := file.Write(meta); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) DeleteFile(fileId string) error {
|
||||
fileIdPath, err := s.fileIdPath(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.f.RemoveAll(fileIdPath)
|
||||
}
|
||||
|
||||
func (s *LocalStoreServer) WriteChunk(fileId string, chunkId uint16, data io.ReadCloser) error {
|
||||
fileIdPath, err := s.fileIdPath(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
filePath := path.Join(fileIdPath, fmt.Sprintf("chunk.%04d", chunkId))
|
||||
|
||||
file, err := s.f.OpenFile(filePath, os.O_WRONLY|os.O_EXCL|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.Copy(file, data); err != nil {
|
||||
os.Remove(filePath)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
25
pkg/storeserver/main.go
Normal file
25
pkg/storeserver/main.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package storeserver
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/google/uuid"
|
||||
// "git.keganmyers.com/terribleplan/file-store/pkg/proto"
|
||||
)
|
||||
|
||||
func ValidateFileId(fileId string) error {
|
||||
if _, err := uuid.Parse(fileId); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type StoreServer interface {
|
||||
ListFiles(ListFilesFn) error
|
||||
ReadFile(string) ([]byte, error)
|
||||
WriteFile(fileId string, meta []byte) error
|
||||
DeleteFile(fileId string) error
|
||||
WriteChunk(fileId string, chunkId uint16, data io.ReadCloser) error
|
||||
}
|
||||
|
||||
type ListFilesFn func(string) error
|
107
pkg/storeserver/server.go
Normal file
107
pkg/storeserver/server.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package storeserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"git.keganmyers.com/terribleplan/file-store/pkg/proto"
|
||||
)
|
||||
|
||||
var _ = (proto.StoreServerServer)((*protoStoreServer)(nil))
|
||||
|
||||
// proto.StoreServerServer
|
||||
func NewProto(s StoreServer, gs *grpc.Server) {
|
||||
// return
|
||||
proto.RegisterStoreServerServer(gs, &protoStoreServer{s: s})
|
||||
return
|
||||
}
|
||||
|
||||
type protoStoreServer struct {
|
||||
s StoreServer
|
||||
proto.UnimplementedStoreServerServer
|
||||
}
|
||||
|
||||
// files
|
||||
func (p *protoStoreServer) DeleteFile(context.Context, *proto.FileIdentifier) (*proto.Empty, error) {
|
||||
return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) ListFiles(_ *proto.Empty, lfs proto.StoreServer_ListFilesServer) error {
|
||||
f := proto.FileIdentifier{}
|
||||
return p.s.ListFiles(func(fileId string) error {
|
||||
f.FileId = fileId
|
||||
return lfs.Send(&f)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) ReadFile(context.Context, *proto.FileIdentifier) (*proto.StoreFileMeta, error) {
|
||||
return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) WriteFile(_ context.Context, f *proto.StoreFileMeta) (*proto.Empty, error) {
|
||||
return &proto.Empty{}, p.s.WriteFile(f.FileIdentifier.FileId, f.MetaJson)
|
||||
// return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
// chunks
|
||||
func (p *protoStoreServer) DeleteChunk(context.Context, *proto.ChunkIdentifier) (*proto.Empty, error) {
|
||||
return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) DescribeChunk(context.Context, *proto.ChunkIdentifier) (*proto.StoreChunkMeta, error) {
|
||||
return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) ListChunks(*proto.Empty, proto.StoreServer_ListChunksServer) error {
|
||||
return fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) ReadChunk(context.Context, *proto.ChunkDataIdentifier) (*proto.ChunkData, error) {
|
||||
return nil, fmt.Errorf("todo")
|
||||
}
|
||||
|
||||
type serverChunkReader struct {
|
||||
buffer *bytes.Buffer
|
||||
stream proto.StoreServer_WriteChunkServer
|
||||
err error
|
||||
}
|
||||
|
||||
// todo
|
||||
func (r *serverChunkReader) Read(p []byte) (int, error) {
|
||||
for r.buffer.Len() < len(p) && r.err == nil {
|
||||
chunk, err := r.stream.Recv()
|
||||
if chunk != nil {
|
||||
r.buffer.Write(chunk.Data.Data)
|
||||
}
|
||||
if err != nil {
|
||||
r.err = err
|
||||
break
|
||||
}
|
||||
}
|
||||
if r.err != nil && r.err != io.EOF {
|
||||
return 0, r.err
|
||||
}
|
||||
return r.buffer.Read(p)
|
||||
}
|
||||
|
||||
// todo
|
||||
func (r *serverChunkReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *protoStoreServer) WriteChunk(stream proto.StoreServer_WriteChunkServer) error {
|
||||
chunk, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
buf.Write(chunk.Data.Data)
|
||||
p.s.WriteChunk(chunk.DataIdentifier.Chunk.FileIdentifier.FileId, uint16(chunk.DataIdentifier.Chunk.ChunkId), &serverChunkReader{buffer: buf, stream: stream})
|
||||
|
||||
return fmt.Errorf("todo")
|
||||
}
|
11
pkg/util/main.go
Normal file
11
pkg/util/main.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package util
|
||||
|
||||
func Min(input ...int64) int64 {
|
||||
min := input[0]
|
||||
for i := 1; i < len(input); i++ {
|
||||
if input[i] < min {
|
||||
min = input[i]
|
||||
}
|
||||
}
|
||||
return min
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package erasureencode
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
80
proto/FiLeStore.proto
Normal file
80
proto/FiLeStore.proto
Normal file
|
@ -0,0 +1,80 @@
|
|||
syntax = "proto3";
|
||||
package flsproto;
|
||||
option go_package = "pkg/proto";
|
||||
|
||||
service StoreServer {
|
||||
rpc ListFiles (Empty) returns (stream FileIdentifier);
|
||||
rpc ReadFile (FileIdentifier) returns (StoreFileMeta);
|
||||
// Should error if file already exists and is different
|
||||
rpc WriteFile (StoreFileMeta) returns (Empty);
|
||||
// Should only delete when there are no chunks, error otherwise
|
||||
rpc DeleteFile (FileIdentifier) returns (Empty);
|
||||
|
||||
rpc ListChunks (Empty) returns (stream StoreChunkMeta);
|
||||
rpc DescribeChunk (ChunkIdentifier) returns (StoreChunkMeta);
|
||||
rpc ReadChunk (ChunkDataIdentifier) returns (ChunkData);
|
||||
// Should error if chunk already exists
|
||||
rpc WriteChunk (stream StoreChunkData) returns (StoreChunkMeta);
|
||||
rpc DeleteChunk (ChunkIdentifier) returns (Empty);
|
||||
}
|
||||
|
||||
message Empty {}
|
||||
|
||||
message FileIdentifier {
|
||||
string fileId = 1;
|
||||
}
|
||||
|
||||
message ChunkIdentifier {
|
||||
FileIdentifier fileIdentifier = 1;
|
||||
uint32 chunkId = 2;
|
||||
}
|
||||
|
||||
message ChunkDataIdentifier {
|
||||
ChunkIdentifier chunk = 1;
|
||||
int64 offset = 2;
|
||||
int64 size = 3;
|
||||
}
|
||||
|
||||
message StoreFileMeta {
|
||||
FileIdentifier fileIdentifier = 1;
|
||||
bytes metaJson = 2;
|
||||
}
|
||||
|
||||
message StoreChunkMeta {
|
||||
ChunkIdentifier chunk = 1;
|
||||
int64 size = 2;
|
||||
}
|
||||
|
||||
message ChunkData {
|
||||
bytes data = 1;
|
||||
}
|
||||
|
||||
message StoreChunkData {
|
||||
ChunkDataIdentifier dataIdentifier = 1;
|
||||
ChunkData data = 2;
|
||||
}
|
||||
|
||||
// todo: fs stuff
|
||||
// maybe it looks like this, maybe it is more primitive/imperative...
|
||||
|
||||
// service FileServer {
|
||||
// rpc Open (stream FileOperationRequest) returns (stream FileOperationResponse);
|
||||
// }
|
||||
|
||||
// message FileOperationRequest {
|
||||
// oneof Request {
|
||||
// FileOperationRequest_Open open = 1;
|
||||
// }
|
||||
// }
|
||||
|
||||
// message FileOperationRequest_Open {
|
||||
// string path = 1;
|
||||
// repeated string mode = 2;
|
||||
// }
|
||||
|
||||
|
||||
// message FileOperationResponse {
|
||||
// oneof Response {
|
||||
|
||||
// }
|
||||
// }
|
Loading…
Reference in a new issue