diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 6ab90e9e2..799cbca62 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -50,7 +50,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - if _, ok := vl.vid2location[v.Id]; !ok || vl.vid2location[v.Id] == nil { + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } vl.vid2location[v.Id].Set(dn) @@ -72,17 +72,13 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { return } } - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) - } - } else { - vl.rememberOversizedVolumne(v) - vl.removeFromWritable(v.Id) - } + + vl.rememberOversizedVolume(v) + vl.ensureCorrectWritables(v) + } -func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { if vl.isOversized(v) { vl.oversizedVolumes[v.Id] = true } @@ -92,9 +88,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - vl.removeFromWritable(v.Id) - delete(vl.vid2location, v.Id) // somehow this line does not work as expected - // vl.vid2location[v.Id] = nil + // remove from vid2location map + location, ok := vl.vid2location[v.Id] + if !ok { + return + } + + if location.Remove(dn) { + + vl.ensureCorrectWritables(v) + + if location.Length() == 0 { + delete(vl.vid2location, v.Id) + } + + } +} + +func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { + if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { + if _, ok := vl.oversizedVolumes[v.Id]; !ok { + vl.addToWritable(v.Id) + } + } else { + vl.removeFromWritable(v.Id) + } } func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { @@ -252,7 +270,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bo defer vl.accessLock.Unlock() vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { + if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { return vl.setVolumeWritable(vid) } return false