diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 4d9d4cbe5..a301103eb 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -40,6 +40,10 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } + + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + for _, dn := range vl.vid2location[volumeInfo.Id].list { if !volumeInfo.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(-1) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 29b3c339b..c270ef9d4 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -122,29 +122,41 @@ func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - for vid, locationlist := range volumeLayout.vid2location { - - volumeLayout.accessLock.RLock() - isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] - volumeLayout.accessLock.RUnlock() - - if hasValue && isReadOnly { - continue - } - - glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) { - batchVacuumVolumeCommit(volumeLayout, vid, locationlist) - } - } - } + vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate) } } } return 0 } +func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { + + volumeLayout.accessLock.RLock() + tmpMap := make(map[storage.VolumeId]*VolumeLocationList) + for vid, locationlist := range volumeLayout.vid2location { + tmpMap[vid] = locationlist + } + volumeLayout.accessLock.RUnlock() + + for vid, locationlist := range tmpMap { + + volumeLayout.accessLock.RLock() + isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + volumeLayout.accessLock.RUnlock() + + if hasValue && isReadOnly { + continue + } + + glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) + if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) { + batchVacuumVolumeCommit(volumeLayout, vid, locationlist) + } + } + } +} + type VacuumVolumeResult struct { Result bool Error string