diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index cc3c83b63..fc11a411f 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -3,6 +3,7 @@ package storage import ( "io/ioutil" "strings" + "sync" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -11,6 +12,7 @@ type DiskLocation struct { Directory string MaxVolumeCount int volumes map[VolumeId]*Volume + sync.RWMutex } func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { @@ -20,6 +22,8 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { } func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { + l.Lock() + defer l.Unlock() if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { @@ -48,6 +52,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { } func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { + l.Lock() + defer l.Unlock() + for k, v := range l.volumes { if v.Collection == collection { e = l.deleteVolumeById(k) @@ -71,3 +78,35 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { delete(l.volumes, vid) return } + +func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { + l.Lock() + defer l.Unlock() + + l.volumes[vid] = volume +} + +func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) { + l.RLock() + defer l.RUnlock() + + v, ok := l.volumes[vid] + return v, ok +} + +func (l *DiskLocation) VolumesLen() int { + l.RLock() + defer l.RUnlock() + + return len(l.volumes) +} + +func (l *DiskLocation) Close() { + l.Lock() + defer l.Unlock() + + for _, v := range l.volumes { + v.Close() + } + return +} diff --git a/weed/storage/store.go b/weed/storage/store.go index d6c7172e7..614c87ace 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { - if v, found := location.volumes[vid]; found { + if v, found := location.FindVolume(vid); found { return v } } @@ -152,7 +152,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume { func (s *Store) findFreeLocation() (ret *DiskLocation) { max := 0 for _, location := range s.Locations { - currentFreeCount := location.MaxVolumeCount - len(location.volumes) + currentFreeCount := location.MaxVolumeCount - location.VolumesLen() if currentFreeCount > max { max = currentFreeCount ret = location @@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { - location.volumes[vid] = volume + location.SetVolume(vid, volume) return nil } else { return err @@ -180,6 +180,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for _, location := range s.Locations { + location.RLock() for k, v := range location.volumes { s := &VolumeInfo{ Id: VolumeId(k), @@ -194,6 +195,7 @@ func (s *Store) Status() []*VolumeInfo { Ttl: v.Ttl} stats = append(stats, s) } + location.RUnlock() } sortVolumeInfos(stats) return stats @@ -219,6 +221,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S var maxFileKey uint64 for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + location.Lock() for k, v := range location.volumes { if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() @@ -246,6 +249,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } } } + location.Unlock() } joinMessage := &operation.JoinMessage{ @@ -290,9 +294,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } func (s *Store) Close() { for _, location := range s.Locations { - for _, v := range location.volumes { - v.Close() - } + location.Close() } } func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {