seaweedfs/weed/topology/data_node.go

273 lines
6.6 KiB
Go
Raw Permalink Normal View History

package topology
import (
"fmt"
2019-03-18 03:27:08 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2021-02-16 10:47:02 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"strconv"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
2012-09-08 23:25:44 +00:00
type DataNode struct {
NodeImpl
2021-02-16 11:03:00 +00:00
Ip string
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
}
2012-09-08 23:25:44 +00:00
func NewDataNode(id string) *DataNode {
2021-02-16 10:47:02 +00:00
dn := &DataNode{}
dn.id = NodeId(id)
dn.nodeType = "DataNode"
dn.diskUsages = newDiskUsages()
dn.children = make(map[NodeId]Node)
dn.NodeImpl.value = dn
return dn
2012-08-31 08:35:11 +00:00
}
func (dn *DataNode) String() string {
2016-05-20 06:32:56 +00:00
dn.RLock()
defer dn.RUnlock()
2021-02-16 10:47:02 +00:00
return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
2016-05-20 06:32:56 +00:00
dn.Lock()
defer dn.Unlock()
return dn.doAddOrUpdateVolume(v)
}
2021-02-16 10:47:02 +00:00
func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
c, found := dn.children[NodeId(diskType)]
if !found {
c = NewDisk(diskType)
2021-02-16 18:48:16 +00:00
dn.doLinkChildNode(c)
}
2021-02-16 10:47:02 +00:00
disk := c.(*Disk)
return disk
}
// doAddOrUpdateVolume hands v to the disk matching v.DiskType (created on
// demand) and returns that disk's AddOrUpdateVolume results unchanged.
// It does no locking of its own.
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
	return dn.getOrCreateDisk(v.DiskType).AddOrUpdateVolume(v)
}
2021-02-16 10:47:02 +00:00
// UpdateVolumes detects new/deleted/changed volumes on a volume server
// used in master to notify master clients of these changes.
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
2019-04-19 04:43:36 +00:00
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
dn.Lock()
defer dn.Unlock()
2021-02-16 10:47:02 +00:00
existingVolumes := dn.getVolumes()
for _, v := range existingVolumes {
vid := v.Id
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
2021-02-16 10:47:02 +00:00
disk := dn.getOrCreateDisk(v.DiskType)
delete(disk.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
2021-02-16 10:47:02 +00:00
deltaDiskUsages := newDiskUsages()
2021-02-16 11:03:00 +00:00
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
2021-02-16 10:47:02 +00:00
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
2021-02-16 10:47:02 +00:00
deltaDiskUsage.remoteVolumeCount = -1
}
if !v.ReadOnly {
2021-02-16 10:47:02 +00:00
deltaDiskUsage.activeVolumeCount = -1
}
2021-02-16 10:47:02 +00:00
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}
for _, v := range actualVolumes {
isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
if isChangedRO {
changeRO = append(changeRO, v)
}
}
return
}
func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
2019-04-20 18:35:20 +00:00
dn.Lock()
defer dn.Unlock()
2019-04-20 18:35:20 +00:00
for _, v := range deletedVolumes {
2021-02-16 10:47:02 +00:00
disk := dn.getOrCreateDisk(v.DiskType)
delete(disk.volumes, v.Id)
deltaDiskUsages := newDiskUsages()
2021-02-16 11:03:00 +00:00
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
2021-02-16 10:47:02 +00:00
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
2021-02-16 10:47:02 +00:00
deltaDiskUsage.remoteVolumeCount = -1
}
if !v.ReadOnly {
2021-02-16 10:47:02 +00:00
deltaDiskUsage.activeVolumeCount = -1
}
2021-02-16 10:47:02 +00:00
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
2019-04-20 18:35:20 +00:00
}
for _, v := range newVolumes {
dn.doAddOrUpdateVolume(v)
2019-04-20 18:35:20 +00:00
}
return
}
// AdjustMaxVolumeCounts brings the recorded max volume count of each disk
// type in line with maxVolumeCounts (keyed by disk type name).
//
// Entries with a zero count are skipped, as are disk types whose recorded
// maximum already matches. For the rest, the difference between the new and
// the recorded maximum is pushed up from the corresponding disk.
func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
	for diskType, maxVolumeCount := range maxVolumeCounts {
		if maxVolumeCount == 0 {
			// the volume server may have set the max to zero
			continue
		}
		dt := types.ToDiskType(diskType)
		currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
		if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
			continue
		}
		disk := dn.getOrCreateDisk(dt.String())

		// Use a fresh delta set per disk type so each call below carries only
		// this disk type's change. Sharing one set across iterations would
		// hand the deltas recorded for earlier disk types to later calls too.
		// UpdateVolumes and DeltaUpdateVolumes build their deltas the same way.
		deltaDiskUsages := newDiskUsages()
		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
		deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
		disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
	}
}
2016-05-20 06:32:56 +00:00
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
dn.RLock()
2021-02-16 10:47:02 +00:00
for _, c := range dn.children {
disk := c.(*Disk)
ret = append(ret, disk.GetVolumes()...)
2016-05-20 06:32:56 +00:00
}
dn.RUnlock()
return ret
}
2021-02-16 10:47:02 +00:00
func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
dn.RLock()
defer dn.RUnlock()
2021-02-16 10:47:02 +00:00
found := false
for _, c := range dn.children {
disk := c.(*Disk)
vInfo, found = disk.volumes[id]
if found {
break
}
}
if found {
2019-01-17 01:17:19 +00:00
return vInfo, nil
} else {
return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
}
}
// GetDataCenter returns the value of this node's grandparent as a
// *DataCenter, or nil when the node has no parent or no grandparent.
func (dn *DataNode) GetDataCenter() *DataCenter {
	rack := dn.Parent()
	if rack == nil {
		return nil
	}
	if dc := rack.Parent(); dc != nil {
		return dc.GetValue().(*DataCenter)
	}
	return nil
}
// GetRack returns the rack this node is attached to, or nil when the node has
// no parent (the same convention GetDataCenter uses).
func (dn *DataNode) GetRack() *Rack {
	parent := dn.Parent()
	if parent == nil {
		// A type assertion on a nil interface would panic; report "no rack".
		return nil
	}
	return parent.(*NodeImpl).value.(*Rack)
}
// GetTopology follows Parent links upward until it reaches a node without a
// parent and returns that node as a *Topology.
// It panics if this node has no parent, or if the top node is not a *Topology.
func (dn *DataNode) GetTopology() *Topology {
	root := dn.Parent()
	for up := root.Parent(); up != nil; up = root.Parent() {
		root = up
	}
	return root.(*Topology)
}
2012-09-14 08:17:13 +00:00
// MatchLocation reports whether both ip and port equal this node's Ip and Port.
func (dn *DataNode) MatchLocation(ip string, port int) bool {
	if dn.Port != port {
		return false
	}
	return dn.Ip == ip
}
2012-09-26 08:55:56 +00:00
func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
2012-09-26 08:55:56 +00:00
}
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
2012-09-26 08:55:56 +00:00
ret["Url"] = dn.Url()
ret["PublicUrl"] = dn.PublicUrl
2021-02-19 09:38:56 +00:00
// aggregated volume info
var volumeCount, ecShardCount, maxVolumeCount int64
var volumeIds string
for _, diskUsage := range dn.diskUsages.usages {
volumeCount += diskUsage.volumeCount
ecShardCount += diskUsage.ecShardCount
maxVolumeCount += diskUsage.maxVolumeCount
}
for _, disk := range dn.Children() {
d := disk.(*Disk)
2021-02-19 22:22:36 +00:00
volumeIds += " " + d.GetVolumeIds()
2021-02-19 09:38:56 +00:00
}
ret["Volumes"] = volumeCount
ret["EcShards"] = ecShardCount
ret["Max"] = maxVolumeCount
ret["VolumeIds"] = volumeIds
2021-02-19 09:38:56 +00:00
return ret
2012-09-14 08:17:13 +00:00
}
2019-03-18 03:27:08 +00:00
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
m := &master_pb.DataNodeInfo{
2021-02-16 11:03:00 +00:00
Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
2019-03-18 03:27:08 +00:00
}
2021-02-16 10:47:02 +00:00
for _, c := range dn.Children() {
disk := c.(*Disk)
m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
}
2019-03-18 03:27:08 +00:00
return m
}
// GetVolumeIds returns the human readable volume ids limited to count of max 100.
func (dn *DataNode) GetVolumeIds() string {
dn.RLock()
defer dn.RUnlock()
2021-02-16 10:47:02 +00:00
existingVolumes := dn.getVolumes()
ids := make([]int, 0, len(existingVolumes))
2021-02-16 10:47:02 +00:00
for k := range existingVolumes {
ids = append(ids, int(k))
}
return util.HumanReadableIntsMax(100, ids...)
}
2021-02-16 10:47:02 +00:00
func (dn *DataNode) getVolumes() []storage.VolumeInfo {
var existingVolumes []storage.VolumeInfo
for _, c := range dn.children {
disk := c.(*Disk)
existingVolumes = append(existingVolumes, disk.GetVolumes()...)
}
return existingVolumes
2021-02-16 11:03:00 +00:00
}