shell: add volumeServer.leave command

This commit is contained in:
Chris Lu 2020-09-13 21:25:51 -07:00
parent 1af95c5b76
commit 5d6753fb98
9 changed files with 741 additions and 490 deletions

View file

@ -24,4 +24,4 @@ debug_server:
debug_volume: debug_volume:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- volume -dir=/Volumes/mobile_disk/100 -port 8564 -max=30 dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- volume -dir=/Volumes/mobile_disk/100 -port 8564 -max=30 -preStopSeconds=2

View file

@ -223,22 +223,27 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting the cluster http server // starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux) clusterHttpServer := v.startClusterHttpService(volumeMux)
stopChan := make(chan bool)
grace.OnInterrupt(func() { grace.OnInterrupt(func() {
fmt.Println("volume server has be killed") fmt.Println("volume server has be killed")
// Stop heartbeatsZ // Stop heartbeats
glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) if !volumeServer.StopHeartbeat() {
volumeServer.SendHeartbeat = false glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
}
v.shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer) shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer)
stopChan <- true
}) })
select {} select {
case <-stopChan:
}
} }
func (v VolumeServerOptions) shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) { func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) {
// firstly, stop the public http service to prevent from receiving new user request // firstly, stop the public http service to prevent from receiving new user request
if nil != publicHttpDown { if nil != publicHttpDown {

View file

@ -85,6 +85,8 @@ service VolumeServer {
rpc VolumeServerStatus (VolumeServerStatusRequest) returns (VolumeServerStatusResponse) { rpc VolumeServerStatus (VolumeServerStatusRequest) returns (VolumeServerStatusResponse) {
} }
rpc VolumeServerLeave (VolumeServerLeaveRequest) returns (VolumeServerLeaveResponse) {
}
// <experimental> query // <experimental> query
rpc Query (QueryRequest) returns (stream QueriedStripe) { rpc Query (QueryRequest) returns (stream QueriedStripe) {
@ -425,6 +427,11 @@ message VolumeServerStatusResponse {
MemStatus memory_status = 2; MemStatus memory_status = 2;
} }
message VolumeServerLeaveRequest {
}
message VolumeServerLeaveResponse {
}
// select on volume servers // select on volume servers
message QueryRequest { message QueryRequest {
repeated string selections = 1; repeated string selections = 1;

File diff suppressed because it is too large Load diff

View file

@ -196,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
} }
func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
resp := &volume_server_pb.VolumeServerLeaveResponse{}
vs.StopHeartbeat()
return resp, nil
}
func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) { func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
resp := &volume_server_pb.VolumeNeedleStatusResponse{} resp := &volume_server_pb.VolumeNeedleStatusResponse{}

View file

@ -31,7 +31,7 @@ func (vs *VolumeServer) heartbeat() {
var err error var err error
var newLeader string var newLeader string
for { for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes { for _, master := range vs.SeedMasterNodes {
if newLeader != "" { if newLeader != "" {
// the new leader may actually is the same master // the new leader may actually is the same master
@ -52,10 +52,22 @@ func (vs *VolumeServer) heartbeat() {
newLeader = "" newLeader = ""
vs.store.MasterAddress = "" vs.store.MasterAddress = ""
} }
if !vs.isHeartbeating {
break
}
} }
} }
} }
func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
if !vs.isHeartbeating {
return true
}
vs.isHeartbeating = false
vs.stopChan <- true
return false
}
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -171,14 +183,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
return "", err return "", err
} }
case <-volumeTickChan: case <-volumeTickChan:
if vs.SendHeartbeat { glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err
return "", err
}
} else {
glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port)
} }
case <-ecShardTickChan: case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
@ -188,6 +196,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
} }
case err = <-doneChan: case err = <-doneChan:
return return
case <-vs.stopChan:
return
} }
} }
} }

View file

@ -31,7 +31,8 @@ type VolumeServer struct {
MetricsAddress string MetricsAddress string
MetricsIntervalSec int MetricsIntervalSec int
fileSizeLimitBytes int64 fileSizeLimitBytes int64
SendHeartbeat bool isHeartbeating bool
stopChan chan bool
} }
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@ -67,7 +68,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
SendHeartbeat: true, isHeartbeating: true,
stopChan: make(chan bool),
} }
vs.SeedMasterNodes = masterNodes vs.SeedMasterNodes = masterNodes
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)

View file

@ -0,0 +1,67 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"io"
)
func init() {
Commands = append(Commands, &commandVolumeServerLeave{})
}
type commandVolumeServerLeave struct {
}
func (c *commandVolumeServerLeave) Name() string {
return "volumeServer.leave"
}
func (c *commandVolumeServerLeave) Help() string {
return `stop a volume server from sending heartbeats to the master
volume.unmount -node <volume server host:port> -force
This command enables gracefully shutting down the volume server.
The volume server will stop sending heartbeats to the master.
After draining the traffic for a few seconds, you can safely shut down the volume server.
This operation is not revocable unless the volume server is restarted.
`
}
func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(); err != nil {
return
}
vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeServer := vsLeaveCommand.String("node", "", "<host>:<port> of the volume server")
if err = vsLeaveCommand.Parse(args); err != nil {
return nil
}
if *volumeServer == "" {
return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
}
return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer)
}
func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) {
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {
fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
} else {
fmt.Fprintf(writer, "stopped heartbeat in volume server %s. After a few seconds to drain traffic, it will be safe to stop the volume server.\n", volumeServer)
}
return leaveErr
})
}

View file

@ -66,7 +66,7 @@ func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool
args[i] = strings.Trim(string(cmds[1+i]), "\"'") args[i] = strings.Trim(string(cmds[1+i]), "\"'")
} }
cmd := strings.ToLower(cmds[0]) cmd := cmds[0]
if cmd == "help" || cmd == "?" { if cmd == "help" || cmd == "?" {
printHelp(cmds) printHelp(cmds)
} else if cmd == "exit" || cmd == "quit" { } else if cmd == "exit" || cmd == "quit" {