master: better locking of in memory volume data

related to https://github.com/chrislusf/seaweedfs/issues/1436#issuecomment-695880135
This commit is contained in:
Chris Lu 2020-09-20 23:07:55 -07:00
parent 9a3b564508
commit 289e62a305

View file

@ -44,6 +44,10 @@ func (dn *DataNode) String() string {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock() dn.Lock()
defer dn.Unlock() defer dn.Unlock()
return dn.doAddOrUpdateVolume(v)
}
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
if oldV, ok := dn.volumes[v.Id]; !ok { if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1) dn.UpAdjustVolumeCountDelta(1)
@ -71,11 +75,15 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
} }
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes { for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v actualVolumeMap[v.Id] = v
} }
dn.Lock() dn.Lock()
defer dn.Unlock()
for vid, v := range dn.volumes { for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok { if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid) glog.V(0).Infoln("Deleting volume id:", vid)
@ -90,9 +98,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
} }
} }
} }
dn.Unlock()
for _, v := range actualVolumes { for _, v := range actualVolumes {
isNew, isChangedRO := dn.AddOrUpdateVolume(v) isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
if isNew { if isNew {
newVolumes = append(newVolumes, v) newVolumes = append(newVolumes, v)
} }
@ -103,8 +110,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
return return
} }
func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
dn.Lock() dn.Lock()
defer dn.Unlock()
for _, v := range deletedVolumes { for _, v := range deletedVolumes {
delete(dn.volumes, v.Id) delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustVolumeCountDelta(-1)
@ -115,9 +124,8 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol
dn.UpAdjustActiveVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1)
} }
} }
dn.Unlock() for _, v := range newVolumes {
for _, v := range newlVolumes { dn.doAddOrUpdateVolume(v)
dn.AddOrUpdateVolume(v)
} }
return return
} }