mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
stop send heartbeat before stop volume server
This commit is contained in:
parent
1901f15cd2
commit
464d4c82ec
|
@ -98,6 +98,7 @@ func init() {
|
||||||
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
|
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
|
||||||
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
|
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
|
||||||
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
|
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
|
||||||
|
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
|
||||||
serverOptions.v.pprof = &False
|
serverOptions.v.pprof = &False
|
||||||
|
|
||||||
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
|
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
|
||||||
|
|
|
@ -55,6 +55,7 @@ type VolumeServerOptions struct {
|
||||||
fileSizeLimitMB *int
|
fileSizeLimitMB *int
|
||||||
minFreeSpacePercents []float32
|
minFreeSpacePercents []float32
|
||||||
pprof *bool
|
pprof *bool
|
||||||
|
preStopSeconds *int
|
||||||
// pulseSeconds *int
|
// pulseSeconds *int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +67,7 @@ func init() {
|
||||||
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
|
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
|
||||||
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
|
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
|
||||||
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
|
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
|
||||||
|
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
|
||||||
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
|
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
|
||||||
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
|
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
|
||||||
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
|
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
|
||||||
|
@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||||
*v.compactionMBPerSecond,
|
*v.compactionMBPerSecond,
|
||||||
*v.fileSizeLimitMB,
|
*v.fileSizeLimitMB,
|
||||||
)
|
)
|
||||||
|
|
||||||
// starting grpc server
|
// starting grpc server
|
||||||
grpcS := v.startGrpcService(volumeServer)
|
grpcS := v.startGrpcService(volumeServer)
|
||||||
|
|
||||||
|
@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||||
fmt.Println("volume server has be killed")
|
fmt.Println("volume server has be killed")
|
||||||
var startTime time.Time
|
var startTime time.Time
|
||||||
|
|
||||||
|
// Stop heartbeats
|
||||||
|
fmt.Println("stop send heartbeat and sleep %d sec", v.preStopSeconds)
|
||||||
|
volumeServer.SendHeartbeat = false
|
||||||
|
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
|
||||||
|
fmt.Println("end sleep 20 sec")
|
||||||
// firstly, stop the public http service to prevent from receiving new user request
|
// firstly, stop the public http service to prevent from receiving new user request
|
||||||
if nil != publicHttpDown {
|
if nil != publicHttpDown {
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
|
|
@ -168,11 +168,15 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
case <-volumeTickChan:
|
case <-volumeTickChan:
|
||||||
|
if vs.SendHeartbeat {
|
||||||
glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
|
glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
|
||||||
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
||||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(5).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port)
|
||||||
|
}
|
||||||
case <-ecShardTickChan:
|
case <-ecShardTickChan:
|
||||||
glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
|
glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
|
||||||
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
|
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
|
||||||
|
|
|
@ -31,6 +31,7 @@ type VolumeServer struct {
|
||||||
MetricsAddress string
|
MetricsAddress string
|
||||||
MetricsIntervalSec int
|
MetricsIntervalSec int
|
||||||
fileSizeLimitBytes int64
|
fileSizeLimitBytes int64
|
||||||
|
SendHeartbeat bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||||
|
@ -66,6 +67,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
|
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
|
||||||
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
|
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
|
||||||
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
|
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
|
||||||
|
SendHeartbeat: true,
|
||||||
}
|
}
|
||||||
vs.SeedMasterNodes = masterNodes
|
vs.SeedMasterNodes = masterNodes
|
||||||
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
|
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
|
||||||
|
|
Loading…
Reference in a new issue