support enable/disable vacuum (#4087)

* stop vacuum

* suspend/resume vacuum

* remove unused code

* rename

* rename param
This commit is contained in:
Guo Lei 2022-12-28 17:36:44 +08:00 committed by GitHub
parent 84492976c9
commit d8cfa1552b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 656 additions and 255 deletions

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.28.1
// protoc v3.17.3 // protoc v3.21.4
// source: filer.proto // source: filer.proto
package filer_pb package filer_pb
@ -4480,7 +4480,7 @@ var file_filer_proto_goTypes = []interface{}{
var file_filer_proto_depIdxs = []int32{ var file_filer_proto_depIdxs = []int32{
5, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry 5, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry
5, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry 5, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry
8, // 2: filer_pb.Entry.GetChunks():type_name -> filer_pb.FileChunk 8, // 2: filer_pb.Entry.chunks:type_name -> filer_pb.FileChunk
11, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes 11, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes
55, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry 55, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry
4, // 5: filer_pb.Entry.remote_entry:type_name -> filer_pb.RemoteEntry 4, // 5: filer_pb.Entry.remote_entry:type_name -> filer_pb.RemoteEntry

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: filer.proto
package filer_pb package filer_pb

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: iam.proto
package iam_pb package iam_pb

View file

@ -27,6 +27,10 @@ service Seaweed {
} }
rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) { rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) {
} }
rpc DisableVacuum (DisableVacuumRequest) returns (DisableVacuumResponse) {
}
rpc EnableVacuum (EnableVacuumRequest) returns (EnableVacuumResponse) {
}
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) { rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
} }
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) { rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
@ -305,6 +309,16 @@ message VacuumVolumeRequest {
message VacuumVolumeResponse { message VacuumVolumeResponse {
} }
message DisableVacuumRequest {
}
message DisableVacuumResponse {
}
message EnableVacuumRequest {
}
message EnableVacuumResponse {
}
message VolumeMarkReadonlyRequest { message VolumeMarkReadonlyRequest {
string ip = 1; string ip = 1;
uint32 port = 2; uint32 port = 2;

File diff suppressed because it is too large Load diff

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: master.proto
package master_pb package master_pb
@ -32,6 +28,8 @@ type SeaweedClient interface {
VolumeList(ctx context.Context, in *VolumeListRequest, opts ...grpc.CallOption) (*VolumeListResponse, error) VolumeList(ctx context.Context, in *VolumeListRequest, opts ...grpc.CallOption) (*VolumeListResponse, error)
LookupEcVolume(ctx context.Context, in *LookupEcVolumeRequest, opts ...grpc.CallOption) (*LookupEcVolumeResponse, error) LookupEcVolume(ctx context.Context, in *LookupEcVolumeRequest, opts ...grpc.CallOption) (*LookupEcVolumeResponse, error)
VacuumVolume(ctx context.Context, in *VacuumVolumeRequest, opts ...grpc.CallOption) (*VacuumVolumeResponse, error) VacuumVolume(ctx context.Context, in *VacuumVolumeRequest, opts ...grpc.CallOption) (*VacuumVolumeResponse, error)
DisableVacuum(ctx context.Context, in *DisableVacuumRequest, opts ...grpc.CallOption) (*DisableVacuumResponse, error)
EnableVacuum(ctx context.Context, in *EnableVacuumRequest, opts ...grpc.CallOption) (*EnableVacuumResponse, error)
VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error) VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error)
GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error) GetMasterConfiguration(ctx context.Context, in *GetMasterConfigurationRequest, opts ...grpc.CallOption) (*GetMasterConfigurationResponse, error)
ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error) ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error)
@ -185,6 +183,24 @@ func (c *seaweedClient) VacuumVolume(ctx context.Context, in *VacuumVolumeReques
return out, nil return out, nil
} }
func (c *seaweedClient) DisableVacuum(ctx context.Context, in *DisableVacuumRequest, opts ...grpc.CallOption) (*DisableVacuumResponse, error) {
out := new(DisableVacuumResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/DisableVacuum", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedClient) EnableVacuum(ctx context.Context, in *EnableVacuumRequest, opts ...grpc.CallOption) (*EnableVacuumResponse, error) {
out := new(EnableVacuumResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/EnableVacuum", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedClient) VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error) { func (c *seaweedClient) VolumeMarkReadonly(ctx context.Context, in *VolumeMarkReadonlyRequest, opts ...grpc.CallOption) (*VolumeMarkReadonlyResponse, error) {
out := new(VolumeMarkReadonlyResponse) out := new(VolumeMarkReadonlyResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/VolumeMarkReadonly", in, out, opts...) err := c.cc.Invoke(ctx, "/master_pb.Seaweed/VolumeMarkReadonly", in, out, opts...)
@ -280,6 +296,8 @@ type SeaweedServer interface {
VolumeList(context.Context, *VolumeListRequest) (*VolumeListResponse, error) VolumeList(context.Context, *VolumeListRequest) (*VolumeListResponse, error)
LookupEcVolume(context.Context, *LookupEcVolumeRequest) (*LookupEcVolumeResponse, error) LookupEcVolume(context.Context, *LookupEcVolumeRequest) (*LookupEcVolumeResponse, error)
VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error)
DisableVacuum(context.Context, *DisableVacuumRequest) (*DisableVacuumResponse, error)
EnableVacuum(context.Context, *EnableVacuumRequest) (*EnableVacuumResponse, error)
VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error)
GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error) GetMasterConfiguration(context.Context, *GetMasterConfigurationRequest) (*GetMasterConfigurationResponse, error)
ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error) ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error)
@ -326,6 +344,12 @@ func (UnimplementedSeaweedServer) LookupEcVolume(context.Context, *LookupEcVolum
func (UnimplementedSeaweedServer) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) { func (UnimplementedSeaweedServer) VacuumVolume(context.Context, *VacuumVolumeRequest) (*VacuumVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolume not implemented") return nil, status.Errorf(codes.Unimplemented, "method VacuumVolume not implemented")
} }
func (UnimplementedSeaweedServer) DisableVacuum(context.Context, *DisableVacuumRequest) (*DisableVacuumResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DisableVacuum not implemented")
}
func (UnimplementedSeaweedServer) EnableVacuum(context.Context, *EnableVacuumRequest) (*EnableVacuumResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method EnableVacuum not implemented")
}
func (UnimplementedSeaweedServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) { func (UnimplementedSeaweedServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented") return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
} }
@ -562,6 +586,42 @@ func _Seaweed_VacuumVolume_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Seaweed_DisableVacuum_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DisableVacuumRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).DisableVacuum(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/master_pb.Seaweed/DisableVacuum",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).DisableVacuum(ctx, req.(*DisableVacuumRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Seaweed_EnableVacuum_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EnableVacuumRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).EnableVacuum(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/master_pb.Seaweed/EnableVacuum",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).EnableVacuum(ctx, req.(*EnableVacuumRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Seaweed_VolumeMarkReadonly_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Seaweed_VolumeMarkReadonly_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeMarkReadonlyRequest) in := new(VolumeMarkReadonlyRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -763,6 +823,14 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
MethodName: "VacuumVolume", MethodName: "VacuumVolume",
Handler: _Seaweed_VacuumVolume_Handler, Handler: _Seaweed_VacuumVolume_Handler,
}, },
{
MethodName: "DisableVacuum",
Handler: _Seaweed_DisableVacuum_Handler,
},
{
MethodName: "EnableVacuum",
Handler: _Seaweed_EnableVacuum_Handler,
},
{ {
MethodName: "VolumeMarkReadonly", MethodName: "VolumeMarkReadonly",
Handler: _Seaweed_VolumeMarkReadonly_Handler, Handler: _Seaweed_VolumeMarkReadonly_Handler,

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: mount.proto
package mount_pb package mount_pb

View file

@ -20,7 +20,7 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// //////////////////////////////////////////////// //////////////////////////////////////////////////
type SegmentInfo struct { type SegmentInfo struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -617,7 +617,7 @@ func (x *CheckBrokerLoadResponse) GetBytesCount() int64 {
return 0 return 0
} }
// //////////////////////////////////////////////// //////////////////////////////////////////////////
type PublishRequest struct { type PublishRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: mq.proto
package mq_pb package mq_pb

View file

@ -20,9 +20,9 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// /////////////////////// /////////////////////////
// Remote Storage related // Remote Storage related
// /////////////////////// /////////////////////////
type RemoteConf struct { type RemoteConf struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache

View file

@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.4
// source: s3.proto
package s3_pb package s3_pb

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.28.1
// protoc v3.17.3 // protoc v3.21.4
// source: volume_server.proto // source: volume_server.proto
package volume_server_pb package volume_server_pb

View file

@ -3,12 +3,13 @@ package weed_server
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/raft"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/seaweedfs/raft"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
@ -283,6 +284,20 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
return resp, nil return resp, nil
} }
func (ms *MasterServer) DisableVacuum(ctx context.Context, req *master_pb.DisableVacuumRequest) (*master_pb.DisableVacuumResponse, error) {
ms.Topo.DisableVacuum()
resp := &master_pb.DisableVacuumResponse{}
return resp, nil
}
func (ms *MasterServer) EnableVacuum(ctx context.Context, req *master_pb.EnableVacuumRequest) (*master_pb.EnableVacuumResponse, error) {
ms.Topo.EnableVacuum()
resp := &master_pb.EnableVacuumResponse{}
return resp, nil
}
func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) { func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
if !ms.Topo.IsLeader() { if !ms.Topo.IsLeader() {

View file

@ -0,0 +1,41 @@
package shell
import (
"context"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
func init() {
Commands = append(Commands, &commandDisableVacuum{})
}
type commandDisableVacuum struct {
}
func (c *commandDisableVacuum) Name() string {
return "volume.vacuum.disable"
}
func (c *commandDisableVacuum) Help() string {
return `disable vacuuming
volume.vacuum.disable
`
}
func (c *commandDisableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.DisableVacuum(context.Background(), &master_pb.DisableVacuumRequest{})
return err
})
return
}

View file

@ -0,0 +1,41 @@
package shell
import (
"context"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
func init() {
Commands = append(Commands, &commandEnableVacuum{})
}
type commandEnableVacuum struct {
}
func (c *commandEnableVacuum) Name() string {
return "volume.vacuum.enable"
}
func (c *commandEnableVacuum) Help() string {
return `enable vacuuming
volume.vacuum.enable
`
}
func (c *commandEnableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.EnableVacuum(context.Background(), &master_pb.EnableVacuumRequest{})
return err
})
return
}

View file

@ -35,6 +35,7 @@ type Topology struct {
volumeSizeLimit uint64 volumeSizeLimit uint64
replicationAsMin bool replicationAsMin bool
isDisableVacuum bool
Sequence sequence.Sequencer Sequence sequence.Sequencer
@ -338,3 +339,13 @@ func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) {
rack.LinkChildNode(dn) rack.LinkChildNode(dn)
glog.Infof("[%s] reLink To topo ", dn.Id()) glog.Infof("[%s] reLink To topo ", dn.Id())
} }
func (t *Topology) DisableVacuum() {
glog.V(0).Infof("DisableVacuum")
t.isDisableVacuum = true
}
func (t *Topology) EnableVacuum() {
glog.V(0).Infof("EnableVacuum")
t.isDisableVacuum = false
}

View file

@ -1,12 +1,13 @@
package topology package topology
import ( import (
"math/rand"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc" "google.golang.org/grpc"
"math/rand"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage"
@ -25,7 +26,9 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
go func(garbageThreshold float64) { go func(garbageThreshold float64) {
for { for {
if t.IsLeader() { if t.IsLeader() {
t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate) if !t.isDisableVacuum {
t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
}
} else { } else {
stats.MasterReplicaPlacementMismatch.Reset() stats.MasterReplicaPlacementMismatch.Reset()
} }