From 6df18a918103634fd4e5e3297e9d2b1597ec5b73 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 30 May 2016 12:30:26 -0700 Subject: [PATCH] rwlock concurrent read map --- go/topology/collection.go | 4 +- go/topology/data_node.go | 2 - go/topology/topology.go | 10 ++--- go/topology/topology_map.go | 4 +- go/topology/topology_vacuum.go | 4 +- go/util/concurrent_read_map.go | 44 ++++++++++++++----- .../master_server_handlers_admin.go | 2 +- 7 files changed, 45 insertions(+), 25 deletions(-) diff --git a/go/topology/collection.go b/go/topology/collection.go index 376b62405..6368900c3 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -35,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl * } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.storageType2VolumeLayout.Items { + for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { return list @@ -46,7 +46,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.storageType2VolumeLayout.Items { + for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { nodes = append(nodes, list...) diff --git a/go/topology/data_node.go b/go/topology/data_node.go index 01419a791..3bad8c188 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -3,7 +3,6 @@ package topology import ( "fmt" "strconv" - "sync" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" @@ -11,7 +10,6 @@ import ( type DataNode struct { NodeImpl - sync.RWMutex volumes map[storage.VolumeId]storage.VolumeInfo Ip string Port int diff --git a/go/topology/topology.go b/go/topology/topology.go index ee1477cd2..088639eef 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -90,13 +90,13 @@ func (t *Topology) loadConfiguration(configurationFile string) error { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { - for _, c := range t.collectionMap.Items { + for _, c := range t.collectionMap.Items() { if list := c.(*Collection).Lookup(vid); list != nil { return list } } } else { - if c, ok := t.collectionMap.Items[collection]; ok { + if c, ok := t.collectionMap.Find(collection); ok { return c.(*Collection).Lookup(vid) } } @@ -130,13 +130,13 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } -func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { - c, hasCollection := t.collectionMap.Items[collectionName] +func (t *Topology) FindCollection(collectionName string) (*Collection, bool) { + c, hasCollection := t.collectionMap.Find(collectionName) return c.(*Collection), hasCollection } func (t *Topology) DeleteCollection(collectionName string) { - delete(t.collectionMap.Items, collectionName) + t.collectionMap.Delete(collectionName) } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index dff11aaad..ce8e9e663 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -11,9 +11,9 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, col := range t.collectionMap.Items { + for _, col := range t.collectionMap.Items() { c := col.(*Collection) - for _, layout := range c.storageType2VolumeLayout.Items { + for _, layout := range c.storageType2VolumeLayout.Items() { if layout != nil { tmp := layout.(*VolumeLayout).ToMap() tmp["collection"] = c.Name diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 48bc8311d..eeb4fef69 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -81,10 +81,10 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } func (t *Topology) Vacuum(garbageThreshold string) int { glog.V(0).Infoln("Start vacuum on demand") - for _, col := range t.collectionMap.Items { + for _, col := range t.collectionMap.Items() { c := col.(*Collection) glog.V(0).Infoln("vacuum on collection:", c.Name) - for _, vl := range c.storageType2VolumeLayout.Items { + for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) for vid, locationlist := range volumeLayout.vid2location { diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go index ca1109f22..bbf303d82 100644 --- a/go/util/concurrent_read_map.go +++ b/go/util/concurrent_read_map.go @@ -7,31 +7,53 @@ import ( // A mostly for read map, which can thread-safely // initialize the map entries. type ConcurrentReadMap struct { - rmutex sync.RWMutex - Items map[string]interface{} + sync.RWMutex + + items map[string]interface{} } func NewConcurrentReadMap() *ConcurrentReadMap { - return &ConcurrentReadMap{Items: make(map[string]interface{})} + return &ConcurrentReadMap{items: make(map[string]interface{})} } func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { - m.rmutex.Lock() - defer m.rmutex.Unlock() - if value, ok := m.Items[key]; ok { + m.Lock() + defer m.Unlock() + if value, ok := m.items[key]; ok { return value } value = newEntry() - m.Items[key] = value + m.items[key] = value return value } func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { - m.rmutex.RLock() - if value, ok := m.Items[key]; ok { - m.rmutex.RUnlock() + m.RLock() + if value, ok := m.items[key]; ok { + m.RUnlock() return value } - m.rmutex.RUnlock() + m.RUnlock() return m.initMapEntry(key, newEntry) } + +func (m *ConcurrentReadMap) Find(key string) (interface{}, bool) { + m.RLock() + value, ok := m.items[key] + m.RUnlock() + return value, ok +} + +func (m *ConcurrentReadMap) Items() (itemsCopy []interface{}) { + m.RLock() + for _, i := range m.items { + itemsCopy = append(itemsCopy, i) + } + return itemsCopy +} + +func (m *ConcurrentReadMap) Delete(key string) { + m.Lock() + delete(m.items, key) + m.Unlock() +} diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 383456356..07399596a 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -19,7 +19,7 @@ import ( ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { - collection, ok := ms.Topo.GetCollection(r.FormValue("collection")) + collection, ok := ms.Topo.FindCollection(r.FormValue("collection")) if !ok { writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) return