diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 717dc8f76..063fba5f7 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -187,8 +187,12 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float if vl != nil { volumeLayout := vl.(*VolumeLayout) if volumeId > 0 { - if volumeLayout.Lookup(needle.VolumeId(volumeId)) != nil { - t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) + vid := needle.VolumeId(volumeId) + volumeLayout.accessLock.RLock() + locationList, ok := volumeLayout.vid2location[vid] + volumeLayout.accessLock.RUnlock() + if ok { + t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) } } else { t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) @@ -208,23 +212,27 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL volumeLayout.accessLock.RUnlock() for vid, locationList := range tmpMap { + t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) + } +} - volumeLayout.accessLock.RLock() - isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) - volumeLayout.accessLock.RUnlock() +func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) { + volumeLayout.accessLock.RLock() + isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) + isEnoughCopies := volumeLayout.enoughCopies(vid) + volumeLayout.accessLock.RUnlock() - if isReadOnly { - continue - } + if isReadOnly || !isEnoughCopies { + return + } - glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck( - grpcDialOption, vid, locationList, garbageThreshold); needVacuum { - if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { - t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) - } else { - t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) - } + glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) + if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck( + grpcDialOption, vid, locationList, garbageThreshold); needVacuum { + if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { + t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) + } else { + t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) } } }