avoid race conditions for diskLocation.MaxVolumeCount (#3526)

This commit is contained in:
Konstantin Lebedev 2022-08-26 20:41:42 +05:00 committed by GitHub
parent 913a5d0dd4
commit 4f7a1f67cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 12 deletions

View file

@ -24,8 +24,8 @@ type DiskLocation struct {
DirectoryUuid string DirectoryUuid string
IdxDirectory string IdxDirectory string
DiskType types.DiskType DiskType types.DiskType
MaxVolumeCount int MaxVolumeCount int32
OriginalMaxVolumeCount int OriginalMaxVolumeCount int32
MinFreeSpace util.MinFreeSpace MinFreeSpace util.MinFreeSpace
volumes map[needle.VolumeId]*Volume volumes map[needle.VolumeId]*Volume
volumesLock sync.RWMutex volumesLock sync.RWMutex
@ -58,7 +58,7 @@ func GenerateDirUuid(dir string) (dirUuidString string, err error) {
return dirUuidString, nil return dirUuidString, nil
} }
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation { func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
dir = util.ResolvePath(dir) dir = util.ResolvePath(dir)
if idxDir == "" { if idxDir == "" {
idxDir = dir idxDir = dir

View file

@ -70,7 +70,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ { for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpaces[i], idxFolder, diskTypes[i]) location := NewDiskLocation(dirnames[i], int32(maxVolumeCounts[i]), minFreeSpaces[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind) location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location) s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@ -116,7 +116,7 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
return nil return nil
} }
func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0 max := int32(0)
for _, location := range s.Locations { for _, location := range s.Locations {
if diskType != location.DiskType { if diskType != location.DiskType {
continue continue
@ -124,9 +124,9 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
if location.isDiskSpaceLow { if location.isDiskSpaceLow {
continue continue
} }
currentFreeCount := location.MaxVolumeCount - location.VolumesLen() currentFreeCount := location.MaxVolumeCount - int32(location.VolumesLen())
currentFreeCount *= erasure_coding.DataShardsCount currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen() currentFreeCount -= int32(location.EcVolumesLen())
currentFreeCount /= erasure_coding.DataShardsCount currentFreeCount /= erasure_coding.DataShardsCount
if currentFreeCount > max { if currentFreeCount > max {
max = currentFreeCount max = currentFreeCount
@ -539,19 +539,19 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
} }
for _, diskLocation := range s.Locations { for _, diskLocation := range s.Locations {
if diskLocation.OriginalMaxVolumeCount == 0 { if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := diskLocation.MaxVolumeCount currentMaxVolumeCount := atomic.LoadInt32(&diskLocation.MaxVolumeCount)
diskStatus := stats.NewDiskStatus(diskLocation.Directory) diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit) unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace) unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
volCount := diskLocation.VolumesLen() volCount := diskLocation.VolumesLen()
maxVolumeCount := volCount maxVolumeCount := int32(volCount)
if unclaimedSpaces > int64(volumeSizeLimit) { if unclaimedSpaces > int64(volumeSizeLimit) {
maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1 maxVolumeCount += int32(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
} }
diskLocation.MaxVolumeCount = maxVolumeCount atomic.StoreInt32(&diskLocation.MaxVolumeCount, maxVolumeCount)
glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB", glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024) diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount hasChanges = hasChanges || currentMaxVolumeCount != atomic.LoadInt32(&diskLocation.MaxVolumeCount)
} }
} }
return return