diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index a7ef52336..c030cb558 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -86,14 +86,16 @@ func (t *Topology) Vacuum(garbageThreshold string) int { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - writableSet := make(map[storage.VolumeId]bool) - for _, id := range volumeLayout.writables { - writableSet[id] = true - } for vid, locationlist := range volumeLayout.vid2location { - if _, isWritable := writableSet[vid]; !isWritable { + + vl.accessLock.RLock() + isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + vl.accessLock.RUnlock() + + if hasValue && isReadOnly { continue } + glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index af8503b29..79ed70c0f 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -16,6 +16,7 @@ type VolumeLayout struct { ttl *storage.TTL vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id + readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 accessLock sync.RWMutex @@ -27,6 +28,7 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), + readonlyVolumes: make(map[storage.VolumeId]bool), oversizedVolumes: make(map[storage.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, } @@ -50,11 +52,15 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if v_info.ReadOnly { glog.V(3).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) + vl.readonlyVolumes[v.Id] = true return + } else { + delete(vl.readonlyVolumes, v.Id) } } else { glog.V(3).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) + delete(vl.readonlyVolumes, v.Id) return } }