streaming updates for large entries or large index file

fix https://github.com/chrislusf/seaweedfs/issues/801
This commit is contained in:
Chris Lu 2018-12-22 11:10:08 -08:00
parent 36d13355bb
commit 5333f2984a
5 changed files with 245 additions and 139 deletions

View file

@ -2,6 +2,8 @@ package operation
import (
"context"
"fmt"
"io"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@ -23,17 +25,30 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})
if err != nil {
return err
}
dataSize := len(resp.IndexFileContent)
var indexFileContent []byte
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("read index entries: %v", err)
}
indexFileContent = append(indexFileContent, resp.IndexFileContent...)
}
dataSize := len(indexFileContent)
for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
line := resp.IndexFileContent[idx : idx+NeedleEntrySize]
line := indexFileContent[idx : idx+NeedleEntrySize]
key := BytesToNeedleId(line[:NeedleIdSize])
offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])

View file

@ -24,9 +24,9 @@ service VolumeServer {
rpc VolumeSyncStatus (VolumeSyncStatusRequest) returns (VolumeSyncStatusResponse) {
}
rpc VolumeSyncIndex (VolumeSyncIndexRequest) returns (VolumeSyncIndexResponse) {
rpc VolumeSyncIndex (VolumeSyncIndexRequest) returns (stream VolumeSyncIndexResponse) {
}
rpc VolumeSyncData (VolumeSyncDataRequest) returns (VolumeSyncDataResponse) {
rpc VolumeSyncData (VolumeSyncDataRequest) returns (stream VolumeSyncDataResponse) {
}
rpc VolumeMount (VolumeMountRequest) returns (VolumeMountResponse) {
@ -104,6 +104,7 @@ message VolumeSyncStatusRequest {
}
message VolumeSyncStatusResponse {
uint32 volumd_id = 1;
string collection = 2;
string replication = 4;
string ttl = 5;
uint64 tail_offset = 6;

View file

@ -352,6 +352,7 @@ func (m *VolumeSyncStatusRequest) GetVolumdId() uint32 {
type VolumeSyncStatusResponse struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"`
TailOffset uint64 `protobuf:"varint,6,opt,name=tail_offset,json=tailOffset" json:"tail_offset,omitempty"`
@ -371,6 +372,13 @@ func (m *VolumeSyncStatusResponse) GetVolumdId() uint32 {
return 0
}
func (m *VolumeSyncStatusResponse) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
func (m *VolumeSyncStatusResponse) GetReplication() string {
if m != nil {
return m.Replication
@ -723,8 +731,8 @@ type VolumeServerClient interface {
DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error)
AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error)
VolumeSyncStatus(ctx context.Context, in *VolumeSyncStatusRequest, opts ...grpc.CallOption) (*VolumeSyncStatusResponse, error)
VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (*VolumeSyncIndexResponse, error)
VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (*VolumeSyncDataResponse, error)
VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (VolumeServer_VolumeSyncIndexClient, error)
VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (VolumeServer_VolumeSyncDataClient, error)
VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error)
VolumeUnmount(ctx context.Context, in *VolumeUnmountRequest, opts ...grpc.CallOption) (*VolumeUnmountResponse, error)
}
@ -809,22 +817,68 @@ func (c *volumeServerClient) VolumeSyncStatus(ctx context.Context, in *VolumeSyn
return out, nil
}
func (c *volumeServerClient) VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (*VolumeSyncIndexResponse, error) {
out := new(VolumeSyncIndexResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeSyncIndex", in, out, c.cc, opts...)
func (c *volumeServerClient) VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (VolumeServer_VolumeSyncIndexClient, error) {
stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[0], c.cc, "/volume_server_pb.VolumeServer/VolumeSyncIndex", opts...)
if err != nil {
return nil, err
}
return out, nil
x := &volumeServerVolumeSyncIndexClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
func (c *volumeServerClient) VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (*VolumeSyncDataResponse, error) {
out := new(VolumeSyncDataResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeSyncData", in, out, c.cc, opts...)
type VolumeServer_VolumeSyncIndexClient interface {
Recv() (*VolumeSyncIndexResponse, error)
grpc.ClientStream
}
type volumeServerVolumeSyncIndexClient struct {
grpc.ClientStream
}
func (x *volumeServerVolumeSyncIndexClient) Recv() (*VolumeSyncIndexResponse, error) {
m := new(VolumeSyncIndexResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *volumeServerClient) VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (VolumeServer_VolumeSyncDataClient, error) {
stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[1], c.cc, "/volume_server_pb.VolumeServer/VolumeSyncData", opts...)
if err != nil {
return nil, err
}
return out, nil
x := &volumeServerVolumeSyncDataClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type VolumeServer_VolumeSyncDataClient interface {
Recv() (*VolumeSyncDataResponse, error)
grpc.ClientStream
}
type volumeServerVolumeSyncDataClient struct {
grpc.ClientStream
}
func (x *volumeServerVolumeSyncDataClient) Recv() (*VolumeSyncDataResponse, error) {
m := new(VolumeSyncDataResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *volumeServerClient) VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error) {
@ -857,8 +911,8 @@ type VolumeServerServer interface {
DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error)
AssignVolume(context.Context, *AssignVolumeRequest) (*AssignVolumeResponse, error)
VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error)
VolumeSyncIndex(context.Context, *VolumeSyncIndexRequest) (*VolumeSyncIndexResponse, error)
VolumeSyncData(context.Context, *VolumeSyncDataRequest) (*VolumeSyncDataResponse, error)
VolumeSyncIndex(*VolumeSyncIndexRequest, VolumeServer_VolumeSyncIndexServer) error
VolumeSyncData(*VolumeSyncDataRequest, VolumeServer_VolumeSyncDataServer) error
VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error)
VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error)
}
@ -1011,40 +1065,46 @@ func _VolumeServer_VolumeSyncStatus_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VolumeSyncIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeSyncIndexRequest)
if err := dec(in); err != nil {
return nil, err
func _VolumeServer_VolumeSyncIndex_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(VolumeSyncIndexRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
if interceptor == nil {
return srv.(VolumeServerServer).VolumeSyncIndex(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VolumeSyncIndex",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VolumeSyncIndex(ctx, req.(*VolumeSyncIndexRequest))
}
return interceptor(ctx, in, info, handler)
return srv.(VolumeServerServer).VolumeSyncIndex(m, &volumeServerVolumeSyncIndexServer{stream})
}
func _VolumeServer_VolumeSyncData_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeSyncDataRequest)
if err := dec(in); err != nil {
return nil, err
type VolumeServer_VolumeSyncIndexServer interface {
Send(*VolumeSyncIndexResponse) error
grpc.ServerStream
}
if interceptor == nil {
return srv.(VolumeServerServer).VolumeSyncData(ctx, in)
type volumeServerVolumeSyncIndexServer struct {
grpc.ServerStream
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VolumeSyncData",
func (x *volumeServerVolumeSyncIndexServer) Send(m *VolumeSyncIndexResponse) error {
return x.ServerStream.SendMsg(m)
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VolumeSyncData(ctx, req.(*VolumeSyncDataRequest))
func _VolumeServer_VolumeSyncData_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(VolumeSyncDataRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return interceptor(ctx, in, info, handler)
return srv.(VolumeServerServer).VolumeSyncData(m, &volumeServerVolumeSyncDataServer{stream})
}
type VolumeServer_VolumeSyncDataServer interface {
Send(*VolumeSyncDataResponse) error
grpc.ServerStream
}
type volumeServerVolumeSyncDataServer struct {
grpc.ServerStream
}
func (x *volumeServerVolumeSyncDataServer) Send(m *VolumeSyncDataResponse) error {
return x.ServerStream.SendMsg(m)
}
func _VolumeServer_VolumeMount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
@ -1119,14 +1179,6 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "VolumeSyncStatus",
Handler: _VolumeServer_VolumeSyncStatus_Handler,
},
{
MethodName: "VolumeSyncIndex",
Handler: _VolumeServer_VolumeSyncIndex_Handler,
},
{
MethodName: "VolumeSyncData",
Handler: _VolumeServer_VolumeSyncData_Handler,
},
{
MethodName: "VolumeMount",
Handler: _VolumeServer_VolumeMount_Handler,
@ -1136,76 +1188,87 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
Handler: _VolumeServer_VolumeUnmount_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "VolumeSyncIndex",
Handler: _VolumeServer_VolumeSyncIndex_Handler,
ServerStreams: true,
},
{
StreamName: "VolumeSyncData",
Handler: _VolumeServer_VolumeSyncData_Handler,
ServerStreams: true,
},
},
Metadata: "volume_server.proto",
}
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 1013 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x57, 0xdf, 0x72, 0xdb, 0xc4,
0x17, 0x8e, 0x62, 0x3b, 0x76, 0x8e, 0xed, 0x5f, 0xfd, 0x5b, 0xa7, 0x89, 0xaa, 0x42, 0x30, 0x02,
0x5a, 0xa7, 0x0d, 0x61, 0x68, 0x07, 0x28, 0xc3, 0x0d, 0x90, 0x00, 0x93, 0x8b, 0x4e, 0x99, 0xcd,
0xb4, 0xc3, 0x0c, 0x9d, 0xf1, 0x28, 0xd2, 0x3a, 0xd9, 0x89, 0x2c, 0xa9, 0xda, 0x55, 0x26, 0xe5,
0x0d, 0x78, 0x04, 0xae, 0xb9, 0xe1, 0x9d, 0x78, 0x19, 0x66, 0xff, 0x48, 0xd1, 0x3f, 0x57, 0x82,
0xbb, 0xd5, 0xb7, 0xe7, 0x9c, 0x6f, 0xcf, 0xd9, 0xb3, 0xe7, 0xb3, 0x61, 0x7a, 0x1d, 0xfa, 0xc9,
0x8a, 0x2c, 0x18, 0x89, 0xaf, 0x49, 0x7c, 0x14, 0xc5, 0x21, 0x0f, 0xd1, 0xa4, 0x00, 0x2e, 0xa2,
0x73, 0xfb, 0x33, 0x40, 0xdf, 0x3b, 0xdc, 0xbd, 0x3c, 0x21, 0x3e, 0xe1, 0x04, 0x93, 0x37, 0x09,
0x61, 0x1c, 0xdd, 0x83, 0xc1, 0x92, 0xfa, 0x64, 0x41, 0x3d, 0x66, 0x1a, 0xb3, 0xce, 0x7c, 0x1b,
0xf7, 0xc5, 0xf7, 0xa9, 0xc7, 0xec, 0x17, 0x30, 0x2d, 0x38, 0xb0, 0x28, 0x0c, 0x18, 0x41, 0xcf,
0xa0, 0x1f, 0x13, 0x96, 0xf8, 0x5c, 0x39, 0x0c, 0x9f, 0xec, 0x1f, 0x95, 0xb9, 0x8e, 0x32, 0x97,
0xc4, 0xe7, 0x38, 0x35, 0xb7, 0x29, 0x8c, 0xf2, 0x1b, 0x68, 0x0f, 0xfa, 0x9a, 0xdb, 0x34, 0x66,
0xc6, 0x7c, 0x1b, 0x6f, 0x29, 0x6a, 0xb4, 0x0b, 0x5b, 0x8c, 0x3b, 0x3c, 0x61, 0xe6, 0xe6, 0xcc,
0x98, 0xf7, 0xb0, 0xfe, 0x42, 0x3b, 0xd0, 0x23, 0x71, 0x1c, 0xc6, 0x66, 0x47, 0x9a, 0xab, 0x0f,
0x84, 0xa0, 0xcb, 0xe8, 0x6f, 0xc4, 0xec, 0xce, 0x8c, 0xf9, 0x18, 0xcb, 0xb5, 0xdd, 0x87, 0xde,
0x0f, 0xab, 0x88, 0xbf, 0xb5, 0xbf, 0x02, 0xf3, 0x95, 0xe3, 0x26, 0xc9, 0xea, 0x95, 0x3c, 0xe3,
0xf1, 0x25, 0x71, 0xaf, 0xd2, 0xdc, 0xef, 0xc3, 0xb6, 0x3c, 0xb9, 0x97, 0x9e, 0x60, 0x8c, 0x07,
0x0a, 0x38, 0xf5, 0xec, 0x6f, 0xe1, 0x5e, 0x8d, 0xa3, 0xae, 0xc1, 0x47, 0x30, 0xbe, 0x70, 0xe2,
0x73, 0xe7, 0x82, 0x2c, 0x62, 0x87, 0xd3, 0x50, 0x7a, 0x1b, 0x78, 0xa4, 0x41, 0x2c, 0x30, 0xfb,
0x57, 0xb0, 0x0a, 0x11, 0xc2, 0x55, 0xe4, 0xb8, 0xbc, 0x0d, 0x39, 0x9a, 0xc1, 0x30, 0x8a, 0x89,
0xe3, 0xfb, 0xa1, 0xeb, 0x70, 0x22, 0xab, 0xd0, 0xc1, 0x79, 0xc8, 0x7e, 0x1f, 0xee, 0xd7, 0x06,
0x57, 0x07, 0xb4, 0x9f, 0x95, 0x4e, 0x1f, 0xae, 0x56, 0xb4, 0x15, 0xb5, 0xfd, 0x5e, 0xe5, 0xd4,
0xd2, 0x53, 0xc7, 0xfd, 0xba, 0xb4, 0xeb, 0x13, 0x27, 0x48, 0xa2, 0x56, 0x81, 0xcb, 0x27, 0x4e,
0x5d, 0xb3, 0xc8, 0x7b, 0xaa, 0x39, 0x8e, 0x43, 0xdf, 0x27, 0x2e, 0xa7, 0x61, 0x90, 0x86, 0xdd,
0x07, 0x70, 0x33, 0x50, 0xb7, 0x4a, 0x0e, 0xb1, 0x2d, 0x30, 0xab, 0xae, 0x3a, 0xec, 0x5f, 0x06,
0x4c, 0xbf, 0x63, 0x8c, 0x5e, 0x04, 0x8a, 0xb6, 0x55, 0xf9, 0x8b, 0x84, 0x9b, 0x65, 0xc2, 0xf2,
0xf5, 0x74, 0x2a, 0xd7, 0x23, 0x2c, 0x62, 0x12, 0xf9, 0xd4, 0x75, 0x64, 0x88, 0xae, 0x0c, 0x91,
0x87, 0xd0, 0x04, 0x3a, 0x9c, 0xfb, 0x66, 0x4f, 0xee, 0x88, 0xa5, 0xbd, 0x0b, 0x3b, 0xc5, 0x93,
0xea, 0x14, 0xbe, 0x84, 0x3d, 0x85, 0x9c, 0xbd, 0x0d, 0xdc, 0x33, 0xf9, 0x12, 0x5a, 0x15, 0xfc,
0x6f, 0x03, 0xcc, 0xaa, 0xa3, 0xee, 0xe0, 0xa6, 0xf6, 0xfb, 0xb7, 0xa7, 0x47, 0x1f, 0xc0, 0x90,
0x3b, 0xd4, 0x5f, 0x84, 0xcb, 0x25, 0x23, 0xdc, 0xdc, 0x9a, 0x19, 0xf3, 0x2e, 0x06, 0x01, 0xbd,
0x90, 0x08, 0x3a, 0x80, 0x89, 0xab, 0xba, 0x74, 0x11, 0x93, 0x6b, 0xca, 0x44, 0xe4, 0xbe, 0x24,
0xbe, 0xe3, 0xa6, 0xdd, 0xab, 0x60, 0x64, 0xc3, 0x98, 0x7a, 0x37, 0x0b, 0x39, 0x1c, 0xe4, 0xd3,
0x1e, 0xc8, 0x68, 0x43, 0xea, 0xdd, 0xfc, 0x48, 0x7d, 0x72, 0x26, 0x5e, 0xf8, 0x17, 0xb0, 0x7b,
0x9b, 0xdc, 0x69, 0xe0, 0x91, 0x9b, 0x56, 0x45, 0xf9, 0x29, 0x5f, 0x4c, 0xed, 0xa6, 0x4b, 0x72,
0x08, 0x88, 0x0a, 0x40, 0xf1, 0xba, 0x61, 0xc0, 0x49, 0xc0, 0x65, 0x80, 0x11, 0x9e, 0xc8, 0x1d,
0x41, 0x7e, 0xac, 0x70, 0xfb, 0x0f, 0x03, 0xee, 0xde, 0x46, 0x3a, 0x71, 0xb8, 0xd3, 0xaa, 0xb5,
0x2c, 0x18, 0x64, 0xd9, 0x6f, 0xaa, 0xbd, 0xf4, 0x5b, 0x8c, 0x3d, 0x5d, 0xbd, 0x8e, 0xdc, 0xd1,
0x5f, 0x75, 0x03, 0x4e, 0x90, 0x04, 0x84, 0x78, 0x6a, 0x7a, 0xaa, 0x6b, 0x18, 0x28, 0xe0, 0xd4,
0xb3, 0xbf, 0xc9, 0xd7, 0x46, 0x1d, 0x4d, 0xe7, 0xf8, 0x21, 0x8c, 0x6a, 0xb2, 0x1b, 0x2e, 0x73,
0x89, 0x7d, 0x0e, 0x48, 0x39, 0x3f, 0x0f, 0x93, 0xa0, 0xdd, 0xcc, 0xb8, 0x0b, 0xd3, 0x82, 0x8b,
0x6e, 0xdc, 0xa7, 0xb0, 0xa3, 0xe0, 0x97, 0xc1, 0xaa, 0x75, 0xac, 0xbd, 0xb4, 0xac, 0x99, 0x93,
0x8e, 0x96, 0x91, 0xbc, 0xa4, 0x3f, 0x8b, 0x19, 0xab, 0x82, 0x89, 0x57, 0x53, 0x84, 0xb5, 0xf9,
0x2f, 0x00, 0x27, 0x94, 0x5d, 0xa9, 0xb6, 0x17, 0xfd, 0xea, 0xd1, 0x58, 0xcf, 0x0e, 0xb1, 0x14,
0x88, 0xe3, 0xfb, 0xf2, 0x0e, 0xba, 0x58, 0x2c, 0x45, 0x99, 0x13, 0x46, 0x3c, 0x59, 0xfc, 0x2e,
0x96, 0x6b, 0x81, 0x2d, 0x63, 0xa2, 0x4a, 0xdf, 0xc5, 0x72, 0x6d, 0xff, 0x69, 0xc0, 0xf6, 0x73,
0xb2, 0xd2, 0x91, 0xf7, 0x01, 0x2e, 0xc2, 0x38, 0x4c, 0x38, 0x0d, 0x08, 0x93, 0x04, 0x3d, 0x9c,
0x43, 0xfe, 0x3b, 0x8f, 0xbc, 0x76, 0xe2, 0x2f, 0xe5, 0xed, 0x76, 0xb1, 0x5c, 0x0b, 0xec, 0x92,
0x38, 0x91, 0x7e, 0x5e, 0x72, 0x2d, 0x54, 0x91, 0x71, 0xc7, 0xbd, 0x92, 0xaf, 0xa9, 0x8b, 0xd5,
0xc7, 0x93, 0xdf, 0x01, 0x46, 0xba, 0x09, 0xa4, 0x2c, 0xa3, 0xd7, 0x30, 0xcc, 0xc9, 0x39, 0xfa,
0xb8, 0xaa, 0xda, 0xd5, 0x9f, 0x07, 0xd6, 0x27, 0x0d, 0x56, 0xba, 0xd8, 0x1b, 0x28, 0x80, 0xff,
0x57, 0xe4, 0x12, 0x3d, 0xaa, 0x7a, 0xaf, 0x13, 0x63, 0xeb, 0x71, 0x2b, 0xdb, 0x8c, 0x8f, 0xc3,
0xb4, 0x46, 0xff, 0xd0, 0x61, 0x43, 0x94, 0x82, 0x06, 0x5b, 0x9f, 0xb6, 0xb4, 0xce, 0x58, 0xdf,
0x00, 0xaa, 0x8a, 0x23, 0x7a, 0xdc, 0x18, 0xe6, 0x56, 0x7c, 0xad, 0xc3, 0x76, 0xc6, 0x6b, 0x13,
0x55, 0xb2, 0xd9, 0x98, 0x68, 0x41, 0x98, 0x1b, 0x13, 0x2d, 0x69, 0xf1, 0x06, 0xba, 0x82, 0x49,
0x59, 0x52, 0xd1, 0xc1, 0xba, 0xdf, 0x79, 0x15, 0xc5, 0xb6, 0x1e, 0xb5, 0x31, 0xcd, 0xc8, 0x16,
0x30, 0xca, 0x0b, 0x1f, 0xaa, 0x69, 0xba, 0x1a, 0x09, 0xb7, 0x1e, 0x34, 0x99, 0xe5, 0xb3, 0x29,
0x0b, 0x61, 0x5d, 0x36, 0x6b, 0x54, 0xb6, 0x2e, 0x9b, 0x75, 0xba, 0x6a, 0x6f, 0xa0, 0x4b, 0xb8,
0x53, 0x52, 0x18, 0x34, 0x7f, 0x57, 0x80, 0xbc, 0x76, 0x59, 0x07, 0x2d, 0x2c, 0x33, 0x26, 0x02,
0xff, 0x2b, 0x8e, 0x79, 0xf4, 0xf0, 0x5d, 0xee, 0x39, 0x8d, 0xb2, 0xe6, 0xcd, 0x86, 0x19, 0xcd,
0x6b, 0x18, 0xe6, 0xa6, 0x7b, 0xdd, 0xe0, 0xa8, 0xea, 0x45, 0xdd, 0xe0, 0xa8, 0x93, 0x88, 0x0d,
0x74, 0x0e, 0xe3, 0xc2, 0xbc, 0x47, 0x0f, 0xd6, 0x79, 0x16, 0x55, 0xc4, 0x7a, 0xd8, 0x68, 0x97,
0x72, 0x9c, 0x6f, 0xc9, 0xff, 0x44, 0x4f, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0x60, 0xe6, 0x57,
0xdf, 0x2a, 0x0d, 0x00, 0x00,
// 1015 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x57, 0x5f, 0x73, 0xdb, 0x44,
0x10, 0x8f, 0x62, 0x3b, 0x76, 0xd6, 0x36, 0x35, 0xe7, 0x34, 0x51, 0x55, 0x08, 0x46, 0x40, 0xeb,
0xb4, 0x21, 0x40, 0x3b, 0x40, 0x19, 0x5e, 0x80, 0x04, 0x98, 0x3c, 0x74, 0xca, 0x5c, 0xa6, 0x1d,
0x66, 0xe8, 0x8c, 0x47, 0x91, 0xce, 0xce, 0x11, 0x59, 0x52, 0x75, 0xa7, 0x4c, 0xca, 0x77, 0xe0,
0x03, 0xf0, 0xcc, 0x0b, 0xdf, 0x91, 0x17, 0xe6, 0xfe, 0x48, 0xd1, 0x3f, 0xd7, 0x02, 0xde, 0x4e,
0xbf, 0xdb, 0xfd, 0xed, 0xed, 0xde, 0xde, 0xfe, 0x6c, 0x18, 0x5f, 0x85, 0x7e, 0xb2, 0x24, 0x33,
0x46, 0xe2, 0x2b, 0x12, 0x1f, 0x45, 0x71, 0xc8, 0x43, 0x34, 0x2a, 0x80, 0xb3, 0xe8, 0xdc, 0xfe,
0x04, 0xd0, 0x77, 0x0e, 0x77, 0x2f, 0x4e, 0x88, 0x4f, 0x38, 0xc1, 0xe4, 0x55, 0x42, 0x18, 0x47,
0x77, 0xa0, 0x37, 0xa7, 0x3e, 0x99, 0x51, 0x8f, 0x99, 0xc6, 0xa4, 0x35, 0xdd, 0xc6, 0x5d, 0xf1,
0x7d, 0xea, 0x31, 0xfb, 0x19, 0x8c, 0x0b, 0x0e, 0x2c, 0x0a, 0x03, 0x46, 0xd0, 0x13, 0xe8, 0xc6,
0x84, 0x25, 0x3e, 0x57, 0x0e, 0xfd, 0x47, 0xfb, 0x47, 0xe5, 0x58, 0x47, 0x99, 0x4b, 0xe2, 0x73,
0x9c, 0x9a, 0xdb, 0x14, 0x06, 0xf9, 0x0d, 0xb4, 0x07, 0x5d, 0x1d, 0xdb, 0x34, 0x26, 0xc6, 0x74,
0x1b, 0x6f, 0xa9, 0xd0, 0x68, 0x17, 0xb6, 0x18, 0x77, 0x78, 0xc2, 0xcc, 0xcd, 0x89, 0x31, 0xed,
0x60, 0xfd, 0x85, 0x76, 0xa0, 0x43, 0xe2, 0x38, 0x8c, 0xcd, 0x96, 0x34, 0x57, 0x1f, 0x08, 0x41,
0x9b, 0xd1, 0xdf, 0x88, 0xd9, 0x9e, 0x18, 0xd3, 0x21, 0x96, 0x6b, 0xbb, 0x0b, 0x9d, 0xef, 0x97,
0x11, 0x7f, 0x6d, 0x7f, 0x09, 0xe6, 0x0b, 0xc7, 0x4d, 0x92, 0xe5, 0x0b, 0x79, 0xc6, 0xe3, 0x0b,
0xe2, 0x5e, 0xa6, 0xb9, 0xdf, 0x85, 0x6d, 0x79, 0x72, 0x2f, 0x3d, 0xc1, 0x10, 0xf7, 0x14, 0x70,
0xea, 0xd9, 0xdf, 0xc0, 0x9d, 0x1a, 0x47, 0x5d, 0x83, 0x0f, 0x60, 0xb8, 0x70, 0xe2, 0x73, 0x67,
0x41, 0x66, 0xb1, 0xc3, 0x69, 0x28, 0xbd, 0x0d, 0x3c, 0xd0, 0x20, 0x16, 0x98, 0xfd, 0x0b, 0x58,
0x05, 0x86, 0x70, 0x19, 0x39, 0x2e, 0x6f, 0x12, 0x1c, 0x4d, 0xa0, 0x1f, 0xc5, 0xc4, 0xf1, 0xfd,
0xd0, 0x75, 0x38, 0x91, 0x55, 0x68, 0xe1, 0x3c, 0x64, 0xbf, 0x0b, 0x77, 0x6b, 0xc9, 0xd5, 0x01,
0xed, 0x27, 0xa5, 0xd3, 0x87, 0xcb, 0x25, 0x6d, 0x14, 0xda, 0x7e, 0xa7, 0x72, 0x6a, 0xe9, 0xa9,
0x79, 0xbf, 0x2a, 0xed, 0xfa, 0xc4, 0x09, 0x92, 0xa8, 0x11, 0x71, 0xf9, 0xc4, 0xa9, 0x6b, 0xc6,
0xbc, 0xa7, 0x9a, 0xe3, 0x38, 0xf4, 0x7d, 0xe2, 0x72, 0x1a, 0x06, 0x29, 0xed, 0x3e, 0x80, 0x9b,
0x81, 0xba, 0x55, 0x72, 0x88, 0x6d, 0x81, 0x59, 0x75, 0xd5, 0xb4, 0x7f, 0x19, 0x30, 0xfe, 0x96,
0x31, 0xba, 0x08, 0x54, 0xd8, 0x46, 0xe5, 0x2f, 0x06, 0xdc, 0x2c, 0x07, 0x2c, 0x5f, 0x4f, 0xab,
0x72, 0x3d, 0xc2, 0x22, 0x26, 0x91, 0x4f, 0x5d, 0x47, 0x52, 0xb4, 0x25, 0x45, 0x1e, 0x42, 0x23,
0x68, 0x71, 0xee, 0x9b, 0x1d, 0xb9, 0x23, 0x96, 0xf6, 0x2e, 0xec, 0x14, 0x4f, 0xaa, 0x53, 0xf8,
0x02, 0xf6, 0x14, 0x72, 0xf6, 0x3a, 0x70, 0xcf, 0xe4, 0x4b, 0x68, 0x54, 0xf0, 0xbf, 0x0d, 0x30,
0xab, 0x8e, 0xba, 0x83, 0xff, 0x6f, 0xfe, 0xff, 0x36, 0x3b, 0xf4, 0x1e, 0xf4, 0xb9, 0x43, 0xfd,
0x59, 0x38, 0x9f, 0x33, 0xc2, 0xcd, 0xad, 0x89, 0x31, 0x6d, 0x63, 0x10, 0xd0, 0x33, 0x89, 0xa0,
0x03, 0x18, 0xb9, 0xaa, 0x8b, 0x67, 0x31, 0xb9, 0xa2, 0x4c, 0x30, 0x77, 0xe5, 0xc1, 0x6e, 0xb9,
0x69, 0x77, 0x2b, 0x18, 0xd9, 0x30, 0xa4, 0xde, 0xf5, 0x4c, 0x0e, 0x0f, 0xf9, 0xf4, 0x7b, 0x92,
0xad, 0x4f, 0xbd, 0xeb, 0x1f, 0xa8, 0x4f, 0xce, 0xc4, 0x04, 0xf8, 0x1c, 0x76, 0x6f, 0x92, 0x3f,
0x0d, 0x3c, 0x72, 0xdd, 0xa8, 0x68, 0x3f, 0xe6, 0x8b, 0xad, 0xdd, 0x74, 0xc9, 0x0e, 0x01, 0x51,
0x01, 0xa8, 0xb8, 0x6e, 0x18, 0x70, 0x12, 0x70, 0x49, 0x30, 0xc0, 0x23, 0xb9, 0x23, 0x82, 0x1f,
0x2b, 0xdc, 0xfe, 0xc3, 0x80, 0xdb, 0x37, 0x4c, 0x27, 0x0e, 0x77, 0x1a, 0xb5, 0x9e, 0x05, 0xbd,
0x2c, 0xfb, 0x4d, 0xb5, 0x97, 0x7e, 0x8b, 0xb1, 0xa8, 0xab, 0xd7, 0x92, 0x3b, 0xfa, 0xab, 0x6e,
0x00, 0x8a, 0x20, 0x01, 0x21, 0x9e, 0x9a, 0xae, 0xea, 0x1a, 0x7a, 0x0a, 0x38, 0xf5, 0xec, 0xaf,
0xf3, 0xb5, 0x51, 0x47, 0xd3, 0x39, 0xbe, 0x0f, 0x83, 0x9a, 0xec, 0xfa, 0xf3, 0x5c, 0x62, 0x9f,
0x01, 0x52, 0xce, 0x4f, 0xc3, 0x24, 0x68, 0x36, 0x53, 0x6e, 0xc3, 0xb8, 0xe0, 0xa2, 0x1b, 0xfb,
0x31, 0xec, 0x28, 0xf8, 0x79, 0xb0, 0x6c, 0xcc, 0xb5, 0x97, 0x96, 0x35, 0x73, 0xd2, 0x6c, 0x59,
0x90, 0xe7, 0xf4, 0x27, 0x31, 0x83, 0x15, 0x99, 0x78, 0x55, 0x45, 0x58, 0x9b, 0xff, 0x0c, 0x70,
0x42, 0xd9, 0xa5, 0x7a, 0x16, 0xa2, 0x5f, 0x3d, 0x1a, 0xeb, 0xd9, 0x22, 0x96, 0x02, 0x71, 0x7c,
0x5f, 0xde, 0x41, 0x1b, 0x8b, 0xa5, 0x28, 0x73, 0xc2, 0x88, 0x27, 0x8b, 0xdf, 0xc6, 0x72, 0x2d,
0xb0, 0x79, 0x4c, 0x54, 0xe9, 0xdb, 0x58, 0xae, 0xed, 0x3f, 0x0d, 0xd8, 0x7e, 0x4a, 0x96, 0x9a,
0x79, 0x1f, 0x60, 0x11, 0xc6, 0x61, 0xc2, 0x69, 0x40, 0x98, 0x0c, 0xd0, 0xc1, 0x39, 0xe4, 0xbf,
0xc7, 0x91, 0xd7, 0x4e, 0xfc, 0xb9, 0xbc, 0xdd, 0x36, 0x96, 0x6b, 0x81, 0x5d, 0x10, 0x27, 0xd2,
0xcf, 0x4b, 0xae, 0x85, 0x6a, 0x32, 0xee, 0xb8, 0x97, 0xf2, 0x35, 0xb5, 0xb1, 0xfa, 0x78, 0xf4,
0x3b, 0xc0, 0x40, 0x37, 0x81, 0x94, 0x6d, 0xf4, 0x12, 0xfa, 0x39, 0xb9, 0x47, 0x1f, 0x56, 0x55,
0xbd, 0xfa, 0xf3, 0xc1, 0xfa, 0x68, 0x8d, 0x95, 0x2e, 0xf6, 0x06, 0x0a, 0xe0, 0xed, 0x8a, 0x9c,
0xa2, 0x07, 0x55, 0xef, 0x55, 0x62, 0x6d, 0x3d, 0x6c, 0x64, 0x9b, 0xc5, 0xe3, 0x30, 0xae, 0xd1,
0x47, 0x74, 0xb8, 0x86, 0xa5, 0xa0, 0xd1, 0xd6, 0xc7, 0x0d, 0xad, 0xb3, 0xa8, 0xaf, 0x00, 0x55,
0xc5, 0x13, 0x3d, 0x5c, 0x4b, 0x73, 0x23, 0xce, 0xd6, 0x61, 0x33, 0xe3, 0x95, 0x89, 0x2a, 0x59,
0x5d, 0x9b, 0x68, 0x41, 0xb8, 0xd7, 0x26, 0x5a, 0xd2, 0xea, 0x0d, 0x74, 0x09, 0xa3, 0xb2, 0xe4,
0xa2, 0x83, 0x55, 0xbf, 0x03, 0x2b, 0x8a, 0x6e, 0x3d, 0x68, 0x62, 0x9a, 0x05, 0x9b, 0xc1, 0x20,
0x2f, 0x8c, 0xa8, 0xa6, 0xe9, 0x6a, 0x24, 0xde, 0xba, 0xb7, 0xce, 0x2c, 0x9f, 0x4d, 0x59, 0x28,
0xeb, 0xb2, 0x59, 0xa1, 0xc2, 0x75, 0xd9, 0xac, 0xd2, 0x5d, 0x7b, 0x03, 0xfd, 0x0a, 0xb7, 0x4a,
0x0a, 0x83, 0xa6, 0x6f, 0x22, 0xc8, 0x6b, 0x97, 0x75, 0xd0, 0xc0, 0x32, 0x8d, 0xf4, 0xa9, 0x81,
0x16, 0xf0, 0x56, 0x71, 0xd0, 0xa3, 0xfb, 0x6f, 0x22, 0xc8, 0xa9, 0x94, 0x35, 0x5d, 0x6f, 0x98,
0x0b, 0xf4, 0x12, 0xfa, 0xb9, 0x09, 0x5f, 0x37, 0x3c, 0xaa, 0x9a, 0x51, 0x37, 0x3c, 0xea, 0x64,
0x62, 0x03, 0x9d, 0xc3, 0xb0, 0x30, 0xf3, 0xd1, 0xbd, 0x55, 0x9e, 0x45, 0x25, 0xb1, 0xee, 0xaf,
0xb5, 0x4b, 0x63, 0x9c, 0x6f, 0xc9, 0xff, 0x4d, 0x8f, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0x5d,
0xf3, 0x60, 0x76, 0x4e, 0x0d, 0x00, 0x00,
}

View file

@ -25,13 +25,11 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
func (vs *VolumeServer) VolumeSyncIndex(ctx context.Context, req *volume_server_pb.VolumeSyncIndexRequest) (*volume_server_pb.VolumeSyncIndexResponse, error) {
resp := &volume_server_pb.VolumeSyncIndexResponse{}
func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
}
content, err := v.IndexFileContent()
@ -42,46 +40,62 @@ func (vs *VolumeServer) VolumeSyncIndex(ctx context.Context, req *volume_server_
glog.V(2).Infof("sync volume %d index", req.VolumdId)
}
resp.IndexFileContent = content
const blockSizeLimit = 1024 * 1024 * 2
for i := 0; i < len(content); i += blockSizeLimit {
blockSize := len(content) - i
if blockSize > blockSizeLimit {
blockSize = blockSizeLimit
}
resp := &volume_server_pb.VolumeSyncIndexResponse{}
resp.IndexFileContent = content[i : i+blockSize]
stream.Send(resp)
}
return resp, nil
return nil
}
func (vs *VolumeServer) VolumeSyncData(ctx context.Context, req *volume_server_pb.VolumeSyncDataRequest) (*volume_server_pb.VolumeSyncDataResponse, error) {
resp := &volume_server_pb.VolumeSyncDataResponse{}
func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
}
if uint32(v.SuperBlock.CompactRevision) != req.Revision {
return nil, fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
return fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
}
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
if err != nil {
return nil, fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
}
id, err := types.ParseNeedleId(req.NeedleId)
if err != nil {
return nil, fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
}
n := new(storage.Needle)
n.ParseNeedleHeader(content)
if id != n.Id {
return nil, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
return fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
}
if err != nil {
glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
}
resp.FileContent = content
const blockSizeLimit = 1024 * 1024 * 2
for i := 0; i < len(content); i += blockSizeLimit {
blockSize := len(content) - i
if blockSize > blockSizeLimit {
blockSize = blockSizeLimit
}
resp := &volume_server_pb.VolumeSyncDataResponse{}
resp.FileContent = content[i : i+blockSize]
stream.Send(resp)
}
return resp, nil
return nil
}

View file

@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"io"
"os"
"sort"
@ -164,6 +165,7 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons
if stat, err := v.dataFile.Stat(); err == nil {
syncStatus.TailOffset = uint64(stat.Size())
}
syncStatus.Collection = v.Collection
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
syncStatus.Ttl = v.SuperBlock.Ttl.String()
@ -188,7 +190,7 @@ func (v *Volume) removeNeedle(key NeedleId) {
func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
Revision: uint32(compactRevision),
Offset: uint32(needleValue.Offset),
@ -198,8 +200,19 @@ func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue
if err != nil {
return err
}
var fileContent []byte
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("read needle %v: %v", needleValue.Key.String(), err)
}
fileContent = append(fileContent, resp.FileContent...)
}
offset, err := v.AppendBlob(resp.FileContent)
offset, err := v.AppendBlob(fileContent)
if err != nil {
return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
}