mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
skip deltaBeat if dn is zero (#3630)
* skip deltaBeat https://github.com/seaweedfs/seaweedfs/issues/3629 * fix GrpcPort * skip url :0 * skip empty DataCenter or Rack * skip empty heartbeat Ip * dell msg add DataCenter * comment todo * fix
This commit is contained in:
parent
b834027c5a
commit
721c6197f9
5
weed/pb/master_pb/master_helper.go
Normal file
5
weed/pb/master_pb/master_helper.go
Normal file
|
@ -0,0 +1,5 @@
|
|||
package master_pb
|
||||
|
||||
func (v *VolumeLocation) IsEmptyUrl() bool {
|
||||
return v.Url == "" || v.Url == ":0"
|
||||
}
|
|
@ -70,6 +70,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
|||
}
|
||||
|
||||
message := &master_pb.VolumeLocation{
|
||||
DataCenter: dn.GetDataCenterId(),
|
||||
Url: dn.Url(),
|
||||
PublicUrl: dn.PublicUrl,
|
||||
}
|
||||
|
@ -126,6 +127,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
|||
|
||||
ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
|
||||
if dn == nil {
|
||||
// Skip delta heartbeat for volume server versions better than 3.28 https://github.com/seaweedfs/seaweedfs/pull/3630
|
||||
if heartbeat.Ip == "" {
|
||||
continue
|
||||
} // ToDo must be removed after update major version
|
||||
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
|
||||
dc := ms.Topo.GetOrCreateDataCenter(dcName)
|
||||
rack := dc.GetOrCreateRack(rackName)
|
||||
|
@ -181,8 +186,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
|||
}
|
||||
|
||||
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
|
||||
if heartbeat.Ip != "" {
|
||||
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
|
||||
ms.Topo.DataNodeRegistration(dcName, rackName, dn)
|
||||
}
|
||||
|
||||
// process heartbeat.Volumes
|
||||
stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
|
||||
|
|
|
@ -160,11 +160,18 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
|
|||
|
||||
volumeTickChan := time.Tick(sleepInterval)
|
||||
ecShardTickChan := time.Tick(17 * sleepInterval)
|
||||
|
||||
dataCenter := vs.store.GetDataCenter()
|
||||
rack := vs.store.GetRack()
|
||||
ip := vs.store.Ip
|
||||
port := uint32(vs.store.Port)
|
||||
for {
|
||||
select {
|
||||
case volumeMessage := <-vs.store.NewVolumesChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
Ip: ip,
|
||||
Port: port,
|
||||
DataCenter: dataCenter,
|
||||
Rack: rack,
|
||||
NewVolumes: []*master_pb.VolumeShortInformationMessage{
|
||||
&volumeMessage,
|
||||
},
|
||||
|
@ -176,6 +183,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
|
|||
}
|
||||
case ecShardMessage := <-vs.store.NewEcShardsChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
Ip: ip,
|
||||
Port: port,
|
||||
DataCenter: dataCenter,
|
||||
Rack: rack,
|
||||
NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
|
||||
&ecShardMessage,
|
||||
},
|
||||
|
@ -188,6 +199,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
|
|||
}
|
||||
case volumeMessage := <-vs.store.DeletedVolumesChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
Ip: ip,
|
||||
Port: port,
|
||||
DataCenter: dataCenter,
|
||||
Rack: rack,
|
||||
DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
|
||||
&volumeMessage,
|
||||
},
|
||||
|
@ -199,6 +214,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
|
|||
}
|
||||
case ecShardMessage := <-vs.store.DeletedEcShardsChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
Ip: ip,
|
||||
Port: port,
|
||||
DataCenter: dataCenter,
|
||||
Rack: rack,
|
||||
DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
|
||||
&ecShardMessage,
|
||||
},
|
||||
|
@ -227,12 +246,12 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
|
|||
case <-vs.stopChan:
|
||||
var volumeMessages []*master_pb.VolumeInformationMessage
|
||||
emptyBeat := &master_pb.Heartbeat{
|
||||
Ip: vs.store.Ip,
|
||||
Port: uint32(vs.store.Port),
|
||||
Ip: ip,
|
||||
Port: port,
|
||||
PublicUrl: vs.store.PublicUrl,
|
||||
MaxFileKey: uint64(0),
|
||||
DataCenter: vs.store.GetDataCenter(),
|
||||
Rack: vs.store.GetRack(),
|
||||
DataCenter: dataCenter,
|
||||
Rack: rack,
|
||||
Volumes: volumeMessages,
|
||||
HasNoVolumes: len(volumeMessages) == 0,
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ type topology struct {
|
|||
type Configuration struct {
|
||||
XMLName xml.Name `xml:"Configuration"`
|
||||
Topo topology `xml:"Topology"`
|
||||
ip2location map[string]loc // this is not used any more. leave it here for later.
|
||||
}
|
||||
|
||||
func (c *Configuration) String() string {
|
||||
|
@ -33,12 +32,6 @@ func (c *Configuration) String() string {
|
|||
}
|
||||
|
||||
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
|
||||
if c != nil && c.ip2location != nil {
|
||||
if loc, ok := c.ip2location[ip]; ok {
|
||||
return loc.dcName, loc.rackName
|
||||
}
|
||||
}
|
||||
|
||||
if dcName == "" {
|
||||
dcName = "DefaultDataCenter"
|
||||
}
|
||||
|
|
|
@ -262,6 +262,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
|
|||
}
|
||||
|
||||
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
|
||||
if resp.VolumeLocation.IsEmptyUrl() {
|
||||
glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp)
|
||||
return
|
||||
}
|
||||
// process new volume location
|
||||
loc := Location{
|
||||
Url: resp.VolumeLocation.Url,
|
||||
|
|
Loading…
Reference in a new issue