add locks for location.volumes

fix https://github.com/chrislusf/seaweedfs/issues/392
This commit is contained in:
Chris Lu 2016-11-06 20:55:22 -08:00
parent df49692dff
commit 36f9633223
2 changed files with 47 additions and 6 deletions

View file

@ -3,6 +3,7 @@ package storage
import ( import (
"io/ioutil" "io/ioutil"
"strings" "strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
) )
@ -11,6 +12,7 @@ type DiskLocation struct {
Directory string Directory string
MaxVolumeCount int MaxVolumeCount int
volumes map[VolumeId]*Volume volumes map[VolumeId]*Volume
sync.RWMutex
} }
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
@ -20,6 +22,8 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
} }
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
l.Lock()
defer l.Unlock()
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs { for _, dir := range dirs {
@ -48,6 +52,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
} }
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.Lock()
defer l.Unlock()
for k, v := range l.volumes { for k, v := range l.volumes {
if v.Collection == collection { if v.Collection == collection {
e = l.deleteVolumeById(k) e = l.deleteVolumeById(k)
@ -71,3 +78,35 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
delete(l.volumes, vid) delete(l.volumes, vid)
return return
} }
func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
l.Lock()
defer l.Unlock()
l.volumes[vid] = volume
}
func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
l.RLock()
defer l.RUnlock()
v, ok := l.volumes[vid]
return v, ok
}
func (l *DiskLocation) VolumesLen() int {
l.RLock()
defer l.RUnlock()
return len(l.volumes)
}
func (l *DiskLocation) Close() {
l.Lock()
defer l.Unlock()
for _, v := range l.volumes {
v.Close()
}
return
}

View file

@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
func (s *Store) findVolume(vid VolumeId) *Volume { func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations { for _, location := range s.Locations {
if v, found := location.volumes[vid]; found { if v, found := location.FindVolume(vid); found {
return v return v
} }
} }
@ -152,7 +152,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume {
func (s *Store) findFreeLocation() (ret *DiskLocation) { func (s *Store) findFreeLocation() (ret *DiskLocation) {
max := 0 max := 0
for _, location := range s.Locations { for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - len(location.volumes) currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
if currentFreeCount > max { if currentFreeCount > max {
max = currentFreeCount max = currentFreeCount
ret = location ret = location
@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl) location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume location.SetVolume(vid, volume)
return nil return nil
} else { } else {
return err return err
@ -180,6 +180,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
func (s *Store) Status() []*VolumeInfo { func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for _, location := range s.Locations { for _, location := range s.Locations {
location.RLock()
for k, v := range location.volumes { for k, v := range location.volumes {
s := &VolumeInfo{ s := &VolumeInfo{
Id: VolumeId(k), Id: VolumeId(k),
@ -194,6 +195,7 @@ func (s *Store) Status() []*VolumeInfo {
Ttl: v.Ttl} Ttl: v.Ttl}
stats = append(stats, s) stats = append(stats, s)
} }
location.RUnlock()
} }
sortVolumeInfos(stats) sortVolumeInfos(stats)
return stats return stats
@ -219,6 +221,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
var maxFileKey uint64 var maxFileKey uint64
for _, location := range s.Locations { for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock()
for k, v := range location.volumes { for k, v := range location.volumes {
if maxFileKey < v.nm.MaxFileKey() { if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey() maxFileKey = v.nm.MaxFileKey()
@ -246,6 +249,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
} }
} }
} }
location.Unlock()
} }
joinMessage := &operation.JoinMessage{ joinMessage := &operation.JoinMessage{
@ -290,9 +294,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
} }
func (s *Store) Close() { func (s *Store) Close() {
for _, location := range s.Locations { for _, location := range s.Locations {
for _, v := range location.volumes { location.Close()
v.Close()
}
} }
} }
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {