move volume vacuum to gRpc

This commit is contained in:
Chris Lu 2018-10-14 23:12:43 -07:00
parent 91ac2e0dd9
commit b1daede91b
14 changed files with 469 additions and 207 deletions

View file

@ -42,7 +42,7 @@ var (
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
// mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds")
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")

View file

@ -60,7 +60,7 @@ var (
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverPeers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")

View file

@ -97,7 +97,7 @@ func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteRes
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
err = withVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,

View file

@ -11,7 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func withVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {

View file

@ -8,6 +8,14 @@ service VolumeServer {
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
rpc BatchDelete (BatchDeleteRequest) returns (BatchDeleteResponse) {
}
rpc VacuumVolumeCheck (VacuumVolumeCheckRequest) returns (VacuumVolumeCheckResponse) {
}
rpc VacuumVolumeCompact (VacuumVolumeCompactRequest) returns (VacuumVolumeCompactResponse) {
}
rpc VacuumVolumeCommit (VacuumVolumeCommitRequest) returns (VacuumVolumeCommitResponse) {
}
rpc VacuumVolumeCleanup (VacuumVolumeCleanupRequest) returns (VacuumVolumeCleanupResponse) {
}
}
//////////////////////////////////////////////////
@ -28,3 +36,29 @@ message DeleteResult {
message Empty {
}
message VacuumVolumeCheckRequest {
uint32 volumd_id = 1;
}
message VacuumVolumeCheckResponse {
double garbage_ratio = 1;
}
message VacuumVolumeCompactRequest {
uint32 volumd_id = 1;
int64 preallocate = 2;
}
message VacuumVolumeCompactResponse {
}
message VacuumVolumeCommitRequest {
uint32 volumd_id = 1;
}
message VacuumVolumeCommitResponse {
}
message VacuumVolumeCleanupRequest {
uint32 volumd_id = 1;
}
message VacuumVolumeCleanupResponse {
}

View file

@ -13,6 +13,14 @@ It has these top-level messages:
BatchDeleteResponse
DeleteResult
Empty
VacuumVolumeCheckRequest
VacuumVolumeCheckResponse
VacuumVolumeCompactRequest
VacuumVolumeCompactResponse
VacuumVolumeCommitRequest
VacuumVolumeCommitResponse
VacuumVolumeCleanupRequest
VacuumVolumeCleanupResponse
*/
package volume_server_pb
@ -116,11 +124,131 @@ func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
type VacuumVolumeCheckRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
}
func (m *VacuumVolumeCheckRequest) Reset() { *m = VacuumVolumeCheckRequest{} }
func (m *VacuumVolumeCheckRequest) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCheckRequest) ProtoMessage() {}
func (*VacuumVolumeCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *VacuumVolumeCheckRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
type VacuumVolumeCheckResponse struct {
GarbageRatio float64 `protobuf:"fixed64,1,opt,name=garbage_ratio,json=garbageRatio" json:"garbage_ratio,omitempty"`
}
func (m *VacuumVolumeCheckResponse) Reset() { *m = VacuumVolumeCheckResponse{} }
func (m *VacuumVolumeCheckResponse) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCheckResponse) ProtoMessage() {}
func (*VacuumVolumeCheckResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *VacuumVolumeCheckResponse) GetGarbageRatio() float64 {
if m != nil {
return m.GarbageRatio
}
return 0
}
type VacuumVolumeCompactRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
Preallocate int64 `protobuf:"varint,2,opt,name=preallocate" json:"preallocate,omitempty"`
}
func (m *VacuumVolumeCompactRequest) Reset() { *m = VacuumVolumeCompactRequest{} }
func (m *VacuumVolumeCompactRequest) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCompactRequest) ProtoMessage() {}
func (*VacuumVolumeCompactRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *VacuumVolumeCompactRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
func (m *VacuumVolumeCompactRequest) GetPreallocate() int64 {
if m != nil {
return m.Preallocate
}
return 0
}
type VacuumVolumeCompactResponse struct {
}
func (m *VacuumVolumeCompactResponse) Reset() { *m = VacuumVolumeCompactResponse{} }
func (m *VacuumVolumeCompactResponse) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCompactResponse) ProtoMessage() {}
func (*VacuumVolumeCompactResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
type VacuumVolumeCommitRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
}
func (m *VacuumVolumeCommitRequest) Reset() { *m = VacuumVolumeCommitRequest{} }
func (m *VacuumVolumeCommitRequest) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCommitRequest) ProtoMessage() {}
func (*VacuumVolumeCommitRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *VacuumVolumeCommitRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
type VacuumVolumeCommitResponse struct {
}
func (m *VacuumVolumeCommitResponse) Reset() { *m = VacuumVolumeCommitResponse{} }
func (m *VacuumVolumeCommitResponse) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCommitResponse) ProtoMessage() {}
func (*VacuumVolumeCommitResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
type VacuumVolumeCleanupRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
}
func (m *VacuumVolumeCleanupRequest) Reset() { *m = VacuumVolumeCleanupRequest{} }
func (m *VacuumVolumeCleanupRequest) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCleanupRequest) ProtoMessage() {}
func (*VacuumVolumeCleanupRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *VacuumVolumeCleanupRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
type VacuumVolumeCleanupResponse struct {
}
func (m *VacuumVolumeCleanupResponse) Reset() { *m = VacuumVolumeCleanupResponse{} }
func (m *VacuumVolumeCleanupResponse) String() string { return proto.CompactTextString(m) }
func (*VacuumVolumeCleanupResponse) ProtoMessage() {}
func (*VacuumVolumeCleanupResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
func init() {
proto.RegisterType((*BatchDeleteRequest)(nil), "volume_server_pb.BatchDeleteRequest")
proto.RegisterType((*BatchDeleteResponse)(nil), "volume_server_pb.BatchDeleteResponse")
proto.RegisterType((*DeleteResult)(nil), "volume_server_pb.DeleteResult")
proto.RegisterType((*Empty)(nil), "volume_server_pb.Empty")
proto.RegisterType((*VacuumVolumeCheckRequest)(nil), "volume_server_pb.VacuumVolumeCheckRequest")
proto.RegisterType((*VacuumVolumeCheckResponse)(nil), "volume_server_pb.VacuumVolumeCheckResponse")
proto.RegisterType((*VacuumVolumeCompactRequest)(nil), "volume_server_pb.VacuumVolumeCompactRequest")
proto.RegisterType((*VacuumVolumeCompactResponse)(nil), "volume_server_pb.VacuumVolumeCompactResponse")
proto.RegisterType((*VacuumVolumeCommitRequest)(nil), "volume_server_pb.VacuumVolumeCommitRequest")
proto.RegisterType((*VacuumVolumeCommitResponse)(nil), "volume_server_pb.VacuumVolumeCommitResponse")
proto.RegisterType((*VacuumVolumeCleanupRequest)(nil), "volume_server_pb.VacuumVolumeCleanupRequest")
proto.RegisterType((*VacuumVolumeCleanupResponse)(nil), "volume_server_pb.VacuumVolumeCleanupResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -136,6 +264,10 @@ const _ = grpc.SupportPackageIsVersion4
type VolumeServerClient interface {
// Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
BatchDelete(ctx context.Context, in *BatchDeleteRequest, opts ...grpc.CallOption) (*BatchDeleteResponse, error)
VacuumVolumeCheck(ctx context.Context, in *VacuumVolumeCheckRequest, opts ...grpc.CallOption) (*VacuumVolumeCheckResponse, error)
VacuumVolumeCompact(ctx context.Context, in *VacuumVolumeCompactRequest, opts ...grpc.CallOption) (*VacuumVolumeCompactResponse, error)
VacuumVolumeCommit(ctx context.Context, in *VacuumVolumeCommitRequest, opts ...grpc.CallOption) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(ctx context.Context, in *VacuumVolumeCleanupRequest, opts ...grpc.CallOption) (*VacuumVolumeCleanupResponse, error)
}
type volumeServerClient struct {
@ -155,11 +287,51 @@ func (c *volumeServerClient) BatchDelete(ctx context.Context, in *BatchDeleteReq
return out, nil
}
func (c *volumeServerClient) VacuumVolumeCheck(ctx context.Context, in *VacuumVolumeCheckRequest, opts ...grpc.CallOption) (*VacuumVolumeCheckResponse, error) {
out := new(VacuumVolumeCheckResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VacuumVolumeCheck", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VacuumVolumeCompact(ctx context.Context, in *VacuumVolumeCompactRequest, opts ...grpc.CallOption) (*VacuumVolumeCompactResponse, error) {
out := new(VacuumVolumeCompactResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VacuumVolumeCompact", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VacuumVolumeCommit(ctx context.Context, in *VacuumVolumeCommitRequest, opts ...grpc.CallOption) (*VacuumVolumeCommitResponse, error) {
out := new(VacuumVolumeCommitResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VacuumVolumeCommit", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VacuumVolumeCleanup(ctx context.Context, in *VacuumVolumeCleanupRequest, opts ...grpc.CallOption) (*VacuumVolumeCleanupResponse, error) {
out := new(VacuumVolumeCleanupResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VacuumVolumeCleanup", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for VolumeServer service
type VolumeServerServer interface {
// Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error)
VacuumVolumeCheck(context.Context, *VacuumVolumeCheckRequest) (*VacuumVolumeCheckResponse, error)
VacuumVolumeCompact(context.Context, *VacuumVolumeCompactRequest) (*VacuumVolumeCompactResponse, error)
VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error)
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
@ -184,6 +356,78 @@ func _VolumeServer_BatchDelete_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VacuumVolumeCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VacuumVolumeCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VacuumVolumeCheck(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VacuumVolumeCheck",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VacuumVolumeCheck(ctx, req.(*VacuumVolumeCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VacuumVolumeCompact_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VacuumVolumeCompactRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VacuumVolumeCompact(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VacuumVolumeCompact",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VacuumVolumeCompact(ctx, req.(*VacuumVolumeCompactRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VacuumVolumeCommit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VacuumVolumeCommitRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VacuumVolumeCommit(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VacuumVolumeCommit",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VacuumVolumeCommit(ctx, req.(*VacuumVolumeCommitRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VacuumVolumeCleanup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VacuumVolumeCleanupRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VacuumVolumeCleanup(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VacuumVolumeCleanup",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VacuumVolumeCleanup(ctx, req.(*VacuumVolumeCleanupRequest))
}
return interceptor(ctx, in, info, handler)
}
var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "volume_server_pb.VolumeServer",
HandlerType: (*VolumeServerServer)(nil),
@ -192,6 +436,22 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "BatchDelete",
Handler: _VolumeServer_BatchDelete_Handler,
},
{
MethodName: "VacuumVolumeCheck",
Handler: _VolumeServer_VacuumVolumeCheck_Handler,
},
{
MethodName: "VacuumVolumeCompact",
Handler: _VolumeServer_VacuumVolumeCompact_Handler,
},
{
MethodName: "VacuumVolumeCommit",
Handler: _VolumeServer_VacuumVolumeCommit_Handler,
},
{
MethodName: "VacuumVolumeCleanup",
Handler: _VolumeServer_VacuumVolumeCleanup_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "volume_server.proto",
@ -200,21 +460,34 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 252 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x50, 0xc1, 0x4a, 0xc3, 0x40,
0x10, 0x75, 0x6d, 0x93, 0xd8, 0x69, 0x05, 0x99, 0x8a, 0xae, 0x1e, 0x24, 0x2c, 0x0a, 0x39, 0x45,
0xa8, 0x17, 0xcf, 0xa2, 0x07, 0x4f, 0xc2, 0x0a, 0x9e, 0x84, 0xd0, 0xda, 0x11, 0x03, 0x5b, 0x37,
0xee, 0xec, 0x16, 0xf4, 0xeb, 0xc5, 0x4d, 0x95, 0xd6, 0x1c, 0x7a, 0x9b, 0x37, 0xf3, 0x1e, 0xef,
0xcd, 0x83, 0xf1, 0xd2, 0x9a, 0xb0, 0xa0, 0x8a, 0xc9, 0x2d, 0xc9, 0x95, 0x8d, 0xb3, 0xde, 0xe2,
0xc1, 0xc6, 0xb2, 0x6a, 0x66, 0xea, 0x12, 0xf0, 0x66, 0xea, 0x5f, 0xde, 0x6e, 0xc9, 0x90, 0x27,
0x4d, 0x1f, 0x81, 0xd8, 0xe3, 0x09, 0xec, 0xbd, 0xd6, 0x86, 0xaa, 0x7a, 0xce, 0x52, 0xe4, 0xbd,
0x62, 0xa0, 0xb3, 0x1f, 0x7c, 0x3f, 0x67, 0xf5, 0x00, 0xe3, 0x0d, 0x01, 0x37, 0xf6, 0x9d, 0x09,
0xaf, 0x21, 0x73, 0xc4, 0xc1, 0xf8, 0x56, 0x30, 0x9c, 0x9c, 0x95, 0xff, 0xbd, 0xca, 0x3f, 0x49,
0x30, 0x5e, 0xff, 0xd2, 0x55, 0x0d, 0xa3, 0xf5, 0x03, 0x1e, 0x43, 0xb6, 0xf2, 0x96, 0x22, 0x17,
0xc5, 0x40, 0xa7, 0xad, 0x35, 0x1e, 0x41, 0xca, 0x7e, 0xea, 0x03, 0xcb, 0xdd, 0x5c, 0x14, 0x89,
0x5e, 0x21, 0x3c, 0x84, 0x84, 0x9c, 0xb3, 0x4e, 0xf6, 0x22, 0xbd, 0x05, 0x88, 0xd0, 0xe7, 0xfa,
0x8b, 0x64, 0x3f, 0x17, 0xc5, 0xbe, 0x8e, 0xb3, 0xca, 0x20, 0xb9, 0x5b, 0x34, 0xfe, 0x73, 0x62,
0x60, 0xf4, 0x14, 0xd3, 0x3d, 0xc6, 0x70, 0xf8, 0x0c, 0xc3, 0xb5, 0xa7, 0xf0, 0xbc, 0x9b, 0xbd,
0x5b, 0xd2, 0xe9, 0xc5, 0x16, 0x56, 0xdb, 0x8c, 0xda, 0x99, 0xa5, 0xb1, 0xfc, 0xab, 0xef, 0x00,
0x00, 0x00, 0xff, 0xff, 0xd3, 0x09, 0x3b, 0x59, 0x93, 0x01, 0x00, 0x00,
// 454 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x54, 0x4b, 0x6f, 0xd3, 0x40,
0x10, 0xae, 0xc9, 0xab, 0x99, 0x24, 0x12, 0x4c, 0x10, 0xb8, 0x2e, 0x20, 0x6b, 0x01, 0x29, 0xa2,
0x25, 0x48, 0xe5, 0x40, 0xb9, 0x21, 0x1e, 0x87, 0x9e, 0x90, 0x16, 0xa9, 0x17, 0x90, 0xa2, 0x8d,
0x33, 0xb4, 0x16, 0x76, 0xd6, 0xdd, 0x5d, 0x57, 0x82, 0xbf, 0xc6, 0x9f, 0x43, 0xdd, 0x75, 0x42,
0x1c, 0x27, 0xb2, 0x6f, 0xde, 0xd9, 0xf9, 0x1e, 0xb3, 0xf3, 0xc9, 0x30, 0xbe, 0x95, 0x49, 0x9e,
0xd2, 0x4c, 0x93, 0xba, 0x25, 0x35, 0xcd, 0x94, 0x34, 0x12, 0xef, 0x97, 0x8a, 0xb3, 0x6c, 0xce,
0xde, 0x00, 0x7e, 0x14, 0x26, 0xba, 0xfe, 0x4c, 0x09, 0x19, 0xe2, 0x74, 0x93, 0x93, 0x36, 0x78,
0x04, 0x87, 0x3f, 0xe3, 0x84, 0x66, 0xf1, 0x42, 0xfb, 0x5e, 0xd8, 0x9a, 0xf4, 0x79, 0xef, 0xee,
0x7c, 0xb1, 0xd0, 0xec, 0x2b, 0x8c, 0x4b, 0x00, 0x9d, 0xc9, 0xa5, 0x26, 0x3c, 0x87, 0x9e, 0x22,
0x9d, 0x27, 0xc6, 0x01, 0x06, 0x67, 0xcf, 0xa6, 0xdb, 0x5a, 0xd3, 0x35, 0x24, 0x4f, 0x0c, 0x5f,
0xb5, 0xb3, 0x18, 0x86, 0x9b, 0x17, 0xf8, 0x18, 0x7a, 0x85, 0xb6, 0xef, 0x85, 0xde, 0xa4, 0xcf,
0xbb, 0x4e, 0x1a, 0x1f, 0x41, 0x57, 0x1b, 0x61, 0x72, 0xed, 0xdf, 0x0b, 0xbd, 0x49, 0x87, 0x17,
0x27, 0x7c, 0x08, 0x1d, 0x52, 0x4a, 0x2a, 0xbf, 0x65, 0xdb, 0xdd, 0x01, 0x11, 0xda, 0x3a, 0xfe,
0x43, 0x7e, 0x3b, 0xf4, 0x26, 0x23, 0x6e, 0xbf, 0x59, 0x0f, 0x3a, 0x5f, 0xd2, 0xcc, 0xfc, 0x66,
0xef, 0xc0, 0xbf, 0x14, 0x51, 0x9e, 0xa7, 0x97, 0xd6, 0xe3, 0xa7, 0x6b, 0x8a, 0x7e, 0xad, 0x66,
0x3f, 0x86, 0xbe, 0x75, 0xbe, 0x58, 0x39, 0x18, 0xf1, 0x43, 0x57, 0xb8, 0x58, 0xb0, 0x0f, 0x70,
0xb4, 0x03, 0x58, 0xbc, 0xc1, 0x73, 0x18, 0x5d, 0x09, 0x35, 0x17, 0x57, 0x34, 0x53, 0xc2, 0xc4,
0xd2, 0xa2, 0x3d, 0x3e, 0x2c, 0x8a, 0xfc, 0xae, 0xc6, 0xbe, 0x43, 0x50, 0x62, 0x90, 0x69, 0x26,
0x22, 0xd3, 0x44, 0x1c, 0x43, 0x18, 0x64, 0x8a, 0x44, 0x92, 0xc8, 0x48, 0x18, 0xb2, 0xaf, 0xd0,
0xe2, 0x9b, 0x25, 0xf6, 0x14, 0x8e, 0x77, 0x92, 0x3b, 0x83, 0xec, 0x7c, 0xcb, 0xbd, 0x4c, 0xd3,
0xb8, 0x91, 0x34, 0x7b, 0x52, 0x71, 0x6d, 0x91, 0x05, 0xef, 0xfb, 0xad, 0xdb, 0x84, 0xc4, 0x32,
0xcf, 0x1a, 0x11, 0x6f, 0x3b, 0x5e, 0x41, 0x1d, 0xf3, 0xd9, 0xdf, 0x36, 0x0c, 0xdd, 0xcd, 0x37,
0x1b, 0x23, 0xfc, 0x01, 0x83, 0x8d, 0xf8, 0xe1, 0x8b, 0x6a, 0xca, 0xaa, 0x71, 0x0e, 0x5e, 0xd6,
0x74, 0x15, 0x63, 0x1c, 0xe0, 0x12, 0x1e, 0x54, 0xd6, 0x8b, 0xaf, 0xaa, 0xe8, 0x7d, 0xe1, 0x09,
0x4e, 0x1a, 0xf5, 0xae, 0xf5, 0x0c, 0x8c, 0x77, 0xec, 0x0b, 0x4f, 0x6b, 0x58, 0x4a, 0x99, 0x09,
0x5e, 0x37, 0xec, 0x5e, 0xab, 0xde, 0x00, 0x56, 0x97, 0x89, 0x27, 0xb5, 0x34, 0xff, 0xc3, 0x12,
0x9c, 0x36, 0x6b, 0xde, 0x3b, 0xa8, 0x5b, 0x73, 0xed, 0xa0, 0xa5, 0x20, 0xd5, 0x0e, 0x5a, 0xce,
0x0e, 0x3b, 0x98, 0x77, 0xed, 0x5f, 0xef, 0xed, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, 0x2b,
0x80, 0x54, 0x0c, 0x05, 0x00, 0x00,
}

View file

@ -24,7 +24,7 @@ type MasterServer struct {
preallocate int64
pulseSeconds int
defaultReplicaPlacement string
garbageThreshold string
garbageThreshold float64
guard *security.Guard
Topo *topology.Topology
@ -43,7 +43,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
preallocate bool,
pulseSeconds int,
defaultReplicaPlacement string,
garbageThreshold string,
garbageThreshold float64,
whiteList []string,
secureKey string,
) *MasterServer {

View file

@ -37,9 +37,15 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
gcThreshold := r.FormValue("garbageThreshold")
if gcThreshold == "" {
gcThreshold = ms.garbageThreshold
gcString := r.FormValue("garbageThreshold")
gcThreshold := ms.garbageThreshold
if gcString != "" {
var err error
gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
return
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(gcThreshold, ms.preallocate)

View file

@ -0,0 +1,73 @@
package weed_server
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId))
resp.GarbageRatio = garbageRatio
if err != nil {
glog.V(3).Infof("check volume %d: %f", req.VolumdId, err)
}
return resp, err
}
func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate)
if err != nil {
glog.Errorf("compact volume %d: %f", req.VolumdId, err)
} else {
glog.V(1).Infof("compact volume %d", req.VolumdId)
}
return resp, err
}
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId))
if err != nil {
glog.Errorf("commit volume %d: %f", req.VolumdId, err)
} else {
glog.V(1).Infof("commit volume %d", req.VolumdId)
}
return resp, err
}
func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId))
if err != nil {
glog.Errorf("cleanup volume %d: %f", req.VolumdId, err)
} else {
glog.V(1).Infof("cleanup volume %d", req.VolumdId)
}
return resp, err
}

View file

@ -48,10 +48,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/vacuum/cleanup", vs.guard.WhiteList(vs.vacuumVolumeCleanupHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))

View file

@ -1,53 +0,0 @@
package weed_server
import (
"net/http"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
if err == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"error": "", "result": ret})
} else {
writeJsonQuiet(w, r, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "result": false})
}
glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
}
func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
var preallocate int64
var err error
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
glog.V(0).Infof("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err)
}
}
err = vs.store.CompactVolume(r.FormValue("volume"), preallocate)
if err == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err)
}
func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.CommitCompactVolume(r.FormValue("volume"))
if err == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err)
}
func (vs *VolumeServer) vacuumVolumeCleanupHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.CommitCleanupVolume(r.FormValue("volume"))
if err == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("cleanup compact volume =", r.FormValue("volume"), ", error =", err)
}

