use hdd instead of empty string

This commit is contained in:
Chris Lu 2021-02-16 03:03:00 -08:00
parent f8446b42ab
commit 3fe628f04e
14 changed files with 29 additions and 28 deletions

View file

@ -120,7 +120,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err return nil, err
} }
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.DiskType(req.DiskType)) volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats() stats := volumeLayout.Stats()
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024

View file

@ -41,7 +41,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Ttl, req.Ttl,
req.Preallocate, req.Preallocate,
req.MemoryMapMaxSizeMb, req.MemoryMapMaxSizeMb,
types.DiskType(req.DiskType), types.ToDiskType(req.DiskType),
) )
if err != nil { if err != nil {

View file

@ -59,7 +59,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
if req.DiskType != "" { if req.DiskType != "" {
diskType = req.DiskType diskType = req.DiskType
} }
location := vs.store.FindFreeLocation(types.DiskType(diskType)) location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
if location == nil { if location == nil {
return fmt.Errorf("no space left") return fmt.Errorf("no space left")
} }

View file

@ -115,14 +115,14 @@ func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo) statistics {
s = s.plus(writeVolumeInformationMessage(writer, vi)) s = s.plus(writeVolumeInformationMessage(writer, vi))
} }
for _, ecShardInfo := range t.EcShardInfos { for _, ecShardInfo := range t.EcShardInfos {
fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds())
} }
fmt.Fprintf(writer, " Disk %s %+v \n", t.Type, s) fmt.Fprintf(writer, " Disk %s %+v \n", t.Type, s)
return s return s
} }
func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics {
fmt.Fprintf(writer, " volume %+v \n", t) fmt.Fprintf(writer, " volume %+v \n", t)
return newStatistics(t) return newStatistics(t)
} }

View file

@ -180,7 +180,7 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
sort.Slice(otherNodes, func(i, j int) bool { sort.Slice(otherNodes, func(i, j int) bool {
return otherNodes[i].localVolumeRatio(capacityByMaxVolumeCount(types.DiskType(vol.DiskType))) < otherNodes[j].localVolumeRatio(capacityByMaxVolumeCount(types.DiskType(vol.DiskType))) return otherNodes[i].localVolumeRatio(capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType))) < otherNodes[j].localVolumeRatio(capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType)))
}) })
for i := 0; i < len(otherNodes); i++ { for i := 0; i < len(otherNodes); i++ {

View file

@ -91,7 +91,7 @@ func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier string, se
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos { for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos { for _, v := range diskInfo.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.DiskType(v.DiskType) == types.ToDiskType(sourceTier) { if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == types.ToDiskType(sourceTier) {
if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true vidMap[v.Id] = true
} }

View file

@ -7,7 +7,7 @@ import (
type DiskType string type DiskType string
const ( const (
HardDriveType DiskType = "" HardDriveType DiskType = "hdd"
SsdType = "ssd" SsdType = "ssd"
) )

View file

@ -14,10 +14,10 @@ import (
type DataNode struct { type DataNode struct {
NodeImpl NodeImpl
Ip string Ip string
Port int Port int
PublicUrl string PublicUrl string
LastSeen int64 // unix time in seconds LastSeen int64 // unix time in seconds
} }
func NewDataNode(id string) *DataNode { func NewDataNode(id string) *DataNode {
@ -80,7 +80,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
deletedVolumes = append(deletedVolumes, v) deletedVolumes = append(deletedVolumes, v)
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(v.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
deltaDiskUsage.volumeCount = -1 deltaDiskUsage.volumeCount = -1
if v.IsRemote() { if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1 deltaDiskUsage.remoteVolumeCount = -1
@ -112,7 +112,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
delete(disk.volumes, v.Id) delete(disk.volumes, v.Id)
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(v.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
deltaDiskUsage.volumeCount = -1 deltaDiskUsage.volumeCount = -1
if v.IsRemote() { if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1 deltaDiskUsage.remoteVolumeCount = -1
@ -194,7 +194,8 @@ func (dn *DataNode) ToMap() interface{} {
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
m := &master_pb.DataNodeInfo{ m := &master_pb.DataNodeInfo{
Id: string(dn.Id()), Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
} }
for _, c := range dn.Children() { for _, c := range dn.Children() {
disk := c.(*Disk) disk := c.(*Disk)

View file

@ -31,7 +31,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
disk := dn.getOrCreateDisk(ecShards.DiskType) disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(ecShards.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
vid := ecShards.VolumeId vid := ecShards.VolumeId
if actualEcShards, ok := actualEcShardMap[vid]; !ok { if actualEcShards, ok := actualEcShardMap[vid]; !ok {
@ -60,7 +60,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
disk := dn.getOrCreateDisk(ecShards.DiskType) disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(ecShards.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
if dn.hasEcShards(ecShards.VolumeId) { if dn.hasEcShards(ecShards.VolumeId) {
newShards = append(newShards, ecShards) newShards = append(newShards, ecShards)

View file

@ -63,7 +63,7 @@ func (d *DiskUsages) ToMap() interface{} {
defer d.RUnlock() defer d.RUnlock()
ret := make(map[string]interface{}) ret := make(map[string]interface{})
for diskType, diskUsage := range d.usages { for diskType, diskUsage := range d.usages {
ret[types.DiskType(diskType).String()] = diskUsage.ToMap() ret[diskType.String()] = diskUsage.ToMap()
} }
return ret return ret
} }
@ -156,7 +156,7 @@ func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool)
func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(v.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
if oldV, ok := d.volumes[v.Id]; !ok { if oldV, ok := d.volumes[v.Id]; !ok {
d.volumes[v.Id] = v d.volumes[v.Id] = v
deltaDiskUsage.volumeCount = 1 deltaDiskUsage.volumeCount = 1
@ -228,7 +228,7 @@ func (d *Disk) GetTopology() *Topology {
func (d *Disk) ToMap() interface{} { func (d *Disk) ToMap() interface{} {
ret := make(map[string]interface{}) ret := make(map[string]interface{})
diskUsage := d.diskUsages.getOrCreateDisk(types.DiskType(d.Id())) diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
ret["Volumes"] = diskUsage.volumeCount ret["Volumes"] = diskUsage.volumeCount
ret["VolumeIds"] = d.GetVolumeIds() ret["VolumeIds"] = d.GetVolumeIds()
ret["EcShards"] = diskUsage.ecShardCount ret["EcShards"] = diskUsage.ecShardCount
@ -238,12 +238,12 @@ func (d *Disk) ToMap() interface{} {
} }
func (d *Disk) FreeSpace() int64 { func (d *Disk) FreeSpace() int64 {
t := d.diskUsages.getOrCreateDisk(types.DiskType(d.Id())) t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
return t.FreeSpace() return t.FreeSpace()
} }
func (d *Disk) ToDiskInfo() *master_pb.DiskInfo { func (d *Disk) ToDiskInfo() *master_pb.DiskInfo {
diskUsage := d.diskUsages.getOrCreateDisk(types.DiskType(d.Id())) diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
m := &master_pb.DiskInfo{ m := &master_pb.DiskInfo{
Type: string(d.Id()), Type: string(d.Id()),
VolumeCount: uint64(diskUsage.volumeCount), VolumeCount: uint64(diskUsage.volumeCount),

View file

@ -30,7 +30,7 @@ func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
} }
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(d.Id())) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
deltaDiskUsage.ecShardCount = int64(delta) deltaDiskUsage.ecShardCount = int64(delta)
d.UpAdjustDiskUsageDelta(deltaDiskUsages) d.UpAdjustDiskUsageDelta(deltaDiskUsages)
@ -46,7 +46,7 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
delta := existing.ShardBits.ShardIdCount() - oldCount delta := existing.ShardBits.ShardIdCount() - oldCount
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(d.Id())) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
deltaDiskUsage.ecShardCount = int64(delta) deltaDiskUsage.ecShardCount = int64(delta)
d.UpAdjustDiskUsageDelta(deltaDiskUsages) d.UpAdjustDiskUsageDelta(deltaDiskUsages)

View file

@ -193,7 +193,7 @@ func (n *NodeImpl) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
// the volume server may have set the max to zero // the volume server may have set the max to zero
continue continue
} }
dt := types.DiskType(diskType) dt := types.ToDiskType(diskType)
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
currentDiskUsage := n.diskUsages.getOrCreateDisk(dt) currentDiskUsage := n.diskUsages.getOrCreateDisk(dt)
deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount

View file

@ -46,7 +46,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
r.LinkChildNode(dn) r.LinkChildNode(dn)
for diskType, maxVolumeCount := range maxVolumeCounts { for diskType, maxVolumeCount := range maxVolumeCounts {
disk := NewDisk(diskType) disk := NewDisk(diskType)
disk.diskUsages.getOrCreateDisk(types.DiskType(diskType)).maxVolumeCount = int64(maxVolumeCount) disk.diskUsages.getOrCreateDisk(types.ToDiskType(diskType)).maxVolumeCount = int64(maxVolumeCount)
dn.LinkChildNode(disk) dn.LinkChildNode(disk)
} }
return dn return dn

View file

@ -52,7 +52,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
disk := dn.getOrCreateDisk(volumeInfo.DiskType) disk := dn.getOrCreateDisk(volumeInfo.DiskType)
deltaDiskUsages := newDiskUsages() deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(volumeInfo.DiskType)) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType))
deltaDiskUsage.activeVolumeCount = -1 deltaDiskUsage.activeVolumeCount = -1
disk.UpAdjustDiskUsageDelta(deltaDiskUsages) disk.UpAdjustDiskUsageDelta(deltaDiskUsages)