use read lock to avoid io hang during heartbeat

This commit is contained in:
zhangsong 2019-11-08 19:00:47 +08:00
parent 08c83b1a59
commit 1dd101f782

View file

@ -165,8 +165,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock()
location.RLock()
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
@ -175,8 +176,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
location.deleteVolumeById(v.Id)
glog.V(0).Infoln("volume", v.Id, "is deleted.")
deleteVids = append(deleteVids, v.Id)
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
@ -184,7 +184,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
location.Unlock()
location.RUnlock()
if len(deleteVids) > 0 {
// delete expired volumes.
location.Lock()
for _, vid := range deleteVids {
location.deleteVolumeById(vid)
glog.V(0).Infoln("volume", vid, "is deleted.")
}
location.Unlock()
}
}
for col, size := range collectionVolumeSize {