diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index e500de583..1816577b4 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -12,21 +12,23 @@ import ( // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { - rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - volumeSizeLimit uint64 - accessLock sync.RWMutex + rp *storage.ReplicaPlacement + ttl *storage.TTL + vid2location map[storage.VolumeId]*VolumeLocationList + writables []storage.VolumeId // transient array of writable volume id + oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + volumeSizeLimit uint64 + accessLock sync.RWMutex } func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ - rp: rp, - ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - volumeSizeLimit: volumeSizeLimit, + rp: rp, + ttl: ttl, + vid2location: make(map[storage.VolumeId]*VolumeLocationList), + writables: *new([]storage.VolumeId), + oversizedVolumes: make(map[storage.VolumeId]bool), + volumeSizeLimit: volumeSizeLimit, } } @@ -44,12 +46,21 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id].Set(dn) glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - vl.addToWritable(v.Id) + if _, ok := vl.oversizedVolumes[v.Id]; !ok { + vl.addToWritable(v.Id) + } } else { + vl.rememberOversizedVolumne(v) vl.removeFromWritable(v.Id) } } +func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { + if vl.isOversized(v) { + vl.oversizedVolumes[v.Id] = true + } +} + func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -67,8 +78,12 @@ func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { vl.writables = append(vl.writables, vid) } +func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { + return uint64(v.Size) < vl.volumeSizeLimit +} + func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return uint64(v.Size) < vl.volumeSizeLimit && + return vl.isOversized(v) && v.Version == storage.CurrentVersion && !v.ReadOnly }