volume server: synchronously report volume readonly status to master

fix https://github.com/seaweedfs/seaweedfs/issues/3628
This commit is contained in:
chrislu 2022-09-11 19:29:10 -07:00
parent 6690236754
commit b9112747b5
4 changed files with 568 additions and 204 deletions

View file

@ -27,6 +27,10 @@ service Seaweed {
} }
rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) { rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) {
} }
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
}
rpc VolumeMarkWritable (VolumeMarkWritableRequest) returns (VolumeMarkWritableResponse) {
}
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) { rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
} }
rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) { rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) {
@ -303,6 +307,18 @@ message VacuumVolumeRequest {
message VacuumVolumeResponse { message VacuumVolumeResponse {
} }
message VolumeMarkReadonlyRequest {
uint32 volume_id = 1;
}
message VolumeMarkReadonlyResponse {
}
message VolumeMarkWritableRequest {
uint32 volume_id = 1;
}
message VolumeMarkWritableResponse {
}
message GetMasterConfigurationRequest { message GetMasterConfigurationRequest {
} }
message GetMasterConfigurationResponse { message GetMasterConfigurationResponse {

File diff suppressed because it is too large Load diff

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: master.proto
package master_pb package master_pb
@ -32,6 +28,8 @@ type SeaweedClient interface {
VolumeList(ctx context.Context, in *VolumeListRequest, opts ...grpc.CallOption) (*VolumeListResponse, error) VolumeList(ctx context.Context, in *VolumeListRequest, opts ...grpc.CallOption) (*VolumeListResponse, error)
LookupEcVolume(ctx context.Context, in *LookupEcVolumeRequest, opts ...grpc.CallOption) (*LookupEcVolumeResponse, error) LookupEcVolume(ctx context.Context, in *LookupEcVolumeRequest, opts ...grpc.CallOption) (*LookupEcVolumeResponse, error)
VacuumVolume(ctx context.Context, in *VacuumVolumeRequest, opts ...grpc.CallOption) (*VacuumVolumeResponse, error) VacuumVolume(ctx context.Context, in *VacuumVolumeRequest, opts ...grpc.CallOption) (*VacuumVolumeResponse, error)
VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error)
VolumeMarkWritable(ctx context.Context, in *VolumeMarkWritableRequest, opts ...grpc.CallOption) (*VolumeMarkWritableResponse, error)
GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error) GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error)
ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error) ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error)
LeaseAdminToken(ctx context.Context, in *LeaseAdminTokenRequest, opts ...grpc.CallOption) (*LeaseAdminTokenResponse, error) LeaseAdminToken(ctx context.Context, in *LeaseAdminTokenRequest, opts ...grpc.CallOption) (*LeaseAdminTokenResponse, error)
@ -184,6 +182,24 @@ func (c *seaweedClient) VacuumVolume(ctx context.Context, in *VacuumVolumeReques
return out, nil return out, nil
} }
func (c *seaweedClient) VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error) {
out := new(VolumeMarkReadonlyResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/VolumeMarkReadonly", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedClient) VolumeMarkWritable(ctx context.Context, in *VolumeMarkWritableRequest, opts ...grpc.CallOption) (*VolumeMarkWritableResponse, error) {
out := new(VolumeMarkWritableResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/VolumeMarkWritable", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedClient) GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error) { func (c *seaweedClient) GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error) {
out := new(GetMasterConfigurationResponse) out := new(GetMasterConfigurationResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/GetMasterConfiguration", in, out, opts...) err := c.cc.Invoke(ctx, "/master_pb.Seaweed/GetMasterConfiguration", in, out, opts...)
@ -270,6 +286,8 @@ type SeaweedServer interface {
VolumeList(context.Context, *VolumeListRequest) (*VolumeListResponse, error) VolumeList(context.Context, *VolumeListRequest) (*VolumeListResponse, error)
LookupEcVolume(context.Context, *LookupEcVolumeRequest) (*LookupEcVolumeResponse, error) LookupEcVolume(context.Context, *LookupEcVolumeRequest) (*LookupEcVolumeResponse, error)
VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error)
VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error)
VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error)
GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error) GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error)
ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error) ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error)
LeaseAdminToken(context.Context, *LeaseAdminTokenRequest) (*LeaseAdminTokenResponse, error) LeaseAdminToken(context.Context, *LeaseAdminTokenRequest) (*LeaseAdminTokenResponse, error)
@ -315,6 +333,12 @@ func (UnimplementedSeaweedServer) LookupEcVolume(context.Context, *LookupEcVolum
func (UnimplementedSeaweedServer) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) { func (UnimplementedSeaweedServer) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolume not implemented") return nil, status.Errorf(codes.Unimplemented, "method VacuumVolume not implemented")
} }
func (UnimplementedSeaweedServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
}
func (UnimplementedSeaweedServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
}
func (UnimplementedSeaweedServer) GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error) { func (UnimplementedSeaweedServer) GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetMasterConfiguration not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetMasterConfiguration not implemented")
} }
@ -548,6 +572,42 @@ func _Seaweed_VacuumVolume_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Seaweed_VolumeMarkReadonly_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeMarkReadonlyRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).VolumeMarkReadonly(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/master_pb.Seaweed/VolumeMarkReadonly",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).VolumeMarkReadonly(ctx, req.(*VolumeMarkReadonlyRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Seaweed_VolumeMarkWritable_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeMarkWritableRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).VolumeMarkWritable(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/master_pb.Seaweed/VolumeMarkWritable",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).VolumeMarkWritable(ctx, req.(*VolumeMarkWritableRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Seaweed_GetMasterConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Seaweed_GetMasterConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetMasterConfigurationRequest) in := new(GetMasterConfigurationRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -731,6 +791,14 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
MethodName: "VacuumVolume", MethodName: "VacuumVolume",
Handler: _Seaweed_VacuumVolume_Handler, Handler: _Seaweed_VacuumVolume_Handler,
}, },
{
MethodName: "VolumeMarkReadonly",
Handler: _Seaweed_VolumeMarkReadonly_Handler,
},
{
MethodName: "VolumeMarkWritable",
Handler: _Seaweed_VolumeMarkWritable_Handler,
},
{ {
MethodName: "GetMasterConfiguration", MethodName: "GetMasterConfiguration",
Handler: _Seaweed_GetMasterConfiguration_Handler, Handler: _Seaweed_GetMasterConfiguration_Handler,

View file

@ -148,6 +148,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VolumeMarkReadonlyResponse{} resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
VolumeId: req.VolumeId,
})
if err != nil {
return fmt.Errorf("set volume %d to read only on master: %v", req.VolumeId, err)
}
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
return resp, fmt.Errorf("grpc VolumeMarkReadonly with master: %v", vs.GetMaster(), grpcErr)
}
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId)) err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
if err != nil { if err != nil {
@ -163,6 +176,19 @@ func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VolumeMarkWritableResponse{} resp := &volume_server_pb.VolumeMarkWritableResponse{}
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
_, err := client.VolumeMarkWritable(context.Background(), &master_pb.VolumeMarkWritableRequest{
VolumeId: req.VolumeId,
})
if err != nil {
return fmt.Errorf("set volume %d to writable on master: %v", req.VolumeId, err)
}
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
return resp, fmt.Errorf("grpc VolumeMarkWritable with master: %v", vs.GetMaster(), grpcErr)
}
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId)) err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
if err != nil { if err != nil {