View file

@ -2,51 +2,29 @@ package storage
import (
"fmt"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString), false
func (s *Store) CheckCompactVolume(volumeId VolumeId) (float64, error) {
if v := s.findVolume(volumeId); v != nil {
glog.V(3).Infof("volumd %d garbage level: %f", volumeId, v.garbageLevel())
return v.garbageLevel(), nil
}
garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
if e != nil {
return fmt.Errorf("garbageThreshold %s is not a valid float number", garbageThresholdString), false
}
if v := s.findVolume(vid); v != nil {
glog.V(3).Infoln(vid, "garbage level is", v.garbageLevel())
return nil, garbageThreshold < v.garbageLevel()
}
return fmt.Errorf("volume id %d is not found during check compact", vid), false
}
func (s *Store) CompactVolume(volumeIdString string, preallocate int64) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
}
func (s *Store) CompactVolume(vid VolumeId, preallocate int64) error {
if v := s.findVolume(vid); v != nil {
return v.Compact(preallocate)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
}
func (s *Store) CommitCompactVolume(vid VolumeId) error {
if v := s.findVolume(vid); v != nil {
return v.commitCompact()
}
return fmt.Errorf("volume id %d is not found during commit compact", vid)
}
func (s *Store) CommitCleanupVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
}
func (s *Store) CommitCleanupVolume(vid VolumeId) error {
if v := s.findVolume(vid); v != nil {
return v.cleanupCompact()
}

View file

@ -8,7 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string, preallocate int64) {
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@ -18,7 +18,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string, prealloc
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
}
}()
go func(garbageThreshold string) {
go func(garbageThreshold float64) {
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {

View file

@ -1,28 +1,33 @@
package topology
import (
"encoding/json"
"errors"
"net/url"
"context"
"time"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
//glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
//glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumdId: uint32(vid),
})
if err != nil {
ch <- false
} else {
//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
ch <- ret
return err
}
isNeeded := resp.GarbageRatio > garbageThreshold
ch <- isNeeded
return nil
})
if err != nil {
glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
}
}(index, dn.Url(), vid)
}
@ -44,11 +49,17 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
if e := vacuumVolume_Compact(url, vid, preallocate); e != nil {
glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
ch <- false
} else {
glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
ch <- true
}
}(index, dn.Url(), vid)
@ -69,11 +80,17 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
isCommitSuccess = false
} else {
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url())
}
if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid)
@ -84,16 +101,22 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
if e := vacuumVolume_Cleanup(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when cleaning up", vid, "on", dn.Url(), e)
err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, dn.Url(), err)
} else {
glog.V(0).Infoln("Complete cleaning up", vid, "on", dn.Url())
glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, dn.Url())
}
}
}
func (t *Topology) Vacuum(garbageThreshold string, preallocate int64) int {
glog.V(0).Infof("Start vacuum on demand with threshold:%s", garbageThreshold)
func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
glog.V(0).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items() {
@ -126,71 +149,3 @@ type VacuumVolumeResult struct {
Result bool
Error string
}
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
if err != nil {
glog.V(0).Infoln("parameters:", values)
return err, false
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err, false
}
if ret.Error != "" {
return errors.New(ret.Error), false
}
return nil, ret.Result
}
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId, preallocate int64) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("preallocate", fmt.Sprintf("%d", preallocate))
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}
func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}
func vacuumVolume_Cleanup(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/cleanup", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}