file-store/pkg/storeserver/client.go

129 lines
3.1 KiB
Go
Raw Permalink Normal View History

package storeserver
import (
"context"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"git.keganmyers.com/terribleplan/file-store/pkg/proto"
)
var _ = (StoreServer)((*StoreServerClient)(nil))
func NewClient(address string) (*StoreServerClient, error) {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return &StoreServerClient{gc: conn, pc: proto.NewStoreServerClient(conn)}, nil
}
type StoreServerClient struct {
gc *grpc.ClientConn
pc proto.StoreServerClient
}
func (c *StoreServerClient) Close() error {
return c.gc.Close()
}
func (c *StoreServerClient) ListFiles(fn ListFilesFn) error {
stream, err := c.pc.ListFiles(context.Background(), &proto.Empty{})
if err != nil {
return err
}
for {
if file, err := stream.Recv(); err == io.EOF {
break
} else if err != nil {
return err
} else if err := fn(file.FileId); err != nil {
return err
}
}
return nil
}
func (c *StoreServerClient) ReadFile(fileId string) ([]byte, error) {
if err := ValidateFileId(fileId); err != nil {
return nil, err
}
sfm, err := c.pc.ReadFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
if err != nil {
return nil, err
}
return sfm.MetaJson, nil
}
func (c *StoreServerClient) DeleteFile(fileId string) error {
if err := ValidateFileId(fileId); err != nil {
return err
}
_, err := c.pc.DeleteFile(context.Background(), &proto.FileIdentifier{FileId: fileId})
if err != nil {
return err
}
return nil
}
func (c *StoreServerClient) WriteFile(fileId string, metaJson []byte) error {
if err := ValidateFileId(fileId); err != nil {
return err
}
_, err := c.pc.WriteFile(context.Background(), &proto.StoreFileMeta{FileIdentifier: &proto.FileIdentifier{FileId: fileId}, MetaJson: metaJson})
if err != nil {
return err
}
return nil
}
type chunkWriter struct {
stream proto.StoreServer_WriteChunkClient
scd *proto.StoreChunkData
}
func (cw *chunkWriter) Write(p []byte) (int, error) {
cw.scd.Data.Data = p
cw.scd.DataIdentifier.Size = int64(len(p))
if err := cw.stream.Send(cw.scd); err != nil {
return 0, err
}
cw.scd.DataIdentifier.Offset += cw.scd.DataIdentifier.Size
return int(cw.scd.DataIdentifier.Size), nil
}
func (c *StoreServerClient) WriteChunk(fileId string, chunkId uint16, data io.ReadCloser) error {
defer data.Close()
if err := ValidateFileId(fileId); err != nil {
return err
}
stream, err := c.pc.WriteChunk(context.Background())
if err != nil {
return err
}
w := &chunkWriter{stream: stream, scd: &proto.StoreChunkData{
DataIdentifier: &proto.ChunkDataIdentifier{
Chunk: &proto.ChunkIdentifier{
FileIdentifier: &proto.FileIdentifier{
FileId: fileId,
},
ChunkId: uint32(chunkId),
},
},
Data: &proto.ChunkData{},
}}
if _, err := io.Copy(w, data); err != nil {
return err
}
if _, err := stream.CloseAndRecv(); err != nil {
return err
}
return nil
}