better locking to prevent any possible memory access error

This commit is contained in:
Chris Lu 2013-07-15 21:34:43 -07:00
parent b409ccc5ab
commit dd2245956f

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"log" "log"
"math/rand" "math/rand"
"sync"
) )
type VolumeLayout struct { type VolumeLayout struct {
@ -13,6 +14,7 @@ type VolumeLayout struct {
writables []storage.VolumeId // transient array of writable volume id writables []storage.VolumeId // transient array of writable volume id
pulse int64 pulse int64
volumeSizeLimit uint64 volumeSizeLimit uint64
accessLock sync.Mutex
} }
func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
@ -26,6 +28,9 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
} }
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if _, ok := vl.vid2location[v.Id]; !ok { if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList() vl.vid2location[v.Id] = NewVolumeLocationList()
} }
@ -121,6 +126,9 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
} }
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if vl.vid2location[vid].Remove(dn) { if vl.vid2location[vid].Remove(dn) {
if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
log.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) log.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
@ -130,6 +138,9 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
return false return false
} }
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if vl.vid2location[vid].Add(dn) { if vl.vid2location[vid].Add(dn) {
if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
return vl.setVolumeWritable(vid) return vl.setVolumeWritable(vid)
@ -139,6 +150,10 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
} }
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
log.Println("Volume", vid, "reaches full capacity.")
return vl.removeFromWritable(vid) return vl.removeFromWritable(vid)
} }