remember oversized volumes

fix https://github.com/chrislusf/seaweedfs/issues/331
This commit is contained in:
Chris Lu 2016-06-27 15:28:23 -07:00
parent d0dbf6d2ea
commit b617b13c43

View file

@ -12,21 +12,23 @@ import (
// mapping from volume to its locations, inverted from server to volume // mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct { type VolumeLayout struct {
rp *storage.ReplicaPlacement rp *storage.ReplicaPlacement
ttl *storage.TTL ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64 oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
accessLock sync.RWMutex volumeSizeLimit uint64
accessLock sync.RWMutex
} }
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{ return &VolumeLayout{
rp: rp, rp: rp,
ttl: ttl, ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList), vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId), writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit, oversizedVolumes: make(map[storage.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
} }
} }
@ -44,12 +46,21 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn) vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.addToWritable(v.Id) if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.addToWritable(v.Id)
}
} else { } else {
vl.rememberOversizedVolumne(v)
vl.removeFromWritable(v.Id) vl.removeFromWritable(v.Id)
} }
} }
func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true
}
}
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock() vl.accessLock.Lock()
defer vl.accessLock.Unlock() defer vl.accessLock.Unlock()
@ -67,8 +78,12 @@ func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
vl.writables = append(vl.writables, vid) vl.writables = append(vl.writables, vid)
} }
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
return uint64(v.Size) < vl.volumeSizeLimit
}
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return uint64(v.Size) < vl.volumeSizeLimit && return vl.isOversized(v) &&
v.Version == storage.CurrentVersion && v.Version == storage.CurrentVersion &&
!v.ReadOnly !v.ReadOnly
} }