mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
fix race detector found problems
This commit is contained in:
parent
26aaccca08
commit
3e8a3a8fec
|
@ -2,10 +2,11 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
|
@ -75,7 +76,7 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if in.GetVolumeSizeLimit() != 0 {
|
if in.GetVolumeSizeLimit() != 0 {
|
||||||
vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
|
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
|
||||||
}
|
}
|
||||||
if in.GetLeader() != "" && masterNode != in.GetLeader() {
|
if in.GetLeader() != "" && masterNode != in.GetLeader() {
|
||||||
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
|
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
|
||||||
|
|
|
@ -47,9 +47,7 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
|
|
||||||
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
||||||
e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
|
e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
|
||||||
if key > nm.MaximumFileKey {
|
nm.MaybeSetMaxFileKey(key)
|
||||||
nm.MaximumFileKey = key
|
|
||||||
}
|
|
||||||
if !offset.IsZero() && size != TombstoneFileSize {
|
if !offset.IsZero() && size != TombstoneFileSize {
|
||||||
nm.FileCounter++
|
nm.FileCounter++
|
||||||
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
||||||
|
@ -67,7 +65,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name())
|
glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name())
|
||||||
return nm, e
|
return nm, e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,51 +2,64 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
"github.com/willf/bloom"
|
"github.com/willf/bloom"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type mapMetric struct {
|
type mapMetric struct {
|
||||||
DeletionCounter int `json:"DeletionCounter"`
|
DeletionCounter uint32 `json:"DeletionCounter"`
|
||||||
FileCounter int `json:"FileCounter"`
|
FileCounter uint32 `json:"FileCounter"`
|
||||||
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
|
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
|
||||||
FileByteCounter uint64 `json:"FileByteCounter"`
|
FileByteCounter uint64 `json:"FileByteCounter"`
|
||||||
MaximumFileKey NeedleId `json:"MaxFileKey"`
|
MaximumFileKey uint64 `json:"MaxFileKey"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
|
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
|
||||||
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
|
mm.LogDeletionCounter(deletedByteCount)
|
||||||
mm.DeletionCounter++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
|
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
|
||||||
if key > mm.MaximumFileKey {
|
mm.MaybeSetMaxFileKey(key)
|
||||||
mm.MaximumFileKey = key
|
mm.LogFileCounter(newSize)
|
||||||
|
if oldSize > 0 && oldSize != TombstoneFileSize {
|
||||||
|
mm.LogDeletionCounter(oldSize)
|
||||||
}
|
}
|
||||||
mm.FileCounter++
|
}
|
||||||
mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
|
func (mm mapMetric) LogFileCounter(newSize uint32) {
|
||||||
|
atomic.AddUint32(&mm.FileCounter, 1)
|
||||||
|
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
|
||||||
|
}
|
||||||
|
func (mm mapMetric) LogDeletionCounter(oldSize uint32) {
|
||||||
if oldSize > 0 {
|
if oldSize > 0 {
|
||||||
mm.DeletionCounter++
|
atomic.AddUint32(&mm.DeletionCounter, 1)
|
||||||
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
|
atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (mm mapMetric) ContentSize() uint64 {
|
||||||
|
return atomic.LoadUint64(&mm.FileByteCounter)
|
||||||
|
}
|
||||||
|
func (mm mapMetric) DeletedSize() uint64 {
|
||||||
|
return atomic.LoadUint64(&mm.DeletionByteCounter)
|
||||||
|
}
|
||||||
|
func (mm mapMetric) FileCount() int {
|
||||||
|
return int(atomic.LoadUint32(&mm.FileCounter))
|
||||||
|
}
|
||||||
|
func (mm mapMetric) DeletedCount() int {
|
||||||
|
return int(atomic.LoadUint32(&mm.DeletionCounter))
|
||||||
|
}
|
||||||
|
func (mm mapMetric) MaxFileKey() NeedleId {
|
||||||
|
t := uint64(mm.MaximumFileKey)
|
||||||
|
return NeedleId(t)
|
||||||
|
}
|
||||||
|
func (mm mapMetric) MaybeSetMaxFileKey(key NeedleId) {
|
||||||
|
if key > mm.MaxFileKey() {
|
||||||
|
atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mm mapMetric) ContentSize() uint64 {
|
|
||||||
return mm.FileByteCounter
|
|
||||||
}
|
|
||||||
func (mm mapMetric) DeletedSize() uint64 {
|
|
||||||
return mm.DeletionByteCounter
|
|
||||||
}
|
|
||||||
func (mm mapMetric) FileCount() int {
|
|
||||||
return mm.FileCounter
|
|
||||||
}
|
|
||||||
func (mm mapMetric) DeletedCount() int {
|
|
||||||
return mm.DeletionCounter
|
|
||||||
}
|
|
||||||
func (mm mapMetric) MaxFileKey() NeedleId {
|
|
||||||
return mm.MaximumFileKey
|
|
||||||
}
|
|
||||||
|
|
||||||
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
|
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
|
||||||
mm = &mapMetric{}
|
mm = &mapMetric{}
|
||||||
|
@ -56,9 +69,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
|
||||||
bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
|
bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
|
||||||
}, func(key NeedleId, offset Offset, size uint32) error {
|
}, func(key NeedleId, offset Offset, size uint32) error {
|
||||||
|
|
||||||
if key > mm.MaximumFileKey {
|
mm.MaybeSetMaxFileKey(key)
|
||||||
mm.MaximumFileKey = key
|
|
||||||
}
|
|
||||||
NeedleIdToBytes(buf, key)
|
NeedleIdToBytes(buf, key)
|
||||||
if size != TombstoneFileSize {
|
if size != TombstoneFileSize {
|
||||||
mm.FileByteCounter += uint64(size)
|
mm.FileByteCounter += uint64(size)
|
||||||
|
|
|
@ -2,6 +2,8 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
|
@ -22,7 +24,7 @@ type Store struct {
|
||||||
dataCenter string //optional informaton, overwriting master setting if exists
|
dataCenter string //optional informaton, overwriting master setting if exists
|
||||||
rack string //optional information, overwriting master setting if exists
|
rack string //optional information, overwriting master setting if exists
|
||||||
connected bool
|
connected bool
|
||||||
VolumeSizeLimit uint64 //read from the master
|
volumeSizeLimit uint64 //read from the master
|
||||||
Client master_pb.Seaweed_SendHeartbeatClient
|
Client master_pb.Seaweed_SendHeartbeatClient
|
||||||
NeedleMapType NeedleMapType
|
NeedleMapType NeedleMapType
|
||||||
NewVolumeIdChan chan VolumeId
|
NewVolumeIdChan chan VolumeId
|
||||||
|
@ -30,7 +32,7 @@ type Store struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) String() (str string) {
|
func (s *Store) String() (str string) {
|
||||||
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
|
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +152,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||||
if maxFileKey < v.nm.MaxFileKey() {
|
if maxFileKey < v.nm.MaxFileKey() {
|
||||||
maxFileKey = v.nm.MaxFileKey()
|
maxFileKey = v.nm.MaxFileKey()
|
||||||
}
|
}
|
||||||
if !v.expired(s.VolumeSizeLimit) {
|
if !v.expired(s.GetVolumeSizeLimit()) {
|
||||||
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
|
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
|
||||||
} else {
|
} else {
|
||||||
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
||||||
|
@ -192,7 +194,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
||||||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
|
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
|
||||||
_, size, err = v.writeNeedle(n)
|
_, size, err = v.writeNeedle(n)
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
|
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -255,3 +257,11 @@ func (s *Store) DeleteVolume(i VolumeId) error {
|
||||||
|
|
||||||
return fmt.Errorf("Volume %d not found on disk", i)
|
return fmt.Errorf("Volume %d not found on disk", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) SetVolumeSizeLimit(x uint64) {
|
||||||
|
atomic.StoreUint64(&s.volumeSizeLimit, x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) GetVolumeSizeLimit() uint64 {
|
||||||
|
return atomic.LoadUint64(&s.volumeSizeLimit)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue