From 95e0520182eeeb57921916dc694b64ff342c93e1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 23 Mar 2019 11:33:34 -0700 Subject: [PATCH] weed volume: add grpc operation to relicate a volume to local --- weed/operation/sync_volume.go | 4 +- weed/pb/volume_server.proto | 60 ++- weed/pb/volume_server_pb/volume_server.pb.go | 536 +++++++++++++++---- weed/server/volume_grpc_admin.go | 8 +- weed/server/volume_grpc_replicate.go | 155 ++++++ weed/server/volume_grpc_sync.go | 20 +- weed/server/volume_grpc_vacuum.go | 22 +- weed/shell/shell_liner.go | 2 +- weed/storage/store.go | 4 +- weed/storage/volume.go | 13 +- weed/storage/volume_sync.go | 2 +- weed/topology/allocate_volume.go | 2 +- weed/topology/topology_vacuum.go | 8 +- 13 files changed, 662 insertions(+), 174 deletions(-) create mode 100644 weed/server/volume_grpc_replicate.go diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index c979254f4..6af2404c0 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -15,7 +15,7 @@ func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ - VolumdId: vid, + VolumeId: vid, }) return nil }) @@ -27,7 +27,7 @@ func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{ - VolumdId: vid, + VolumeId: vid, }) if err != nil { return err diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 8ab67a1bf..93db5b981 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -36,7 +36,12 @@ service VolumeServer { rpc VolumeDelete (VolumeDeleteRequest) returns (VolumeDeleteResponse) { } - // rpc VolumeUiPage (VolumeUiPageRequest) returns (VolumeUiPageResponse) {} + rpc ReplicateVolume (ReplicateVolumeRequest) returns (ReplicateVolumeResponse) { + } + rpc ReadVolumeFileStatus (ReadVolumeFileStatusRequest) returns (ReadVolumeFileStatusResponse) { + } + rpc CopyFile (CopyFileRequest) returns (stream CopyFileResponse) { + } } @@ -60,27 +65,27 @@ message Empty { } message VacuumVolumeCheckRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VacuumVolumeCheckResponse { double garbage_ratio = 1; } message VacuumVolumeCompactRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; int64 preallocate = 2; } message VacuumVolumeCompactResponse { } message VacuumVolumeCommitRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VacuumVolumeCommitResponse { } message VacuumVolumeCleanupRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VacuumVolumeCleanupResponse { } @@ -92,7 +97,7 @@ message DeleteCollectionResponse { } message AssignVolumeRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; string collection = 2; int64 preallocate = 3; string replication = 4; @@ -102,10 +107,10 @@ message AssignVolumeResponse { } message VolumeSyncStatusRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VolumeSyncStatusResponse { - uint32 volumd_id = 1; + uint32 volume_id = 1; string collection = 2; string replication = 4; string ttl = 5; @@ -115,14 +120,14 @@ message VolumeSyncStatusResponse { } message VolumeSyncIndexRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VolumeSyncIndexResponse { bytes index_file_content = 1; } message VolumeSyncDataRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; uint32 revision = 2; uint32 offset = 3; uint32 size = 4; @@ -133,26 +138,51 @@ message VolumeSyncDataResponse { } message VolumeMountRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VolumeMountResponse { } message VolumeUnmountRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VolumeUnmountResponse { } message VolumeDeleteRequest { - uint32 volumd_id = 1; + uint32 volume_id = 1; } message VolumeDeleteResponse { } -message VolumeUiPageRequest { +message ReplicateVolumeRequest { + uint32 volume_id = 1; + string collection = 2; + string replication = 3; + string ttl = 4; + string source_data_node = 5; } -message VolumeUiPageResponse { +message ReplicateVolumeResponse { +} + +message CopyFileRequest { + uint32 volume_id = 1; + bool is_idx_file = 2; + bool is_dat_file = 3; +} +message CopyFileResponse { + bytes file_content = 1; +} + +message ReadVolumeFileStatusRequest { + uint32 volume_id = 1; +} +message ReadVolumeFileStatusResponse { + uint32 volume_id = 1; + uint64 idx_file_timestamp = 2; + uint64 idx_file_size = 3; + uint64 dat_file_timestamp = 4; + uint64 dat_file_size = 5; } message DiskStatus { diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index fa700e2e5..d84a5b099 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -37,8 +37,12 @@ It has these top-level messages: VolumeUnmountResponse VolumeDeleteRequest VolumeDeleteResponse - VolumeUiPageRequest - VolumeUiPageResponse + ReplicateVolumeRequest + ReplicateVolumeResponse + CopyFileRequest + CopyFileResponse + ReadVolumeFileStatusRequest + ReadVolumeFileStatusResponse DiskStatus MemStatus */ @@ -145,7 +149,7 @@ func (*Empty) ProtoMessage() {} func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } type VacuumVolumeCheckRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VacuumVolumeCheckRequest) Reset() { *m = VacuumVolumeCheckRequest{} } @@ -153,9 +157,9 @@ func (m *VacuumVolumeCheckRequest) String() string { return proto.Com func (*VacuumVolumeCheckRequest) ProtoMessage() {} func (*VacuumVolumeCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } -func (m *VacuumVolumeCheckRequest) GetVolumdId() uint32 { +func (m *VacuumVolumeCheckRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -177,7 +181,7 @@ func (m *VacuumVolumeCheckResponse) GetGarbageRatio() float64 { } type VacuumVolumeCompactRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Preallocate int64 `protobuf:"varint,2,opt,name=preallocate" json:"preallocate,omitempty"` } @@ -186,9 +190,9 @@ func (m *VacuumVolumeCompactRequest) String() string { return proto.C func (*VacuumVolumeCompactRequest) ProtoMessage() {} func (*VacuumVolumeCompactRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } -func (m *VacuumVolumeCompactRequest) GetVolumdId() uint32 { +func (m *VacuumVolumeCompactRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -209,7 +213,7 @@ func (*VacuumVolumeCompactResponse) ProtoMessage() {} func (*VacuumVolumeCompactResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } type VacuumVolumeCommitRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VacuumVolumeCommitRequest) Reset() { *m = VacuumVolumeCommitRequest{} } @@ -217,9 +221,9 @@ func (m *VacuumVolumeCommitRequest) String() string { return proto.Co func (*VacuumVolumeCommitRequest) ProtoMessage() {} func (*VacuumVolumeCommitRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } -func (m *VacuumVolumeCommitRequest) GetVolumdId() uint32 { +func (m *VacuumVolumeCommitRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -233,7 +237,7 @@ func (*VacuumVolumeCommitResponse) ProtoMessage() {} func (*VacuumVolumeCommitResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } type VacuumVolumeCleanupRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VacuumVolumeCleanupRequest) Reset() { *m = VacuumVolumeCleanupRequest{} } @@ -241,9 +245,9 @@ func (m *VacuumVolumeCleanupRequest) String() string { return proto.C func (*VacuumVolumeCleanupRequest) ProtoMessage() {} func (*VacuumVolumeCleanupRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } -func (m *VacuumVolumeCleanupRequest) GetVolumdId() uint32 { +func (m *VacuumVolumeCleanupRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -281,7 +285,7 @@ func (*DeleteCollectionResponse) ProtoMessage() {} func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } type AssignVolumeRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` @@ -293,9 +297,9 @@ func (m *AssignVolumeRequest) String() string { return proto.CompactT func (*AssignVolumeRequest) ProtoMessage() {} func (*AssignVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } -func (m *AssignVolumeRequest) GetVolumdId() uint32 { +func (m *AssignVolumeRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -337,7 +341,7 @@ func (*AssignVolumeResponse) ProtoMessage() {} func (*AssignVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } type VolumeSyncStatusRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VolumeSyncStatusRequest) Reset() { *m = VolumeSyncStatusRequest{} } @@ -345,15 +349,15 @@ func (m *VolumeSyncStatusRequest) String() string { return proto.Comp func (*VolumeSyncStatusRequest) ProtoMessage() {} func (*VolumeSyncStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } -func (m *VolumeSyncStatusRequest) GetVolumdId() uint32 { +func (m *VolumeSyncStatusRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } type VolumeSyncStatusResponse struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` @@ -367,9 +371,9 @@ func (m *VolumeSyncStatusResponse) String() string { return proto.Com func (*VolumeSyncStatusResponse) ProtoMessage() {} func (*VolumeSyncStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } -func (m *VolumeSyncStatusResponse) GetVolumdId() uint32 { +func (m *VolumeSyncStatusResponse) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -417,7 +421,7 @@ func (m *VolumeSyncStatusResponse) GetIdxFileSize() uint64 { } type VolumeSyncIndexRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VolumeSyncIndexRequest) Reset() { *m = VolumeSyncIndexRequest{} } @@ -425,9 +429,9 @@ func (m *VolumeSyncIndexRequest) String() string { return proto.Compa func (*VolumeSyncIndexRequest) ProtoMessage() {} func (*VolumeSyncIndexRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } -func (m *VolumeSyncIndexRequest) GetVolumdId() uint32 { +func (m *VolumeSyncIndexRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -449,7 +453,7 @@ func (m *VolumeSyncIndexResponse) GetIndexFileContent() []byte { } type VolumeSyncDataRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Revision uint32 `protobuf:"varint,2,opt,name=revision" json:"revision,omitempty"` Offset uint32 `protobuf:"varint,3,opt,name=offset" json:"offset,omitempty"` Size uint32 `protobuf:"varint,4,opt,name=size" json:"size,omitempty"` @@ -461,9 +465,9 @@ func (m *VolumeSyncDataRequest) String() string { return proto.Compac func (*VolumeSyncDataRequest) ProtoMessage() {} func (*VolumeSyncDataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} } -func (m *VolumeSyncDataRequest) GetVolumdId() uint32 { +func (m *VolumeSyncDataRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -513,7 +517,7 @@ func (m *VolumeSyncDataResponse) GetFileContent() []byte { } type VolumeMountRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VolumeMountRequest) Reset() { *m = VolumeMountRequest{} } @@ -521,9 +525,9 @@ func (m *VolumeMountRequest) String() string { return proto.CompactTe func (*VolumeMountRequest) ProtoMessage() {} func (*VolumeMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} } -func (m *VolumeMountRequest) GetVolumdId() uint32 { +func (m *VolumeMountRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -537,7 +541,7 @@ func (*VolumeMountResponse) ProtoMessage() {} func (*VolumeMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} } type VolumeUnmountRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VolumeUnmountRequest) Reset() { *m = VolumeUnmountRequest{} } @@ -545,9 +549,9 @@ func (m *VolumeUnmountRequest) String() string { return proto.Compact func (*VolumeUnmountRequest) ProtoMessage() {} func (*VolumeUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} } -func (m *VolumeUnmountRequest) GetVolumdId() uint32 { +func (m *VolumeUnmountRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -561,7 +565,7 @@ func (*VolumeUnmountResponse) ProtoMessage() {} func (*VolumeUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} } type VolumeDeleteRequest struct { - VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } func (m *VolumeDeleteRequest) Reset() { *m = VolumeDeleteRequest{} } @@ -569,9 +573,9 @@ func (m *VolumeDeleteRequest) String() string { return proto.CompactT func (*VolumeDeleteRequest) ProtoMessage() {} func (*VolumeDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} } -func (m *VolumeDeleteRequest) GetVolumdId() uint32 { +func (m *VolumeDeleteRequest) GetVolumeId() uint32 { if m != nil { - return m.VolumdId + return m.VolumeId } return 0 } @@ -584,21 +588,173 @@ func (m *VolumeDeleteResponse) String() string { return proto.Compact func (*VolumeDeleteResponse) ProtoMessage() {} func (*VolumeDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27} } -type VolumeUiPageRequest struct { +type ReplicateVolumeRequest struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` + Replication string `protobuf:"bytes,3,opt,name=replication" json:"replication,omitempty"` + Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"` + SourceDataNode string `protobuf:"bytes,5,opt,name=source_data_node,json=sourceDataNode" json:"source_data_node,omitempty"` } -func (m *VolumeUiPageRequest) Reset() { *m = VolumeUiPageRequest{} } -func (m *VolumeUiPageRequest) String() string { return proto.CompactTextString(m) } -func (*VolumeUiPageRequest) ProtoMessage() {} -func (*VolumeUiPageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} } +func (m *ReplicateVolumeRequest) Reset() { *m = ReplicateVolumeRequest{} } +func (m *ReplicateVolumeRequest) String() string { return proto.CompactTextString(m) } +func (*ReplicateVolumeRequest) ProtoMessage() {} +func (*ReplicateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} } -type VolumeUiPageResponse struct { +func (m *ReplicateVolumeRequest) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 } -func (m *VolumeUiPageResponse) Reset() { *m = VolumeUiPageResponse{} } -func (m *VolumeUiPageResponse) String() string { return proto.CompactTextString(m) } -func (*VolumeUiPageResponse) ProtoMessage() {} -func (*VolumeUiPageResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{29} } +func (m *ReplicateVolumeRequest) GetCollection() string { + if m != nil { + return m.Collection + } + return "" +} + +func (m *ReplicateVolumeRequest) GetReplication() string { + if m != nil { + return m.Replication + } + return "" +} + +func (m *ReplicateVolumeRequest) GetTtl() string { + if m != nil { + return m.Ttl + } + return "" +} + +func (m *ReplicateVolumeRequest) GetSourceDataNode() string { + if m != nil { + return m.SourceDataNode + } + return "" +} + +type ReplicateVolumeResponse struct { +} + +func (m *ReplicateVolumeResponse) Reset() { *m = ReplicateVolumeResponse{} } +func (m *ReplicateVolumeResponse) String() string { return proto.CompactTextString(m) } +func (*ReplicateVolumeResponse) ProtoMessage() {} +func (*ReplicateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{29} } + +type CopyFileRequest struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + IsIdxFile bool `protobuf:"varint,2,opt,name=is_idx_file,json=isIdxFile" json:"is_idx_file,omitempty"` + IsDatFile bool `protobuf:"varint,3,opt,name=is_dat_file,json=isDatFile" json:"is_dat_file,omitempty"` +} + +func (m *CopyFileRequest) Reset() { *m = CopyFileRequest{} } +func (m *CopyFileRequest) String() string { return proto.CompactTextString(m) } +func (*CopyFileRequest) ProtoMessage() {} +func (*CopyFileRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} } + +func (m *CopyFileRequest) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 +} + +func (m *CopyFileRequest) GetIsIdxFile() bool { + if m != nil { + return m.IsIdxFile + } + return false +} + +func (m *CopyFileRequest) GetIsDatFile() bool { + if m != nil { + return m.IsDatFile + } + return false +} + +type CopyFileResponse struct { + FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"` +} + +func (m *CopyFileResponse) Reset() { *m = CopyFileResponse{} } +func (m *CopyFileResponse) String() string { return proto.CompactTextString(m) } +func (*CopyFileResponse) ProtoMessage() {} +func (*CopyFileResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} } + +func (m *CopyFileResponse) GetFileContent() []byte { + if m != nil { + return m.FileContent + } + return nil +} + +type ReadVolumeFileStatusRequest struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` +} + +func (m *ReadVolumeFileStatusRequest) Reset() { *m = ReadVolumeFileStatusRequest{} } +func (m *ReadVolumeFileStatusRequest) String() string { return proto.CompactTextString(m) } +func (*ReadVolumeFileStatusRequest) ProtoMessage() {} +func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{32} } + +func (m *ReadVolumeFileStatusRequest) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 +} + +type ReadVolumeFileStatusResponse struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + IdxFileTimestamp uint64 `protobuf:"varint,2,opt,name=idx_file_timestamp,json=idxFileTimestamp" json:"idx_file_timestamp,omitempty"` + IdxFileSize uint64 `protobuf:"varint,3,opt,name=idx_file_size,json=idxFileSize" json:"idx_file_size,omitempty"` + DatFileTimestamp uint64 `protobuf:"varint,4,opt,name=dat_file_timestamp,json=datFileTimestamp" json:"dat_file_timestamp,omitempty"` + DatFileSize uint64 `protobuf:"varint,5,opt,name=dat_file_size,json=datFileSize" json:"dat_file_size,omitempty"` +} + +func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} } +func (m *ReadVolumeFileStatusResponse) String() string { return proto.CompactTextString(m) } +func (*ReadVolumeFileStatusResponse) ProtoMessage() {} +func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{33} } + +func (m *ReadVolumeFileStatusResponse) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 +} + +func (m *ReadVolumeFileStatusResponse) GetIdxFileTimestamp() uint64 { + if m != nil { + return m.IdxFileTimestamp + } + return 0 +} + +func (m *ReadVolumeFileStatusResponse) GetIdxFileSize() uint64 { + if m != nil { + return m.IdxFileSize + } + return 0 +} + +func (m *ReadVolumeFileStatusResponse) GetDatFileTimestamp() uint64 { + if m != nil { + return m.DatFileTimestamp + } + return 0 +} + +func (m *ReadVolumeFileStatusResponse) GetDatFileSize() uint64 { + if m != nil { + return m.DatFileSize + } + return 0 +} type DiskStatus struct { Dir string `protobuf:"bytes,1,opt,name=dir" json:"dir,omitempty"` @@ -610,7 +766,7 @@ type DiskStatus struct { func (m *DiskStatus) Reset() { *m = DiskStatus{} } func (m *DiskStatus) String() string { return proto.CompactTextString(m) } func (*DiskStatus) ProtoMessage() {} -func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} } +func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{34} } func (m *DiskStatus) GetDir() string { if m != nil { @@ -653,7 +809,7 @@ type MemStatus struct { func (m *MemStatus) Reset() { *m = MemStatus{} } func (m *MemStatus) String() string { return proto.CompactTextString(m) } func (*MemStatus) ProtoMessage() {} -func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} } +func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{35} } func (m *MemStatus) GetGoroutines() int32 { if m != nil { @@ -733,8 +889,12 @@ func init() { proto.RegisterType((*VolumeUnmountResponse)(nil), "volume_server_pb.VolumeUnmountResponse") proto.RegisterType((*VolumeDeleteRequest)(nil), "volume_server_pb.VolumeDeleteRequest") proto.RegisterType((*VolumeDeleteResponse)(nil), "volume_server_pb.VolumeDeleteResponse") - proto.RegisterType((*VolumeUiPageRequest)(nil), "volume_server_pb.VolumeUiPageRequest") - proto.RegisterType((*VolumeUiPageResponse)(nil), "volume_server_pb.VolumeUiPageResponse") + proto.RegisterType((*ReplicateVolumeRequest)(nil), "volume_server_pb.ReplicateVolumeRequest") + proto.RegisterType((*ReplicateVolumeResponse)(nil), "volume_server_pb.ReplicateVolumeResponse") + proto.RegisterType((*CopyFileRequest)(nil), "volume_server_pb.CopyFileRequest") + proto.RegisterType((*CopyFileResponse)(nil), "volume_server_pb.CopyFileResponse") + proto.RegisterType((*ReadVolumeFileStatusRequest)(nil), "volume_server_pb.ReadVolumeFileStatusRequest") + proto.RegisterType((*ReadVolumeFileStatusResponse)(nil), "volume_server_pb.ReadVolumeFileStatusResponse") proto.RegisterType((*DiskStatus)(nil), "volume_server_pb.DiskStatus") proto.RegisterType((*MemStatus)(nil), "volume_server_pb.MemStatus") } @@ -764,6 +924,9 @@ type VolumeServerClient interface { VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error) VolumeUnmount(ctx context.Context, in *VolumeUnmountRequest, opts ...grpc.CallOption) (*VolumeUnmountResponse, error) VolumeDelete(ctx context.Context, in *VolumeDeleteRequest, opts ...grpc.CallOption) (*VolumeDeleteResponse, error) + ReplicateVolume(ctx context.Context, in *ReplicateVolumeRequest, opts ...grpc.CallOption) (*ReplicateVolumeResponse, error) + ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error) + CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error) } type volumeServerClient struct { @@ -937,6 +1100,56 @@ func (c *volumeServerClient) VolumeDelete(ctx context.Context, in *VolumeDeleteR return out, nil } +func (c *volumeServerClient) ReplicateVolume(ctx context.Context, in *ReplicateVolumeRequest, opts ...grpc.CallOption) (*ReplicateVolumeResponse, error) { + out := new(ReplicateVolumeResponse) + err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/ReplicateVolume", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *volumeServerClient) ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error) { + out := new(ReadVolumeFileStatusResponse) + err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/ReadVolumeFileStatus", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *volumeServerClient) CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error) { + stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[2], c.cc, "/volume_server_pb.VolumeServer/CopyFile", opts...) + if err != nil { + return nil, err + } + x := &volumeServerCopyFileClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type VolumeServer_CopyFileClient interface { + Recv() (*CopyFileResponse, error) + grpc.ClientStream +} + +type volumeServerCopyFileClient struct { + grpc.ClientStream +} + +func (x *volumeServerCopyFileClient) Recv() (*CopyFileResponse, error) { + m := new(CopyFileResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for VolumeServer service type VolumeServerServer interface { @@ -954,6 +1167,9 @@ type VolumeServerServer interface { VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error) VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error) VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error) + ReplicateVolume(context.Context, *ReplicateVolumeRequest) (*ReplicateVolumeResponse, error) + ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) + CopyFile(*CopyFileRequest, VolumeServer_CopyFileServer) error } func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) { @@ -1200,6 +1416,63 @@ func _VolumeServer_VolumeDelete_Handler(srv interface{}, ctx context.Context, de return interceptor(ctx, in, info, handler) } +func _VolumeServer_ReplicateVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReplicateVolumeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VolumeServerServer).ReplicateVolume(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/volume_server_pb.VolumeServer/ReplicateVolume", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VolumeServerServer).ReplicateVolume(ctx, req.(*ReplicateVolumeRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _VolumeServer_ReadVolumeFileStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadVolumeFileStatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VolumeServerServer).ReadVolumeFileStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/volume_server_pb.VolumeServer/ReadVolumeFileStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VolumeServerServer).ReadVolumeFileStatus(ctx, req.(*ReadVolumeFileStatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _VolumeServer_CopyFile_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(CopyFileRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(VolumeServerServer).CopyFile(m, &volumeServerCopyFileServer{stream}) +} + +type VolumeServer_CopyFileServer interface { + Send(*CopyFileResponse) error + grpc.ServerStream +} + +type volumeServerCopyFileServer struct { + grpc.ServerStream +} + +func (x *volumeServerCopyFileServer) Send(m *CopyFileResponse) error { + return x.ServerStream.SendMsg(m) +} + var _VolumeServer_serviceDesc = grpc.ServiceDesc{ ServiceName: "volume_server_pb.VolumeServer", HandlerType: (*VolumeServerServer)(nil), @@ -1248,6 +1521,14 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ MethodName: "VolumeDelete", Handler: _VolumeServer_VolumeDelete_Handler, }, + { + MethodName: "ReplicateVolume", + Handler: _VolumeServer_ReplicateVolume_Handler, + }, + { + MethodName: "ReadVolumeFileStatus", + Handler: _VolumeServer_ReadVolumeFileStatus_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -1260,6 +1541,11 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ Handler: _VolumeServer_VolumeSyncData_Handler, ServerStreams: true, }, + { + StreamName: "CopyFile", + Handler: _VolumeServer_CopyFile_Handler, + ServerStreams: true, + }, }, Metadata: "volume_server.proto", } @@ -1267,71 +1553,83 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1044 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x57, 0xdd, 0x72, 0xdb, 0x44, - 0x14, 0x8e, 0x6a, 0x3b, 0x76, 0x8e, 0x6d, 0x6a, 0xd6, 0x69, 0xa2, 0xaa, 0x10, 0x8c, 0x80, 0xd4, - 0x69, 0x43, 0x80, 0x74, 0x80, 0x32, 0xdc, 0x00, 0x09, 0x30, 0xb9, 0xe8, 0x94, 0xd9, 0x4c, 0x3b, - 0xcc, 0xd0, 0x19, 0x8f, 0x22, 0xad, 0x9d, 0x25, 0xb2, 0xe4, 0x6a, 0x57, 0x99, 0x94, 0x37, 0xe1, - 0x9a, 0x1b, 0x9e, 0x8e, 0x17, 0xe0, 0x86, 0xd9, 0x1f, 0xd9, 0xfa, 0x73, 0x24, 0xe0, 0x6e, 0xf7, - 0xec, 0x39, 0xdf, 0xf9, 0xd9, 0xa3, 0xf3, 0xad, 0x60, 0x78, 0x1d, 0xfa, 0xf1, 0x9c, 0x4c, 0x18, - 0x89, 0xae, 0x49, 0x74, 0xb4, 0x88, 0x42, 0x1e, 0xa2, 0x41, 0x46, 0x38, 0x59, 0x5c, 0xd8, 0x9f, - 0x00, 0xfa, 0xce, 0xe1, 0xee, 0xe5, 0x29, 0xf1, 0x09, 0x27, 0x98, 0xbc, 0x8e, 0x09, 0xe3, 0xe8, - 0x3e, 0x74, 0xa6, 0xd4, 0x27, 0x13, 0xea, 0x31, 0xd3, 0x18, 0x35, 0xc6, 0x5b, 0xb8, 0x2d, 0xf6, - 0x67, 0x1e, 0xb3, 0x9f, 0xc3, 0x30, 0x63, 0xc0, 0x16, 0x61, 0xc0, 0x08, 0x7a, 0x0a, 0xed, 0x88, - 0xb0, 0xd8, 0xe7, 0xca, 0xa0, 0x7b, 0xbc, 0x77, 0x94, 0xf7, 0x75, 0xb4, 0x34, 0x89, 0x7d, 0x8e, - 0x13, 0x75, 0x9b, 0x42, 0x2f, 0x7d, 0x80, 0x76, 0xa1, 0xad, 0x7d, 0x9b, 0xc6, 0xc8, 0x18, 0x6f, - 0xe1, 0x4d, 0xe5, 0x1a, 0xed, 0xc0, 0x26, 0xe3, 0x0e, 0x8f, 0x99, 0x79, 0x67, 0x64, 0x8c, 0x5b, - 0x58, 0xef, 0xd0, 0x36, 0xb4, 0x48, 0x14, 0x85, 0x91, 0xd9, 0x90, 0xea, 0x6a, 0x83, 0x10, 0x34, - 0x19, 0xfd, 0x8d, 0x98, 0xcd, 0x91, 0x31, 0xee, 0x63, 0xb9, 0xb6, 0xdb, 0xd0, 0xfa, 0x7e, 0xbe, - 0xe0, 0x6f, 0xec, 0x2f, 0xc1, 0x7c, 0xe9, 0xb8, 0x71, 0x3c, 0x7f, 0x29, 0x63, 0x3c, 0xb9, 0x24, - 0xee, 0x55, 0x92, 0xfb, 0x03, 0xd8, 0x92, 0x91, 0x7b, 0x49, 0x04, 0x7d, 0xdc, 0x51, 0x82, 0x33, - 0xcf, 0xfe, 0x06, 0xee, 0x97, 0x18, 0xea, 0x1a, 0x7c, 0x00, 0xfd, 0x99, 0x13, 0x5d, 0x38, 0x33, - 0x32, 0x89, 0x1c, 0x4e, 0x43, 0x69, 0x6d, 0xe0, 0x9e, 0x16, 0x62, 0x21, 0xb3, 0x7f, 0x01, 0x2b, - 0x83, 0x10, 0xce, 0x17, 0x8e, 0xcb, 0xeb, 0x38, 0x47, 0x23, 0xe8, 0x2e, 0x22, 0xe2, 0xf8, 0x7e, - 0xe8, 0x3a, 0x9c, 0xc8, 0x2a, 0x34, 0x70, 0x5a, 0x64, 0xbf, 0x0b, 0x0f, 0x4a, 0xc1, 0x55, 0x80, - 0xf6, 0xd3, 0x5c, 0xf4, 0xe1, 0x7c, 0x4e, 0x6b, 0xb9, 0xb6, 0xdf, 0x29, 0x44, 0x2d, 0x2d, 0x35, - 0xee, 0x57, 0xb9, 0x53, 0x9f, 0x38, 0x41, 0xbc, 0xa8, 0x05, 0x9c, 0x8f, 0x38, 0x31, 0x5d, 0x22, - 0xef, 0xaa, 0xe6, 0x38, 0x09, 0x7d, 0x9f, 0xb8, 0x9c, 0x86, 0x41, 0x02, 0xbb, 0x07, 0xe0, 0x2e, - 0x85, 0xba, 0x55, 0x52, 0x12, 0xdb, 0x02, 0xb3, 0x68, 0xaa, 0x61, 0xff, 0x34, 0x60, 0xf8, 0x2d, - 0x63, 0x74, 0x16, 0x28, 0xb7, 0xb5, 0xca, 0x9f, 0x75, 0x78, 0x27, 0xef, 0x30, 0x7f, 0x3d, 0x8d, - 0xc2, 0xf5, 0x08, 0x8d, 0x88, 0x2c, 0x7c, 0xea, 0x3a, 0x12, 0xa2, 0x29, 0x21, 0xd2, 0x22, 0x34, - 0x80, 0x06, 0xe7, 0xbe, 0xd9, 0x92, 0x27, 0x62, 0x69, 0xef, 0xc0, 0x76, 0x36, 0x52, 0x9d, 0xc2, - 0x17, 0xb0, 0xab, 0x24, 0xe7, 0x6f, 0x02, 0xf7, 0x5c, 0x7e, 0x09, 0xb5, 0x0a, 0xfe, 0xb7, 0x01, - 0x66, 0xd1, 0x50, 0x77, 0xf0, 0xff, 0xcd, 0xff, 0xdf, 0x66, 0x87, 0xde, 0x83, 0x2e, 0x77, 0xa8, - 0x3f, 0x09, 0xa7, 0x53, 0x46, 0xb8, 0xb9, 0x39, 0x32, 0xc6, 0x4d, 0x0c, 0x42, 0xf4, 0x5c, 0x4a, - 0xd0, 0x01, 0x0c, 0x5c, 0xd5, 0xc5, 0x93, 0x88, 0x5c, 0x53, 0x26, 0x90, 0xdb, 0x32, 0xb0, 0xbb, - 0x6e, 0xd2, 0xdd, 0x4a, 0x8c, 0x6c, 0xe8, 0x53, 0xef, 0x66, 0x22, 0x87, 0x87, 0xfc, 0xf4, 0x3b, - 0x12, 0xad, 0x4b, 0xbd, 0x9b, 0x1f, 0xa8, 0x4f, 0xce, 0xc5, 0x04, 0xf8, 0x1c, 0x76, 0x56, 0xc9, - 0x9f, 0x05, 0x1e, 0xb9, 0xa9, 0x55, 0xb4, 0x1f, 0xd3, 0xc5, 0xd6, 0x66, 0xba, 0x64, 0x87, 0x80, - 0xa8, 0x10, 0x28, 0xbf, 0x6e, 0x18, 0x70, 0x12, 0x70, 0x09, 0xd0, 0xc3, 0x03, 0x79, 0x22, 0x9c, - 0x9f, 0x28, 0xb9, 0xfd, 0xbb, 0x01, 0xf7, 0x56, 0x48, 0xa7, 0x0e, 0x77, 0x6a, 0xb5, 0x9e, 0x05, - 0x9d, 0x65, 0xf6, 0x77, 0xd4, 0x59, 0xb2, 0x17, 0x63, 0x51, 0x57, 0xaf, 0x21, 0x4f, 0xf4, 0xae, - 0x6c, 0x00, 0x0a, 0x27, 0x01, 0x21, 0x9e, 0x9a, 0xae, 0xea, 0x1a, 0x3a, 0x4a, 0x70, 0xe6, 0xd9, - 0x5f, 0xa7, 0x6b, 0xa3, 0x42, 0xd3, 0x39, 0xbe, 0x0f, 0xbd, 0x92, 0xec, 0xba, 0xd3, 0x54, 0x62, - 0x9f, 0x01, 0x52, 0xc6, 0xcf, 0xc2, 0x38, 0xa8, 0x37, 0x53, 0xee, 0xc1, 0x30, 0x63, 0xa2, 0x1b, - 0xfb, 0x09, 0x6c, 0x2b, 0xf1, 0x8b, 0x60, 0x5e, 0x1b, 0x6b, 0x37, 0x29, 0xeb, 0xd2, 0x48, 0xa3, - 0x1d, 0x27, 0x4e, 0xb2, 0x04, 0x77, 0x2b, 0xd8, 0x4e, 0x12, 0x41, 0x96, 0xe3, 0x56, 0x01, 0xbf, - 0xa0, 0x3f, 0x89, 0x79, 0xae, 0xb0, 0x56, 0xea, 0x89, 0x58, 0xab, 0xff, 0x0c, 0x70, 0x4a, 0xd9, - 0x95, 0xfa, 0xc4, 0x44, 0xef, 0x7b, 0x34, 0xd2, 0x73, 0x4a, 0x2c, 0x85, 0xc4, 0xf1, 0x7d, 0x79, - 0x9f, 0x4d, 0x2c, 0x96, 0xe2, 0xca, 0x62, 0x46, 0x3c, 0x79, 0x91, 0x4d, 0x2c, 0xd7, 0x42, 0x36, - 0x8d, 0x88, 0xba, 0xc6, 0x26, 0x96, 0x6b, 0xfb, 0x0f, 0x03, 0xb6, 0x9e, 0x91, 0xb9, 0x46, 0xde, - 0x03, 0x98, 0x85, 0x51, 0x18, 0x73, 0x1a, 0x10, 0x26, 0x1d, 0xb4, 0x70, 0x4a, 0xf2, 0xdf, 0xfd, - 0xc8, 0x16, 0x22, 0xfe, 0x54, 0x76, 0x4a, 0x13, 0xcb, 0xb5, 0x90, 0x5d, 0x12, 0x67, 0xa1, 0x3f, - 0x55, 0xb9, 0x16, 0x0c, 0xcc, 0xb8, 0xe3, 0x5e, 0xc9, 0x2f, 0xb3, 0x89, 0xd5, 0xe6, 0xf8, 0x2f, - 0x80, 0x9e, 0x6e, 0x28, 0xf9, 0x04, 0x40, 0xaf, 0xa0, 0x9b, 0x7a, 0x3a, 0xa0, 0x0f, 0x8b, 0x2f, - 0x84, 0xe2, 0x53, 0xc4, 0xfa, 0xa8, 0x42, 0x4b, 0x17, 0x7b, 0x03, 0x05, 0xf0, 0x76, 0x81, 0x9a, - 0xd1, 0xa3, 0xa2, 0xf5, 0x3a, 0xe2, 0xb7, 0x1e, 0xd7, 0xd2, 0x5d, 0xfa, 0xe3, 0x30, 0x2c, 0xe1, - 0x5a, 0x74, 0x58, 0x81, 0x92, 0xe1, 0x7b, 0xeb, 0xe3, 0x9a, 0xda, 0x4b, 0xaf, 0xaf, 0x01, 0x15, - 0x89, 0x18, 0x3d, 0xae, 0x84, 0x59, 0x11, 0xbd, 0x75, 0x58, 0x4f, 0x79, 0x6d, 0xa2, 0x8a, 0xa2, - 0x2b, 0x13, 0xcd, 0x3c, 0x02, 0x2a, 0x13, 0xcd, 0xf1, 0xfe, 0x06, 0xba, 0x82, 0x41, 0x9e, 0xbe, - 0xd1, 0xc1, 0xba, 0x37, 0x65, 0xe1, 0x75, 0x60, 0x3d, 0xaa, 0xa3, 0xba, 0x74, 0x36, 0x81, 0x5e, - 0x9a, 0x64, 0x51, 0x49, 0xd3, 0x95, 0x3c, 0x17, 0xac, 0xfd, 0x2a, 0xb5, 0x74, 0x36, 0x79, 0xd2, - 0x2d, 0xcb, 0x66, 0x0d, 0xa3, 0x97, 0x65, 0xb3, 0x8e, 0xc3, 0xed, 0x0d, 0xf4, 0x2b, 0xdc, 0xcd, - 0xb1, 0x15, 0x1a, 0xdf, 0x06, 0x90, 0xe6, 0x41, 0xeb, 0xa0, 0x86, 0x66, 0xe2, 0xe9, 0x53, 0x03, - 0xcd, 0xe0, 0xad, 0x2c, 0x69, 0xa0, 0x87, 0xb7, 0x01, 0xa4, 0x18, 0xcf, 0x1a, 0x57, 0x2b, 0xa6, - 0x1c, 0xbd, 0x82, 0x6e, 0x8a, 0x2d, 0xca, 0x86, 0x47, 0x91, 0x7f, 0xca, 0x86, 0x47, 0x19, 0xe5, - 0x6c, 0xa0, 0x0b, 0xe8, 0x67, 0xf8, 0x03, 0xed, 0xaf, 0xb3, 0xcc, 0xb2, 0x92, 0xf5, 0xb0, 0x52, - 0x2f, 0xdd, 0x64, 0x69, 0x5a, 0x41, 0x6b, 0x83, 0xcb, 0x0e, 0xc0, 0xfd, 0x2a, 0xb5, 0xc4, 0xc1, - 0xc5, 0xa6, 0xfc, 0xc9, 0x7b, 0xf2, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdb, 0x3c, 0x6d, 0xd7, - 0xfb, 0x0d, 0x00, 0x00, + // 1247 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x58, 0x5b, 0x73, 0xdb, 0xc4, + 0x17, 0x8f, 0x62, 0x3b, 0x71, 0x8e, 0xed, 0xc6, 0xff, 0x4d, 0x9a, 0x38, 0x4a, 0xff, 0xc1, 0x15, + 0x90, 0x3a, 0x6d, 0x1a, 0x20, 0x9d, 0x42, 0x81, 0x17, 0x20, 0x01, 0x26, 0x0f, 0xa5, 0x33, 0x0a, + 0xed, 0x30, 0x43, 0x67, 0x34, 0x1b, 0x69, 0x9d, 0x88, 0xc8, 0x92, 0xaa, 0x5d, 0x85, 0x84, 0x6f, + 0xc2, 0x33, 0x2f, 0x7d, 0xe7, 0x03, 0xf1, 0x41, 0x78, 0x61, 0xf6, 0x22, 0x59, 0x37, 0xc7, 0xe2, + 0xf2, 0xb6, 0x3a, 0x7b, 0xce, 0xef, 0x5c, 0xf6, 0xec, 0xd9, 0x9f, 0x0d, 0x6b, 0x57, 0x81, 0x17, + 0x4f, 0x88, 0x45, 0x49, 0x74, 0x45, 0xa2, 0x83, 0x30, 0x0a, 0x58, 0x80, 0xfa, 0x39, 0xa1, 0x15, + 0x9e, 0x19, 0x1f, 0x00, 0xfa, 0x0a, 0x33, 0xfb, 0xe2, 0x98, 0x78, 0x84, 0x11, 0x93, 0xbc, 0x89, + 0x09, 0x65, 0x68, 0x0b, 0xda, 0x63, 0xd7, 0x23, 0x96, 0xeb, 0xd0, 0x81, 0x36, 0x6c, 0x8c, 0x56, + 0xcc, 0x65, 0xfe, 0x7d, 0xe2, 0x50, 0xe3, 0x05, 0xac, 0xe5, 0x0c, 0x68, 0x18, 0xf8, 0x94, 0xa0, + 0x67, 0xb0, 0x1c, 0x11, 0x1a, 0x7b, 0x4c, 0x1a, 0x74, 0x0e, 0x77, 0x0e, 0x8a, 0xbe, 0x0e, 0x52, + 0x93, 0xd8, 0x63, 0x66, 0xa2, 0x6e, 0xb8, 0xd0, 0xcd, 0x6e, 0xa0, 0x4d, 0x58, 0x56, 0xbe, 0x07, + 0xda, 0x50, 0x1b, 0xad, 0x98, 0x4b, 0xd2, 0x35, 0xda, 0x80, 0x25, 0xca, 0x30, 0x8b, 0xe9, 0x60, + 0x71, 0xa8, 0x8d, 0x5a, 0xa6, 0xfa, 0x42, 0xeb, 0xd0, 0x22, 0x51, 0x14, 0x44, 0x83, 0x86, 0x50, + 0x97, 0x1f, 0x08, 0x41, 0x93, 0xba, 0xbf, 0x90, 0x41, 0x73, 0xa8, 0x8d, 0x7a, 0xa6, 0x58, 0x1b, + 0xcb, 0xd0, 0xfa, 0x7a, 0x12, 0xb2, 0x1b, 0xe3, 0x13, 0x18, 0xbc, 0xc2, 0x76, 0x1c, 0x4f, 0x5e, + 0x89, 0x18, 0x8f, 0x2e, 0x88, 0x7d, 0x99, 0xe4, 0xbe, 0x0d, 0x2b, 0x2a, 0x72, 0x15, 0x41, 0xcf, + 0x6c, 0x4b, 0xc1, 0x89, 0x63, 0x7c, 0x01, 0x5b, 0x15, 0x86, 0xaa, 0x06, 0xef, 0x42, 0xef, 0x1c, + 0x47, 0x67, 0xf8, 0x9c, 0x58, 0x11, 0x66, 0x6e, 0x20, 0xac, 0x35, 0xb3, 0xab, 0x84, 0x26, 0x97, + 0x19, 0x3f, 0x82, 0x9e, 0x43, 0x08, 0x26, 0x21, 0xb6, 0x59, 0x1d, 0xe7, 0x68, 0x08, 0x9d, 0x30, + 0x22, 0xd8, 0xf3, 0x02, 0x1b, 0x33, 0x22, 0xaa, 0xd0, 0x30, 0xb3, 0x22, 0xe3, 0xff, 0xb0, 0x5d, + 0x09, 0x2e, 0x03, 0x34, 0x9e, 0x15, 0xa2, 0x0f, 0x26, 0x13, 0xb7, 0x96, 0x6b, 0xe3, 0x5e, 0x29, + 0x6a, 0x61, 0xa9, 0x70, 0x3f, 0x2d, 0xec, 0x7a, 0x04, 0xfb, 0x71, 0x58, 0x0b, 0xb8, 0x18, 0x71, + 0x62, 0x9a, 0x22, 0x6f, 0xca, 0xe6, 0x38, 0x0a, 0x3c, 0x8f, 0xd8, 0xcc, 0x0d, 0xfc, 0x04, 0x76, + 0x07, 0xc0, 0x4e, 0x85, 0xaa, 0x55, 0x32, 0x12, 0x43, 0x87, 0x41, 0xd9, 0x54, 0xc1, 0xbe, 0xd5, + 0x60, 0xed, 0x4b, 0x4a, 0xdd, 0x73, 0x5f, 0xba, 0xad, 0x55, 0xfe, 0xbc, 0xc3, 0xc5, 0xa2, 0xc3, + 0xe2, 0xf1, 0x34, 0x4a, 0xc7, 0xc3, 0x35, 0x22, 0x12, 0x7a, 0xae, 0x8d, 0x05, 0x44, 0x53, 0x40, + 0x64, 0x45, 0xa8, 0x0f, 0x0d, 0xc6, 0xbc, 0x41, 0x4b, 0xec, 0xf0, 0xa5, 0xb1, 0x01, 0xeb, 0xf9, + 0x48, 0x55, 0x0a, 0x1f, 0xc3, 0xa6, 0x94, 0x9c, 0xde, 0xf8, 0xf6, 0xa9, 0xb8, 0x09, 0xb5, 0x0a, + 0xfe, 0xa7, 0x06, 0x83, 0xb2, 0xa1, 0xea, 0xe0, 0x7f, 0x9b, 0xff, 0xdf, 0xcd, 0x0e, 0xbd, 0x03, + 0x1d, 0x86, 0x5d, 0xcf, 0x0a, 0xc6, 0x63, 0x4a, 0xd8, 0x60, 0x69, 0xa8, 0x8d, 0x9a, 0x26, 0x70, + 0xd1, 0x0b, 0x21, 0x41, 0x7b, 0xd0, 0xb7, 0x65, 0x17, 0x5b, 0x11, 0xb9, 0x72, 0x29, 0x47, 0x5e, + 0x16, 0x81, 0xad, 0xda, 0x49, 0x77, 0x4b, 0x31, 0x32, 0xa0, 0xe7, 0x3a, 0xd7, 0x96, 0x18, 0x1e, + 0xe2, 0xea, 0xb7, 0x05, 0x5a, 0xc7, 0x75, 0xae, 0xbf, 0x71, 0x3d, 0x72, 0xca, 0x27, 0xc0, 0x53, + 0xd8, 0x98, 0x26, 0x7f, 0xe2, 0x3b, 0xe4, 0xba, 0x56, 0xd1, 0xbe, 0xcd, 0x16, 0x5b, 0x99, 0xa9, + 0x92, 0xed, 0x03, 0x72, 0xb9, 0x40, 0xfa, 0xb5, 0x03, 0x9f, 0x11, 0x9f, 0x09, 0x80, 0xae, 0xd9, + 0x17, 0x3b, 0xdc, 0xf9, 0x91, 0x94, 0x1b, 0xbf, 0x6a, 0x70, 0x77, 0x8a, 0x74, 0x8c, 0x19, 0xae, + 0xd5, 0x7a, 0x3a, 0xb4, 0xd3, 0xec, 0x17, 0xe5, 0x5e, 0xf2, 0xcd, 0xc7, 0xa2, 0xaa, 0x5e, 0x43, + 0xec, 0xa8, 0xaf, 0xaa, 0x01, 0xc8, 0x9d, 0xf8, 0x84, 0x38, 0x72, 0xba, 0xca, 0x63, 0x68, 0x4b, + 0xc1, 0x89, 0x63, 0x7c, 0x9e, 0xad, 0x8d, 0x0c, 0x4d, 0xe5, 0x78, 0x1f, 0xba, 0x15, 0xd9, 0x75, + 0xc6, 0x99, 0xc4, 0x3e, 0x02, 0x24, 0x8d, 0x9f, 0x07, 0xb1, 0x5f, 0x6f, 0xa6, 0xdc, 0x85, 0xb5, + 0x9c, 0x89, 0x6a, 0xec, 0x27, 0xb0, 0x2e, 0xc5, 0x2f, 0xfd, 0x49, 0x6d, 0xac, 0xcd, 0xa4, 0xac, + 0xa9, 0x91, 0x42, 0x3b, 0x4c, 0x9c, 0xe4, 0x1f, 0xb8, 0x5b, 0xc1, 0x36, 0x92, 0x08, 0xf2, 0x6f, + 0x9c, 0xf1, 0xbb, 0x06, 0x1b, 0xa6, 0x6a, 0x67, 0xf2, 0xdf, 0x0e, 0x8e, 0xec, 0xc5, 0x69, 0xcc, + 0xbc, 0x38, 0xcd, 0xe9, 0xc5, 0x19, 0x41, 0x9f, 0x06, 0x71, 0x64, 0x13, 0xcb, 0xc1, 0x0c, 0x5b, + 0x7e, 0xe0, 0x10, 0x75, 0xa0, 0x77, 0xa4, 0x9c, 0x1f, 0xe0, 0x77, 0x81, 0x43, 0x8c, 0x2d, 0xd8, + 0x2c, 0x05, 0xad, 0x12, 0xf2, 0x61, 0xf5, 0x28, 0x08, 0x6f, 0x78, 0x83, 0xd6, 0x4c, 0xa4, 0xe3, + 0x52, 0x2b, 0xb9, 0x64, 0x22, 0x93, 0xb6, 0xb9, 0xe2, 0xd2, 0x13, 0x79, 0xc3, 0xd4, 0xbe, 0x83, + 0x99, 0xdc, 0x6f, 0x24, 0xfb, 0xc7, 0x98, 0xf1, 0x7d, 0xe3, 0x29, 0xf4, 0xa7, 0xfe, 0xea, 0xf7, + 0xd6, 0x67, 0xb0, 0x6d, 0x12, 0xec, 0xc8, 0xe0, 0xc5, 0x55, 0xae, 0x3f, 0xee, 0xfe, 0xd0, 0xe0, + 0x5e, 0xb5, 0x71, 0x9d, 0x91, 0xc7, 0x2f, 0x77, 0x32, 0x52, 0x98, 0x3b, 0x21, 0x94, 0xe1, 0x49, + 0x28, 0xf2, 0x6e, 0x9a, 0x7d, 0x35, 0x57, 0xbe, 0x4f, 0xe4, 0xe5, 0x01, 0xd4, 0x28, 0x0d, 0x20, + 0x8e, 0x98, 0xd4, 0x27, 0x83, 0xd8, 0x94, 0x88, 0x8e, 0xac, 0x53, 0x0e, 0x31, 0xd5, 0x16, 0x88, + 0x2d, 0x89, 0xa8, 0x14, 0xc5, 0x48, 0xfb, 0x01, 0xe0, 0xd8, 0xa5, 0x97, 0x32, 0x2d, 0xde, 0x29, + 0x8e, 0x1b, 0xa9, 0xe7, 0x90, 0x2f, 0xb9, 0x04, 0x7b, 0x9e, 0x0a, 0x9a, 0x2f, 0xf9, 0x64, 0x88, + 0x29, 0x71, 0x54, 0x78, 0x62, 0xcd, 0x65, 0xe3, 0x88, 0x10, 0x15, 0x89, 0x58, 0x1b, 0xbf, 0x69, + 0xb0, 0xf2, 0x9c, 0x4c, 0x14, 0xf2, 0x0e, 0xc0, 0x79, 0x10, 0x05, 0x31, 0x73, 0x7d, 0x42, 0x85, + 0x83, 0x96, 0x99, 0x91, 0xfc, 0x73, 0x3f, 0x62, 0x52, 0x11, 0x6f, 0xac, 0x92, 0x13, 0x6b, 0x2e, + 0xbb, 0x20, 0x38, 0x54, 0x2f, 0x82, 0x58, 0x73, 0xa2, 0x47, 0x19, 0xb6, 0x2f, 0xc5, 0x03, 0xd0, + 0x34, 0xe5, 0xc7, 0xe1, 0xdb, 0x1e, 0x74, 0xd5, 0xdc, 0x12, 0x4c, 0x13, 0xbd, 0x86, 0x4e, 0x86, + 0xa1, 0xa2, 0xf7, 0xca, 0x44, 0xb4, 0xcc, 0x78, 0xf5, 0xf7, 0xe7, 0x68, 0xa9, 0x1b, 0xb3, 0x80, + 0x7c, 0xf8, 0x5f, 0x89, 0x01, 0xa2, 0x87, 0x65, 0xeb, 0x59, 0xfc, 0x52, 0x7f, 0x54, 0x4b, 0x37, + 0xf5, 0xc7, 0x60, 0xad, 0x82, 0xd2, 0xa1, 0xfd, 0x39, 0x28, 0x39, 0x5a, 0xa9, 0x3f, 0xae, 0xa9, + 0x9d, 0x7a, 0x7d, 0x03, 0xa8, 0xcc, 0xf7, 0xd0, 0xa3, 0xb9, 0x30, 0x53, 0x3e, 0xa9, 0xef, 0xd7, + 0x53, 0x9e, 0x99, 0xa8, 0x64, 0x82, 0x73, 0x13, 0xcd, 0x71, 0xcd, 0xb9, 0x89, 0x16, 0xe8, 0xe5, + 0x02, 0xba, 0x84, 0x7e, 0x91, 0x25, 0xa2, 0xbd, 0x59, 0x3f, 0x5d, 0x4a, 0x24, 0x54, 0x7f, 0x58, + 0x47, 0x35, 0x75, 0x66, 0x41, 0x37, 0xcb, 0xe5, 0x50, 0x45, 0xd3, 0x55, 0xb0, 0x52, 0x7d, 0x77, + 0x9e, 0x5a, 0x36, 0x9b, 0x22, 0xb7, 0xab, 0xca, 0x66, 0x06, 0x71, 0xac, 0xca, 0x66, 0x16, 0x55, + 0x34, 0x16, 0xd0, 0x4f, 0xb0, 0x5a, 0x20, 0x45, 0x68, 0x74, 0x1b, 0x40, 0x96, 0x6e, 0xe9, 0x7b, + 0x35, 0x34, 0x13, 0x4f, 0x1f, 0x6a, 0xe8, 0x1c, 0xee, 0xe4, 0xb9, 0x09, 0x7a, 0x70, 0x1b, 0x40, + 0x86, 0x58, 0xe9, 0xa3, 0xf9, 0x8a, 0x19, 0x47, 0xaf, 0xa1, 0x93, 0x21, 0x25, 0x55, 0xc3, 0xa3, + 0x4c, 0x73, 0xaa, 0x86, 0x47, 0x15, 0xb3, 0x59, 0x40, 0x67, 0xd0, 0xcb, 0xd1, 0x14, 0xb4, 0x3b, + 0xcb, 0x32, 0x4f, 0x7e, 0xf4, 0x07, 0x73, 0xf5, 0xb2, 0x4d, 0x96, 0x65, 0x2f, 0x68, 0x66, 0x70, + 0xf9, 0x01, 0xb8, 0x3b, 0x4f, 0x2d, 0x75, 0x70, 0x01, 0xab, 0x05, 0x42, 0x51, 0x75, 0xee, 0xd5, + 0x44, 0xa9, 0xea, 0xdc, 0x67, 0xb1, 0x93, 0x05, 0xf4, 0x33, 0xac, 0x57, 0xbd, 0xdd, 0xe8, 0x71, + 0x15, 0xc8, 0x4c, 0x82, 0xa0, 0x1f, 0xd4, 0x55, 0x4f, 0x1d, 0xbf, 0x84, 0x76, 0x42, 0x54, 0xd0, + 0xfd, 0xb2, 0x75, 0x81, 0x34, 0xe9, 0xc6, 0x6d, 0x2a, 0xd3, 0xe6, 0x3a, 0x5b, 0x12, 0xff, 0xc2, + 0x3c, 0xf9, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x6c, 0x7d, 0x0d, 0xbb, 0x9c, 0x11, 0x00, 0x00, } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 429ca9b68..c924b7a62 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -29,7 +29,7 @@ func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb. resp := &volume_server_pb.AssignVolumeResponse{} err := vs.store.AddVolume( - storage.VolumeId(req.VolumdId), + storage.VolumeId(req.VolumeId), req.Collection, vs.needleMapKind, req.Replication, @@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V resp := &volume_server_pb.VolumeMountResponse{} - err := vs.store.MountVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.MountVolume(storage.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume mount %v: %v", req, err) @@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb resp := &volume_server_pb.VolumeUnmountResponse{} - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume unmount %v: %v", req, err) @@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. resp := &volume_server_pb.VolumeDeleteResponse{} - err := vs.store.DeleteVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.DeleteVolume(storage.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume delete %v: %v", req, err) diff --git a/weed/server/volume_grpc_replicate.go b/weed/server/volume_grpc_replicate.go new file mode 100644 index 000000000..20a85fd6f --- /dev/null +++ b/weed/server/volume_grpc_replicate.go @@ -0,0 +1,155 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "io" + "os" +) + +func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_pb.ReplicateVolumeRequest) (*volume_server_pb.ReplicateVolumeResponse, error) { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v != nil { + // unmount the volume + err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err) + } + } + + location := vs.store.FindFreeLocation() + if location == nil { + return nil, fmt.Errorf("no space left") + } + + volumeFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId)) + + // the master will not start compaction for read-only volumes, so it is safe to just copy files directly + // copy .dat and .idx files + // read .idx .dat file size and timestamp + // send .idx file + // send .dat file + // confirm size and timestamp + + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + // TODO read file sizes before copying + client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{}) + + copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: req.VolumeId, + IsIdxFile: true, + }) + + if err != nil { + return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err) + } + + err = writeToFile(copyFileClient, volumeFileName+".idx") + if err != nil { + return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err) + } + + copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: req.VolumeId, + IsDatFile: true, + }) + + if err != nil { + return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err) + } + + err = writeToFile(copyFileClient, volumeFileName+".dat") + if err != nil { + return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err) + } + + return nil + }) + + if err != nil { + return nil, err + } + + // TODO: check the timestamp and size + + // mount the volume + err = vs.store.MountVolume(storage.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) + } + + return &volume_server_pb.ReplicateVolumeResponse{}, err + +} + +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { + println("writing to ", fileName) + dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return nil + } + defer dst.Close() + + for { + resp, receiveErr := client.Recv() + if receiveErr == io.EOF { + break + } + if receiveErr != nil { + return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + } + dst.Write(resp.FileContent) + } + return nil +} + +func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + resp := &volume_server_pb.ReadVolumeFileStatusResponse{} + return resp, nil +} + +func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) (error) { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + const BufferSize = 1024 * 16 + var fileName = v.FileName() + if req.IsDatFile { + fileName += ".dat" + } else if req.IsIdxFile { + fileName += ".idx" + } + file, err := os.Open(fileName) + if err != nil { + return err + } + defer file.Close() + + buffer := make([]byte, BufferSize) + + for { + bytesread, err := file.Read(buffer) + + if err != nil { + if err != io.EOF { + return err + } + break + } + + stream.Send(&volume_server_pb.CopyFileResponse{ + FileContent: buffer[:bytesread], + }) + + } + + return nil +} diff --git a/weed/server/volume_grpc_sync.go b/weed/server/volume_grpc_sync.go index 0114b38a4..971258689 100644 --- a/weed/server/volume_grpc_sync.go +++ b/weed/server/volume_grpc_sync.go @@ -12,14 +12,14 @@ import ( func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) { - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) if v == nil { - return nil, fmt.Errorf("not found volume id %d", req.VolumdId) + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } resp := v.GetVolumeSyncStatus() - glog.V(2).Infof("volume sync status %d", req.VolumdId) + glog.V(2).Infof("volume sync status %d", req.VolumeId) return resp, nil @@ -27,17 +27,17 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) if v == nil { - return fmt.Errorf("not found volume id %d", req.VolumdId) + return fmt.Errorf("not found volume id %d", req.VolumeId) } content, err := v.IndexFileContent() if err != nil { - glog.Errorf("sync volume %d index: %v", req.VolumdId, err) + glog.Errorf("sync volume %d index: %v", req.VolumeId, err) } else { - glog.V(2).Infof("sync volume %d index", req.VolumdId) + glog.V(2).Infof("sync volume %d index", req.VolumeId) } const blockSizeLimit = 1024 * 1024 * 2 @@ -57,9 +57,9 @@ func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexReq func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) if v == nil { - return fmt.Errorf("not found volume id %d", req.VolumdId) + return fmt.Errorf("not found volume id %d", req.VolumeId) } if uint32(v.SuperBlock.CompactRevision) != req.Revision { @@ -82,7 +82,7 @@ func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataReque } if err != nil { - glog.Errorf("sync volume %d data: %v", req.VolumdId, err) + glog.Errorf("sync volume %d data: %v", req.VolumeId, err) } const blockSizeLimit = 1024 * 1024 * 2 diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index f0c87b582..d31b8f8e7 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -12,12 +12,12 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve resp := &volume_server_pb.VacuumVolumeCheckResponse{} - garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId)) + garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumeId)) resp.GarbageRatio = garbageRatio if err != nil { - glog.V(3).Infof("check volume %d: %v", req.VolumdId, err) + glog.V(3).Infof("check volume %d: %v", req.VolumeId, err) } return resp, err @@ -28,12 +28,12 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate) + err := vs.store.CompactVolume(storage.VolumeId(req.VolumeId), req.Preallocate) if err != nil { - glog.Errorf("compact volume %d: %v", req.VolumdId, err) + glog.Errorf("compact volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("compact volume %d", req.VolumdId) + glog.V(1).Infof("compact volume %d", req.VolumeId) } return resp, err @@ -44,12 +44,12 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv resp := &volume_server_pb.VacuumVolumeCommitResponse{} - err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("commit volume %d: %v", req.VolumdId, err) + glog.Errorf("commit volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("commit volume %d", req.VolumdId) + glog.V(1).Infof("commit volume %d", req.VolumeId) } return resp, err @@ -60,12 +60,12 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCleanupResponse{} - err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("cleanup volume %d: %v", req.VolumdId, err) + glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("cleanup volume %d", req.VolumdId) + glog.V(1).Infof("cleanup volume %d", req.VolumeId) } return resp, err diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 096532fdf..7d09661dc 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -71,7 +71,7 @@ func RunShell(options ShellOptions) { } else { for _, c := range commands { if c.Name() == cmd { - if err := c.Do(args, commandEnv, os.Stderr); err != nil { + if err := c.Do(args, commandEnv, os.Stdout); err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } } diff --git a/weed/storage/store.go b/weed/storage/store.go index 8d4d9c55e..56e973738 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -77,7 +77,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume { } return nil } -func (s *Store) findFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation() (ret *DiskLocation) { max := 0 for _, location := range s.Locations { currentFreeCount := location.MaxVolumeCount - location.VolumesLen() @@ -92,7 +92,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.findFreeLocation(); location != nil { + if location := s.FindFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 22acf1653..807fefa38 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -5,6 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" "path" + "strconv" "sync" "time" @@ -42,14 +43,18 @@ func (v *Volume) String() string { return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) } -func (v *Volume) FileName() (fileName string) { - if v.Collection == "" { - fileName = path.Join(v.dir, v.Id.String()) +func VolumeFileName(collection string, dir string, id int) (fileName string) { + idString := strconv.Itoa(id) + if collection == "" { + fileName = path.Join(dir, idString) } else { - fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String()) + fileName = path.Join(dir, collection+"_"+idString) } return } +func (v *Volume) FileName() (fileName string) { + return VolumeFileName(v.Collection, v.dir, int(v.Id)) +} func (v *Volume) DataFile() *os.File { return v.dataFile } diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index 8d90a729d..827e6685a 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -192,7 +192,7 @@ func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{ - VolumdId: uint32(v.Id), + VolumeId: uint32(v.Id), Revision: uint32(compactRevision), Offset: uint32(needleValue.Offset), Size: uint32(needleValue.Size), diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 66b1b3af5..1360988b3 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -17,7 +17,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.Vo return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AssignVolume(context.Background(), &volume_server_pb.AssignVolumeRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), Collection: option.Collection, Replication: option.ReplicaPlacement.String(), Ttl: option.Ttl.String(), diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 840821efa..ea65b2ff9 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -17,7 +17,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi go func(index int, url string, vid storage.VolumeId) { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) if err != nil { ch <- false @@ -52,7 +52,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -83,7 +83,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -104,7 +104,7 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err })