// Source: seaweedfs/weed-fs/src/pkg/topology/topology.go

package topology
import (
"errors"
"math/rand"
"pkg/directory"
"pkg/sequence"
"pkg/storage"
)
type Topology struct {
NodeImpl
//transient vid~servers mapping for each replication type
replicaType2VolumeLayout []*VolumeLayout
pulse int64
volumeSizeLimit uint64
sequence sequence.Sequencer
2012-08-31 08:35:11 +00:00
}
// NewTopology creates the top-level node identified by id.
//
// The node starts with an empty child map and storage.LengthRelicationType
// nil volume layout slots. dirname and filename are forwarded to
// sequence.NewSequencer; volumeSizeLimit and pulse are stored and later
// passed to each VolumeLayout the topology creates.
func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
	topo := &Topology{
		replicaType2VolumeLayout: make([]*VolumeLayout, storage.LengthRelicationType),
		pulse:                    int64(pulse),
		volumeSizeLimit:          volumeSizeLimit,
		sequence:                 sequence.NewSequencer(dirname, filename),
	}
	// Fields promoted from the embedded NodeImpl cannot be named in the
	// composite literal above, so they are assigned afterwards.
	topo.id = NodeId(id)
	topo.nodeType = "Topology"
	topo.children = make(map[NodeId]Node)
	return topo
}
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
if t.FreeSpace() <= 0 {
return false, nil, nil
}
2012-09-03 08:50:04 +00:00
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
return ret, node, &vid
}
func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
freeSpace := t.FreeSpace()
for _, node := range except {
freeSpace -= node.FreeSpace()
}
if freeSpace <= 0 {
return false, nil, nil
}
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
return ret, node, &vid
2012-09-03 08:50:04 +00:00
}
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
2012-08-29 08:37:40 +00:00
return vid.Next()
}
// PickForWrite selects a writable volume for the given replication type and
// allocates file ids on it.
//
// The volume layout for repType is created on first use. The layout picks the
// volume and its data nodes; the sequencer then supplies the file id. The
// result is the string form of the new file id (volume id, file id and a
// random 32-bit value), the count reported by the sequencer, and the head of
// the data node list. An error is returned when the layout cannot provide a
// writable volume; the layout's own error is included in the message.
func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
	// GetVolumeLayout performs the lazy creation, so it is not repeated here.
	layout := t.GetVolumeLayout(repType)
	vid, count, datanodes, err := layout.PickForWrite(count)
	if err != nil {
		// Keep the underlying cause instead of silently dropping it.
		return "", 0, nil, errors.New("No writable volumes available: " + err.Error())
	}
	fileId, count := t.sequence.NextFileId(count)
	return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := storage.GetReplicationLevelIndex(repType)
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
return t.replicaType2VolumeLayout[replicationTypeIndex]
2012-09-14 08:17:13 +00:00
}
// RegisterVolumeLayout records volume v, hosted on data node dn, in the
// volume layout that matches the volume's replication type.
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
	layout := t.GetVolumeLayout(v.RepType)
	layout.RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
2012-09-14 08:17:13 +00:00
dc := t.GetOrCreateDataCenter(ip)
rack := dc.GetOrCreateRack(ip)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
for _, v := range volumeInfos {
dn.AddOrUpdateVolume(&v)
t.RegisterVolumeLayout(&v, dn)
}
2012-09-14 08:17:13 +00:00
}
// GetOrCreateDataCenter returns the first child data center whose location
// range matches ip. When none matches, a data center named
// "DefaultDataCenter" is created, linked as a child of the topology and
// returned.
//
// Every child is type-asserted to *DataCenter, so a child of any other type
// causes a panic.
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter {
	for _, child := range t.Children() {
		if existing := child.(*DataCenter); existing.MatchLocationRange(ip) {
			return existing
		}
	}
	created := NewDataCenter("DefaultDataCenter")
	t.LinkChildNode(created)
	return created
}
// ToMap returns a map[string]interface{} snapshot of the topology with three
// keys: "Free" (the topology's free space), "DataCenters" (the ToMap result
// of every child data center) and "layouts" (the ToMap result of every
// non-nil volume layout).
//
// Every child is type-asserted to *DataCenter, so a child of any other type
// causes a panic.
func (t *Topology) ToMap() interface{} {
	free := t.FreeSpace()

	// Both slices are declared without make on purpose: with nothing to
	// report they stay nil rather than becoming empty slices.
	var dataCenters []interface{}
	for _, child := range t.Children() {
		dataCenters = append(dataCenters, child.(*DataCenter).ToMap())
	}

	var layoutMaps []interface{}
	for _, vl := range t.replicaType2VolumeLayout {
		if vl == nil {
			continue
		}
		layoutMaps = append(layoutMaps, vl.ToMap())
	}

	return map[string]interface{}{
		"Free":        free,
		"DataCenters": dataCenters,
		"layouts":     layoutMaps,
	}
}