weed master: atomic volume counting

possible fix for https://github.com/chrislusf/seaweedfs/issues/913
commit 766396d249 (parent 715a38da1e)
Author: Chris Lu
Date:   2019-04-04 19:27:00 -07:00

5 changed files with 31 additions and 30 deletions
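
The change below switches the topology's volume counters from int to int64 and updates them through sync/atomic, since heartbeats from many volume servers adjust the same counters concurrently. As motivation, here is a minimal standalone sketch (not from this commit) of the lost-update race that a plain += on a shared counter produces; `go run -race` flags the first counter but not the second:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var plain int64   // updated with +=, like the old UpAdjust*Delta methods
	var atomicN int64 // updated with atomic.AddInt64, like the new ones

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			plain++                      // data race: increments can be lost
			atomic.AddInt64(&atomicN, 1) // safe concurrent increment
		}()
	}
	wg.Wait()
	fmt.Println(plain, atomicN) // plain may be < 1000; atomicN is always 1000
}
```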

@@ -63,7 +63,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			rack := dc.GetOrCreateRack(rackName)
 			dn = rack.GetOrCreateDataNode(heartbeat.Ip,
 				int(heartbeat.Port), heartbeat.PublicUrl,
-				int(heartbeat.MaxVolumeCount))
+				int64(heartbeat.MaxVolumeCount))
 			glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
 			if err := stream.Send(&master_pb.HeartbeatResponse{
 				VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,

@@ -68,8 +68,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
 	}
 	if err == nil {
 		if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
-			if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
-				err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
+			if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+				err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
 			} else {
 				count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
 			}
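
This hunk widens the free-space comparison and also rewrites the error with fmt.Errorf, matching the Go convention that error strings are lowercase and unpunctuated. The widening matters because Go never converts numeric types implicitly; a minimal standalone sketch (not from the commit) of the compile error the int64 cast avoids:

```go
package main

import "fmt"

func main() {
	var free int64 = 5 // e.g. what ms.Topo.FreeSpace() now returns
	count := 3         // plain int from strconv.Atoi
	// if free < count*2 { ... }  // compile error: mismatched types int64 and int
	if free < int64(count*2) { // explicit widening, as in the hunk above
		fmt.Println("not enough free volume slots")
	} else {
		fmt.Println("enough free volume slots")
	}
}
```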

@@ -5,6 +5,7 @@ import (
 	"math/rand"
 	"strings"
 	"sync"
+	"sync/atomic"

 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage"
@@ -14,16 +15,16 @@ type NodeId string
 type Node interface {
 	Id() NodeId
 	String() string
-	FreeSpace() int
-	ReserveOneVolume(r int) (*DataNode, error)
-	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
-	UpAdjustVolumeCountDelta(volumeCountDelta int)
-	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+	FreeSpace() int64
+	ReserveOneVolume(r int64) (*DataNode, error)
+	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
+	UpAdjustVolumeCountDelta(volumeCountDelta int64)
+	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
 	UpAdjustMaxVolumeId(vid storage.VolumeId)
-	GetVolumeCount() int
-	GetActiveVolumeCount() int
-	GetMaxVolumeCount() int
+	GetVolumeCount() int64
+	GetActiveVolumeCount() int64
+	GetMaxVolumeCount() int64
 	GetMaxVolumeId() storage.VolumeId
 	SetParent(Node)
 	LinkChildNode(node Node)
@@ -40,9 +41,9 @@ type Node interface {
 }
 type NodeImpl struct {
 	id                NodeId
-	volumeCount       int
-	activeVolumeCount int
-	maxVolumeCount    int
+	volumeCount       int64
+	activeVolumeCount int64
+	maxVolumeCount    int64
 	parent            Node
 	sync.RWMutex // lock children
 	children map[NodeId]Node
@@ -126,7 +127,7 @@ func (n *NodeImpl) String() string {
 func (n *NodeImpl) Id() NodeId {
 	return n.id
 }
-func (n *NodeImpl) FreeSpace() int {
+func (n *NodeImpl) FreeSpace() int64 {
 	return n.maxVolumeCount - n.volumeCount
 }
 func (n *NodeImpl) SetParent(node Node) {
@@ -146,7 +147,7 @@ func (n *NodeImpl) Parent() Node {
 func (n *NodeImpl) GetValue() interface{} {
 	return n.value
 }
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
 	n.RLock()
 	defer n.RUnlock()
 	for _, node := range n.children {
@@ -171,20 +172,20 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
 	return nil, errors.New("No free volume slot found!")
 }
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
-	n.maxVolumeCount += maxVolumeCountDelta
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
+	atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
 	if n.parent != nil {
 		n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
 	}
 }
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
-	n.volumeCount += volumeCountDelta
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
+	atomic.AddInt64(&n.volumeCount, volumeCountDelta)
 	if n.parent != nil {
 		n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
 	}
 }
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
-	n.activeVolumeCount += activeVolumeCountDelta
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
+	atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
 	if n.parent != nil {
 		n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
 	}
@@ -200,13 +201,13 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
 func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
 	return n.maxVolumeId
 }
-func (n *NodeImpl) GetVolumeCount() int {
+func (n *NodeImpl) GetVolumeCount() int64 {
 	return n.volumeCount
 }
-func (n *NodeImpl) GetActiveVolumeCount() int {
+func (n *NodeImpl) GetActiveVolumeCount() int64 {
 	return n.activeVolumeCount
 }
-func (n *NodeImpl) GetMaxVolumeCount() int {
+func (n *NodeImpl) GetMaxVolumeCount() int64 {
 	return n.maxVolumeCount
 }
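
A hedged observation, not part of this commit: the UpAdjust* writers above now use atomic.AddInt64, but FreeSpace() and the Get*Count() getters still read the same int64 fields with plain loads, which the race detector can still flag. A minimal sketch, using a simplified stand-in type rather than the real NodeImpl, of how the readers could pair atomic.LoadInt64 with the atomic writers:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is a simplified stand-in for NodeImpl, just to show the read side.
type node struct {
	volumeCount    int64
	maxVolumeCount int64
}

// Writer, as in the commit: atomic add on the shared counter.
func (n *node) UpAdjustVolumeCountDelta(delta int64) {
	atomic.AddInt64(&n.volumeCount, delta)
}

// Reader: atomic loads instead of the plain field reads left in the diff,
// so concurrent reads and writes are race-detector-clean.
func (n *node) FreeSpace() int64 {
	return atomic.LoadInt64(&n.maxVolumeCount) - atomic.LoadInt64(&n.volumeCount)
}

func main() {
	n := &node{maxVolumeCount: 100}
	n.UpAdjustVolumeCountDelta(3)
	fmt.Println(n.FreeSpace()) // 97
}
```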

@@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
 	}
 	return nil
 }
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
 	for _, c := range r.Children() {
 		dn := c.(*DataNode)
 		if dn.MatchLocation(ip, port) {

@@ -105,7 +105,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		if len(node.Children()) < rp.DiffRackCount+1 {
 			return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
 		}
-		if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+		if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) {
 			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
 		}
 		possibleRacksCount := 0
@@ -134,7 +134,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
 			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
 		}
-		if node.FreeSpace() < rp.SameRackCount+1 {
+		if node.FreeSpace() < int64(rp.SameRackCount+1) {
 			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
 		}
 		if len(node.Children()) < rp.SameRackCount+1 {
@@ -175,7 +175,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		servers = append(servers, server.(*DataNode))
 	}
 	for _, rack := range otherRacks {
-		r := rand.Intn(rack.FreeSpace())
+		r := rand.Int63n(rack.FreeSpace())
 		if server, e := rack.ReserveOneVolume(r); e == nil {
 			servers = append(servers, server)
 		} else {
@@ -183,7 +183,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		}
 	}
 	for _, datacenter := range otherDataCenters {
-		r := rand.Intn(datacenter.FreeSpace())
+		r := rand.Int63n(datacenter.FreeSpace())
 		if server, e := datacenter.ReserveOneVolume(r); e == nil {
 			servers = append(servers, server)
 		} else {
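
One hedged caveat on the rand.Int63n calls above: rand.Int63n panics when its argument is <= 0, and FreeSpace() is maxVolumeCount - volumeCount, which is zero for a full rack or data center. The candidates in these loops have presumably been filtered for free slots already, but a guard makes that invariant explicit; pickSlot is a hypothetical helper for illustration, not code from the repository:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// pickSlot guards the precondition that rand.Int63n requires a strictly
// positive argument before picking a random free-slot index in [0, freeSpace).
func pickSlot(freeSpace int64) (int64, error) {
	if freeSpace <= 0 {
		return 0, errors.New("no free volume slot to pick from")
	}
	return rand.Int63n(freeSpace), nil
}

func main() {
	if r, err := pickSlot(10); err == nil {
		fmt.Println("picked slot", r)
	}
	if _, err := pickSlot(0); err != nil {
		fmt.Println(err) // guarded instead of panicking
	}
}
```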