129 lines
3.1 KiB
Go
129 lines
3.1 KiB
Go
|
package storeserver
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"io"
|
||
|
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/credentials/insecure"
|
||
|
|
||
|
"git.keganmyers.com/terribleplan/file-store/pkg/proto"
|
||
|
)
|
||
|
|
||
|
// Compile-time check that *StoreServerClient satisfies the StoreServer interface.
var _ StoreServer = (*StoreServerClient)(nil)
|
||
|
|
||
|
func NewClient(address string) (*StoreServerClient, error) {
|
||
|
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &StoreServerClient{gc: conn, pc: proto.NewStoreServerClient(conn)}, nil
|
||
|
}
|
||
|
|
||
|
type StoreServerClient struct {
|
||
|
gc *grpc.ClientConn
|
||
|
pc proto.StoreServerClient
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) Close() error {
|
||
|
return c.gc.Close()
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) ListFiles(fn ListFilesFn) error {
|
||
|
stream, err := c.pc.ListFiles(context.Background(), &proto.Empty{})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
for {
|
||
|
|
||
|
if file, err := stream.Recv(); err == io.EOF {
|
||
|
break
|
||
|
} else if err != nil {
|
||
|
return err
|
||
|
} else if err := fn(file.FileId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) ReadFile(fileId string) ([]byte, error) {
|
||
|
if err := ValidateFileId(fileId); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
sfm, err := c.pc.ReadFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return sfm.MetaJson, nil
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) DeleteFile(fileId string) error {
|
||
|
if err := ValidateFileId(fileId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
_, err := c.pc.DeleteFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) WriteFile(fileId string, metaJson []byte) error {
|
||
|
if err := ValidateFileId(fileId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
_, err := c.pc.WriteFile(context.Background(), &proto.StoreFileMeta{FileIdentifier: &proto.FileIdentifier{FileId: fileId}, MetaJson: metaJson})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type chunkWriter struct {
|
||
|
stream proto.StoreServer_WriteChunkClient
|
||
|
scd *proto.StoreChunkData
|
||
|
}
|
||
|
|
||
|
func (cw *chunkWriter) Write(p []byte) (int, error) {
|
||
|
cw.scd.Data.Data = p
|
||
|
cw.scd.DataIdentifier.Size = int64(len(p))
|
||
|
if err := cw.stream.Send(cw.scd); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
cw.scd.DataIdentifier.Offset += cw.scd.DataIdentifier.Size
|
||
|
return int(cw.scd.DataIdentifier.Size), nil
|
||
|
}
|
||
|
|
||
|
func (c *StoreServerClient) WriteChunk(fileId string, chunkId uint16, data io.ReadCloser) error {
|
||
|
defer data.Close()
|
||
|
if err := ValidateFileId(fileId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
stream, err := c.pc.WriteChunk(context.Background())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
w := &chunkWriter{stream: stream, scd: &proto.StoreChunkData{
|
||
|
DataIdentifier: &proto.ChunkDataIdentifier{
|
||
|
Chunk: &proto.ChunkIdentifier{
|
||
|
FileIdentifier: &proto.FileIdentifier{
|
||
|
FileId: fileId,
|
||
|
},
|
||
|
ChunkId: uint32(chunkId),
|
||
|
},
|
||
|
},
|
||
|
Data: &proto.ChunkData{},
|
||
|
}}
|
||
|
if _, err := io.Copy(w, data); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if _, err := stream.CloseAndRecv(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|