mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
master: implement grpc VolumeMarkWritable
fix https://github.com/seaweedfs/seaweedfs/issues/3657
This commit is contained in:
parent
c30f6abb11
commit
c8645fd232
|
@ -29,8 +29,6 @@ service Seaweed {
|
|||
}
|
||||
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
|
||||
}
|
||||
rpc VolumeMarkWritable (VolumeMarkWritableRequest) returns (VolumeMarkWritableResponse) {
|
||||
}
|
||||
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
|
||||
}
|
||||
rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) {
|
||||
|
@ -308,17 +306,19 @@ message VacuumVolumeResponse {
|
|||
}
|
||||
|
||||
message VolumeMarkReadonlyRequest {
|
||||
uint32 volume_id = 1;
|
||||
string ip = 1;
|
||||
uint32 port = 2;
|
||||
uint32 volume_id = 4;
|
||||
string collection = 5;
|
||||
uint32 replica_placement = 6;
|
||||
uint32 version = 7;
|
||||
uint32 ttl = 8;
|
||||
string disk_type = 9;
|
||||
bool is_readonly = 10;
|
||||
}
|
||||
message VolumeMarkReadonlyResponse {
|
||||
}
|
||||
|
||||
message VolumeMarkWritableRequest {
|
||||
uint32 volume_id = 1;
|
||||
}
|
||||
message VolumeMarkWritableResponse {
|
||||
}
|
||||
|
||||
message GetMasterConfigurationRequest {
|
||||
}
|
||||
message GetMasterConfigurationResponse {
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -29,7 +29,6 @@ type SeaweedClient interface {
|
|||
LookupEcVolume(ctx context.Context, in *LookupEcVolumeRequest, opts ...grpc.CallOption) (*LookupEcVolumeResponse, error)
|
||||
VacuumVolume(ctx context.Context, in *VacuumVolumeRequest, opts ...grpc.CallOption) (*VacuumVolumeResponse, error)
|
||||
VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error)
|
||||
VolumeMarkWritable(ctx context.Context, in *VolumeMarkWritableRequest, opts ...grpc.CallOption) (*VolumeMarkWritableResponse, error)
|
||||
GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error)
|
||||
ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error)
|
||||
LeaseAdminToken(ctx context.Context, in *LeaseAdminTokenRequest, opts ...grpc.CallOption) (*LeaseAdminTokenResponse, error)
|
||||
|
@ -191,15 +190,6 @@ func (c *seaweedClient) VolumeMarkReadonly(ctx context.Context, in *VolumeMarkRe
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedClient) VolumeMarkWritable(ctx context.Context, in *VolumeMarkWritableRequest, opts ...grpc.CallOption) (*VolumeMarkWritableResponse, error) {
|
||||
out := new(VolumeMarkWritableResponse)
|
||||
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/VolumeMarkWritable", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedClient) GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error) {
|
||||
out := new(GetMasterConfigurationResponse)
|
||||
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/GetMasterConfiguration", in, out, opts...)
|
||||
|
@ -287,7 +277,6 @@ type SeaweedServer interface {
|
|||
LookupEcVolume(context.Context, *LookupEcVolumeRequest) (*LookupEcVolumeResponse, error)
|
||||
VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error)
|
||||
VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error)
|
||||
VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error)
|
||||
GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error)
|
||||
ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error)
|
||||
LeaseAdminToken(context.Context, *LeaseAdminTokenRequest) (*LeaseAdminTokenResponse, error)
|
||||
|
@ -336,9 +325,6 @@ func (UnimplementedSeaweedServer) VacuumVolume(context.Context, *VacuumVolumeReq
|
|||
func (UnimplementedSeaweedServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedServer) GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetMasterConfiguration not implemented")
|
||||
}
|
||||
|
@ -590,24 +576,6 @@ func _Seaweed_VolumeMarkReadonly_Handler(srv interface{}, ctx context.Context, d
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Seaweed_VolumeMarkWritable_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(VolumeMarkWritableRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(SeaweedServer).VolumeMarkWritable(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/master_pb.Seaweed/VolumeMarkWritable",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(SeaweedServer).VolumeMarkWritable(ctx, req.(*VolumeMarkWritableRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Seaweed_GetMasterConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(GetMasterConfigurationRequest)
|
||||
if err := dec(in); err != nil {
|
||||
|
@ -795,10 +763,6 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "VolumeMarkReadonly",
|
||||
Handler: _Seaweed_VolumeMarkReadonly_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "VolumeMarkWritable",
|
||||
Handler: _Seaweed_VolumeMarkWritable_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "GetMasterConfiguration",
|
||||
Handler: _Seaweed_GetMasterConfiguration_Handler,
|
||||
|
|
|
@ -282,3 +282,27 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
|
|||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
|
||||
|
||||
if !ms.Topo.IsLeader() {
|
||||
return nil, raft.NotLeaderError
|
||||
}
|
||||
|
||||
resp := &master_pb.VolumeMarkReadonlyResponse{}
|
||||
|
||||
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
|
||||
vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
|
||||
dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
|
||||
for _, dn := range dataNodes {
|
||||
if dn.Ip == req.Ip && dn.Port == int(req.Port) {
|
||||
if req.IsReadonly {
|
||||
vl.SetVolumeUnavailable(dn, needle.VolumeId(req.VolumeId))
|
||||
} else {
|
||||
vl.SetVolumeAvailable(dn, needle.VolumeId(req.VolumeId), false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package weed_server
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
|
@ -148,19 +149,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
|
|||
|
||||
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
|
||||
|
||||
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("set volume %d to read only on master: %v", req.VolumeId, err)
|
||||
}
|
||||
return nil
|
||||
}); grpcErr != nil {
|
||||
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
|
||||
return resp, fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
|
||||
// step 1: stop master from redirecting traffic here
|
||||
if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
|
||||
|
||||
// step 2: mark local volume as readonly
|
||||
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
|
||||
|
||||
if err != nil {
|
||||
|
@ -169,24 +170,44 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
|
|||
glog.V(2).Infof("volume mark readonly %v", req)
|
||||
}
|
||||
|
||||
// step 3: tell master from redirecting traffic here again, to prevent rare case 1.5
|
||||
if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
|
||||
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
|
||||
Ip: vs.store.Ip,
|
||||
Port: uint32(vs.store.Port),
|
||||
VolumeId: uint32(v.Id),
|
||||
Collection: v.Collection,
|
||||
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
|
||||
Ttl: v.Ttl.ToUint32(),
|
||||
DiskType: string(v.DiskType()),
|
||||
IsReadonly: isReadOnly,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err)
|
||||
}
|
||||
return nil
|
||||
}); grpcErr != nil {
|
||||
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
|
||||
return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||
|
||||
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.VolumeMarkWritable(context.Background(), &master_pb.VolumeMarkWritableRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("set volume %d to writable on master: %v", req.VolumeId, err)
|
||||
}
|
||||
return nil
|
||||
}); grpcErr != nil {
|
||||
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
|
||||
return resp, fmt.Errorf("grpc VolumeMarkWritable with master %s: %v", vs.GetMaster(), grpcErr)
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
|
||||
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
|
||||
|
@ -197,6 +218,11 @@ func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_serv
|
|||
glog.V(2).Infof("volume mark writable %v", req)
|
||||
}
|
||||
|
||||
// enable master to redirect traffic here
|
||||
if err := vs.notifyMasterVolumeReadonly(v, false); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue