From 95e0d2f1b236f97f70d51a62c3df6a937a27286d Mon Sep 17 00:00:00 2001 From: chrislusf Date: Thu, 14 Apr 2016 01:28:40 -0700 Subject: [PATCH] make VolumeLayout thread safe --- go/topology/volume_layout.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 3c1dd9503..7b6c0f117 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -17,7 +17,7 @@ type VolumeLayout struct { vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 - accessLock sync.Mutex + accessLock sync.RWMutex } func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { @@ -44,7 +44,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id].Set(dn) glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - vl.AddToWritable(v.Id) + vl.addToWritable(v.Id) } else { vl.removeFromWritable(v.Id) } @@ -58,7 +58,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.vid2location, v.Id) } -func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) { +func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { for _, id := range vl.writables { if vid == id { return @@ -74,6 +74,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { } func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + if location := vl.vid2location[vid]; location != nil { return location.list } @@ -81,6 +84,9 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { } func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + for _, location := range vl.vid2location { nodes = append(nodes, location.list...) } @@ -88,6 +94,9 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { } func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + len_writers := len(vl.writables) if len_writers <= 0 { glog.V(0).Infoln("No more writable volumes!") @@ -125,6 +134,9 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + if option.DataCenter == "" { return len(vl.writables) }