diff --git a/weed/command/server.go b/weed/command/server.go index d16095075..4aeecbff0 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -98,6 +98,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = &False s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 6fb7447e7..5a0dc9e67 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -55,6 +55,7 @@ type VolumeServerOptions struct { fileSizeLimitMB *int minFreeSpacePercents []float32 pprof *bool + preStopSeconds *int // pulseSeconds *int } @@ -66,6 +67,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") @@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, ) - // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v fmt.Println("volume server has be killed") var startTime time.Time + // Stop heartbeats + fmt.Println("stop send heartbeat and sleep %d sec", v.preStopSeconds) + volumeServer.SendHeartbeat = false + time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) + fmt.Println("end sleep 20 sec") // firstly, stop the public http service to prevent from receiving new user request if nil != publicHttpDown { startTime = time.Now() diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c62a4a388..468429db8 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -168,10 +168,14 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return "", err + if vs.SendHeartbeat { + glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err + } + } else { + glog.V(5).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port) } case <-ecShardTickChan: glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index fc6fa5cee..6612e9045 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -31,6 +31,7 @@ type VolumeServer struct { MetricsAddress string MetricsIntervalSec int fileSizeLimitBytes int64 + SendHeartbeat bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -66,6 +67,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + SendHeartbeat: true, } vs.SeedMasterNodes = masterNodes vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)