package topology import ( "fmt" "github.com/chrislusf/seaweedfs/weed/util" "strconv" "sync" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) type DataNode struct { NodeImpl volumes map[needle.VolumeId]storage.VolumeInfo Ip string Port int PublicUrl string LastSeen int64 // unix time in seconds ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo ecShardsLock sync.RWMutex } func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() return dn.doAddOrUpdateVolume(v) } func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { if oldV, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v if v.DiskType == storage.SsdType { dn.UpAdjustSsdVolumeCountDelta(1) } else { dn.UpAdjustVolumeCountDelta(1) } if v.IsRemote() { dn.UpAdjustRemoteVolumeCountDelta(1) } if !v.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(1) } dn.UpAdjustMaxVolumeId(v.Id) isNew = true } else { if oldV.IsRemote() != v.IsRemote() { if v.IsRemote() { dn.UpAdjustRemoteVolumeCountDelta(1) } if oldV.IsRemote() { dn.UpAdjustRemoteVolumeCountDelta(-1) } } isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly dn.volumes[v.Id] = v } return } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } dn.Lock() defer dn.Unlock() for vid, v := range dn.volumes { if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) delete(dn.volumes, vid) deletedVolumes = append(deletedVolumes, v) if v.DiskType == storage.SsdType { dn.UpAdjustSsdVolumeCountDelta(-1) } else { dn.UpAdjustVolumeCountDelta(-1) } if v.IsRemote() { dn.UpAdjustRemoteVolumeCountDelta(-1) } if !v.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(-1) } } } for _, v := range actualVolumes { isNew, isChangedRO := dn.doAddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } if isChangedRO { changeRO = append(changeRO, v) } } return } func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) { dn.Lock() defer dn.Unlock() for _, v := range deletedVolumes { delete(dn.volumes, v.Id) if v.DiskType == storage.SsdType { dn.UpAdjustSsdVolumeCountDelta(-1) } else { dn.UpAdjustVolumeCountDelta(-1) } if v.IsRemote() { dn.UpAdjustRemoteVolumeCountDelta(-1) } if !v.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(-1) } } for _, v := range newVolumes { dn.doAddOrUpdateVolume(v) } return } func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() for _, v := range dn.volumes { ret = append(ret, v) } dn.RUnlock() return ret } func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] if ok { return vInfo, nil } else { return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") } } func (dn *DataNode) GetDataCenter() *DataCenter { return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) } func (dn *DataNode) GetRack() *Rack { return dn.Parent().(*NodeImpl).value.(*Rack) } func (dn *DataNode) GetTopology() *Topology { p := dn.Parent() for p.Parent() != nil { p = p.Parent() } t := p.(*Topology) return t } func (dn *DataNode) MatchLocation(ip string, port int) bool { return dn.Ip == ip && dn.Port == port } func (dn *DataNode) Url() string { return dn.Ip + ":" + strconv.Itoa(dn.Port) } func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() + dn.GetSsdVolumeCount() ret["VolumeIds"] = dn.GetVolumeIds() ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() + dn.GetMaxSsdVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { m := &master_pb.DataNodeInfo{ Id: string(dn.Id()), VolumeCount: uint64(dn.GetVolumeCount()), MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), MaxSsdVolumeCount: uint64(dn.GetMaxSsdVolumeCount()), SsdVolumeCount: uint64(dn.GetSsdVolumeCount()), FreeVolumeCount: uint64(dn.FreeSpace()), ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), } for _, v := range dn.GetVolumes() { m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) } for _, ecv := range dn.GetEcShards() { m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) } return m } // GetVolumeIds returns the human readable volume ids limited to count of max 100. func (dn *DataNode) GetVolumeIds() string { dn.RLock() defer dn.RUnlock() ids := make([]int, 0, len(dn.volumes)) for k := range dn.volumes { ids = append(ids, int(k)) } return util.HumanReadableIntsMax(100, ids...) }