mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Added VolumeMarkWritable and VolumeStatus grpc methods
This is necessary for copy to mark as read-only and then restore the original state afterwards.
This commit is contained in:
parent
3b4b1d4a77
commit
3ccfa4c6ad
|
@ -37,8 +37,12 @@ service VolumeServer {
|
||||||
}
|
}
|
||||||
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
|
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
|
||||||
}
|
}
|
||||||
|
rpc VolumeMarkWritable (VolumeMarkWritableRequest) returns (VolumeMarkWritableResponse) {
|
||||||
|
}
|
||||||
rpc VolumeConfigure (VolumeConfigureRequest) returns (VolumeConfigureResponse) {
|
rpc VolumeConfigure (VolumeConfigureRequest) returns (VolumeConfigureResponse) {
|
||||||
}
|
}
|
||||||
|
rpc VolumeStatus (VolumeStatusRequest) returns (VolumeStatusResponse) {
|
||||||
|
}
|
||||||
|
|
||||||
// copy the .idx .dat files, and mount this volume
|
// copy the .idx .dat files, and mount this volume
|
||||||
rpc VolumeCopy (VolumeCopyRequest) returns (VolumeCopyResponse) {
|
rpc VolumeCopy (VolumeCopyRequest) returns (VolumeCopyResponse) {
|
||||||
|
@ -200,6 +204,12 @@ message VolumeMarkReadonlyRequest {
|
||||||
message VolumeMarkReadonlyResponse {
|
message VolumeMarkReadonlyResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message VolumeMarkWritableRequest {
|
||||||
|
uint32 volume_id = 1;
|
||||||
|
}
|
||||||
|
message VolumeMarkWritableResponse {
|
||||||
|
}
|
||||||
|
|
||||||
message VolumeConfigureRequest {
|
message VolumeConfigureRequest {
|
||||||
uint32 volume_id = 1;
|
uint32 volume_id = 1;
|
||||||
string replication = 2;
|
string replication = 2;
|
||||||
|
@ -208,6 +218,13 @@ message VolumeConfigureResponse {
|
||||||
string error = 1;
|
string error = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message VolumeStatusRequest {
|
||||||
|
uint32 volume_id = 1;
|
||||||
|
}
|
||||||
|
message VolumeStatusResponse {
|
||||||
|
bool is_read_only = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message VolumeCopyRequest {
|
message VolumeCopyRequest {
|
||||||
uint32 volume_id = 1;
|
uint32 volume_id = 1;
|
||||||
string collection = 2;
|
string collection = 2;
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -149,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, err
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||||
|
|
||||||
|
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||||
|
|
||||||
|
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("volume mark writable %v: %v", req, err)
|
||||||
|
} else {
|
||||||
|
glog.V(2).Infof("volume mark writable %v", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
|
||||||
|
|
||||||
|
resp := &volume_server_pb.VolumeStatusResponse{}
|
||||||
|
|
||||||
|
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||||
|
if v == nil {
|
||||||
|
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.IsReadOnly = v.IsReadOnly()
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
|
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
|
||||||
|
|
|
@ -91,6 +91,43 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so
|
||||||
|
|
||||||
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
|
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
|
||||||
|
|
||||||
|
// check to see if the volume is already read-only and if its not then we need
|
||||||
|
// to mark it as read-only and then before we return we need to undo what we
|
||||||
|
// did
|
||||||
|
var shouldMarkWritable bool
|
||||||
|
defer func() {
|
||||||
|
if !shouldMarkWritable {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
|
||||||
|
VolumeId: uint32(volumeId),
|
||||||
|
})
|
||||||
|
return writableErr
|
||||||
|
})
|
||||||
|
if clientErr != nil {
|
||||||
|
log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
|
||||||
|
VolumeId: uint32(volumeId),
|
||||||
|
})
|
||||||
|
if statusErr == nil && !resp.IsReadOnly {
|
||||||
|
shouldMarkWritable = true
|
||||||
|
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||||
|
VolumeId: uint32(volumeId),
|
||||||
|
})
|
||||||
|
return readonlyErr
|
||||||
|
}
|
||||||
|
return statusErr
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
|
|
|
@ -307,7 +307,20 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
|
||||||
if v == nil {
|
if v == nil {
|
||||||
return fmt.Errorf("volume %d not found", i)
|
return fmt.Errorf("volume %d not found", i)
|
||||||
}
|
}
|
||||||
|
v.noWriteLock.Lock()
|
||||||
v.noWriteOrDelete = true
|
v.noWriteOrDelete = true
|
||||||
|
v.noWriteLock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
|
||||||
|
v := s.findVolume(i)
|
||||||
|
if v == nil {
|
||||||
|
return fmt.Errorf("volume %d not found", i)
|
||||||
|
}
|
||||||
|
v.noWriteLock.Lock()
|
||||||
|
v.noWriteOrDelete = false
|
||||||
|
v.noWriteLock.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ type Volume struct {
|
||||||
needleMapKind NeedleMapType
|
needleMapKind NeedleMapType
|
||||||
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||||
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||||
|
noWriteLock sync.RWMutex
|
||||||
hasRemoteFile bool // if the volume has a remote file
|
hasRemoteFile bool // if the volume has a remote file
|
||||||
MemoryMapMaxSizeMb uint32
|
MemoryMapMaxSizeMb uint32
|
||||||
|
|
||||||
|
@ -58,6 +59,8 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) String() string {
|
func (v *Volume) String() string {
|
||||||
|
v.noWriteLock.RLock()
|
||||||
|
defer v.noWriteLock.RUnlock()
|
||||||
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
|
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,5 +248,7 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) IsReadOnly() bool {
|
func (v *Volume) IsReadOnly() bool {
|
||||||
|
v.noWriteLock.RLock()
|
||||||
|
defer v.noWriteLock.RUnlock()
|
||||||
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
|
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,10 @@ func (v *Volume) maybeWriteSuperBlock() error {
|
||||||
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
|
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
|
||||||
v.DataBackend = backend.NewDiskFile(dataFile)
|
v.DataBackend = backend.NewDiskFile(dataFile)
|
||||||
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
|
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
|
||||||
|
v.noWriteLock.Lock()
|
||||||
v.noWriteOrDelete = false
|
v.noWriteOrDelete = false
|
||||||
v.noWriteCanDelete = false
|
v.noWriteCanDelete = false
|
||||||
|
v.noWriteLock.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue