This commit is contained in:
Chris Lu 2020-03-22 18:32:49 -07:00
parent e32999108a
commit 35208711e5
2 changed files with 18 additions and 1 deletions

View file

@ -79,7 +79,7 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(d
} }
n.RUnlock() n.RUnlock()
if len(candidates) < numberOfNodes { if len(candidates) < numberOfNodes {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates") glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
return nil, nil, errors.New("No enough data node found!") return nil, nil, errors.New("No enough data node found!")
} }
@ -192,30 +192,46 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error)
} }
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
if maxVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
println("node", n.Id(), "new max", n.maxVolumeCount, "delta", maxVolumeCountDelta)
if n.parent != nil { if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
if volumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.volumeCount, volumeCountDelta) atomic.AddInt64(&n.volumeCount, volumeCountDelta)
if n.parent != nil { if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
if remoteVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
if n.parent != nil { if n.parent != nil {
n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
if ecShardCountDelta == 0 {
return
}
atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
if n.parent != nil { if n.parent != nil {
n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
if activeVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil { if n.parent != nil {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)

View file

@ -85,6 +85,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe
if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
counter += c counter += c
} else { } else {
glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e)
return counter, e return counter, e
} }
} }