load configuration file, with topology configured

This commit is contained in:
Chris Lu 2012-09-22 20:46:31 -07:00
parent a34570fc5b
commit 73dbb3aed9
7 changed files with 68 additions and 68 deletions

View file

@ -34,6 +34,7 @@ var (
capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "./weed.xml", "configuration file")
) )
var mapper *directory.Mapper var mapper *directory.Mapper
@ -53,7 +54,7 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
for _, machine := range machines { for _, machine := range machines {
ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
} }
writeJson(w, r, map[string]interface{}{"locations":ret}) writeJson(w, r, map[string]interface{}{"locations": ret})
} else { } else {
log.Println("Invalid volume id", volumeId) log.Println("Invalid volume id", volumeId)
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. " + e.Error()}) writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. " + e.Error()})
@ -131,7 +132,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
} }
func runMaster(cmd *Command, args []string) bool { func runMaster(cmd *Command, args []string) bool {
topo = topology.NewTopology("topo", *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) topo = topology.NewTopology("topo", *confFile, *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
vg = replication.NewDefaultVolumeGrowth() vg = replication.NewDefaultVolumeGrowth()
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)

View file

@ -1,34 +1,56 @@
package topology package topology
import ( import (
"encoding/xml" "encoding/xml"
) )
type loc struct {
dcName string
rackName string
}
type rack struct { type rack struct {
Name string `xml:"name,attr"` Name string `xml:"name,attr"`
Ips []string `xml:"Ip"` Ips []string `xml:"Ip"`
} }
type dataCenter struct { type dataCenter struct {
Name string `xml:"name,attr"` Name string `xml:"name,attr"`
Racks []rack `xml:"Rack"` Racks []rack `xml:"Rack"`
} }
type topology struct { type topology struct {
DataCenters []dataCenter `xml:"DataCenter"` DataCenters []dataCenter `xml:"DataCenter"`
} }
type configuration struct { type Configuration struct {
XMLName xml.Name `xml:"Configuration"` XMLName xml.Name `xml:"Configuration"`
Topo topology `xml:"Topology"` Topo topology `xml:"Topology"`
ip2location map[string]loc
} }
func NewConfiguration(b []byte) (*configuration, error){ func NewConfiguration(b []byte) (*Configuration, error) {
c := &configuration{} c := &Configuration{}
err := xml.Unmarshal(b, c) err := xml.Unmarshal(b, c)
return c, err c.ip2location = make(map[string]loc)
for _, dc := range c.Topo.DataCenters {
for _, rack := range dc.Racks {
for _, ip := range rack.Ips {
c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
}
}
}
return c, err
} }
func (c *configuration) String() string{ func (c *Configuration) String() string {
if b, e := xml.MarshalIndent(c, " ", " "); e==nil { if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
return string(b) return string(b)
} }
return "" return ""
}
func (c *Configuration) Locate(ip string) (dc string, rack string) {
if c != nil && c.ip2location != nil {
if loc, ok := c.ip2location[ip]; ok {
return loc.dcName, loc.rackName
}
}
return "DefaultDataCenter", "DefaultRack"
} }

View file

@ -5,7 +5,6 @@ import (
type DataCenter struct { type DataCenter struct {
NodeImpl NodeImpl
ipRange *IpRange
} }
func NewDataCenter(id string) *DataCenter { func NewDataCenter(id string) *DataCenter {
@ -17,21 +16,14 @@ func NewDataCenter(id string) *DataCenter {
return dc return dc
} }
func (dc *DataCenter) MatchLocationRange(ip string) bool { func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
if dc.ipRange == nil {
return true
}
return dc.ipRange.Match(ip)
}
func (dc *DataCenter) GetOrCreateRack(ip string) *Rack {
for _, c := range dc.Children() { for _, c := range dc.Children() {
rack := c.(*Rack) rack := c.(*Rack)
if rack.MatchLocationRange(ip) { if string(rack.Id()) == rackName {
return rack return rack
} }
} }
rack := NewRack("DefaultRack") rack := NewRack(rackName)
dc.LinkChildNode(rack) dc.LinkChildNode(rack)
return rack return rack
} }

View file

@ -1,21 +0,0 @@
package topology
import (
)
type IpRange struct {
inclusives []string
exclusives []string
}
func (r *IpRange) Match(ip string) bool {
// TODO
// for _, exc := range r.exclusives {
// if exc
// }
// for _, inc := range r.inclusives {
// }
return true
}

View file

@ -137,7 +137,7 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n) node.SetParent(n)
fmt.Println(n, "adds child", node) fmt.Println(n, "adds child", node.Id())
} }
} }

View file

@ -7,7 +7,6 @@ import (
type Rack struct { type Rack struct {
NodeImpl NodeImpl
ipRange *IpRange
} }
func NewRack(id string) *Rack { func NewRack(id string) *Rack {
@ -19,13 +18,6 @@ func NewRack(id string) *Rack {
return r return r
} }
func (r *Rack) MatchLocationRange(ip string) bool {
if r.ipRange == nil {
return true
}
return r.ipRange.Match(ip)
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() { for _, c := range r.Children() {
dn := c.(*DataNode) dn := c.(*DataNode)
@ -39,7 +31,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
return dn return dn
} }
} }
dn := NewDataNode("DataNode" + ip + ":" + strconv.Itoa(port)) dn := NewDataNode(ip + ":" + strconv.Itoa(port))
dn.Ip = ip dn.Ip = ip
dn.Port = port dn.Port = port
dn.PublicUrl = publicUrl dn.PublicUrl = publicUrl

View file

@ -6,6 +6,7 @@ import (
"pkg/directory" "pkg/directory"
"pkg/sequence" "pkg/sequence"
"pkg/storage" "pkg/storage"
"io/ioutil"
) )
type Topology struct { type Topology struct {
@ -23,9 +24,11 @@ type Topology struct {
chanDeadDataNodes chan *DataNode chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan *storage.VolumeInfo chanFullVolumes chan *storage.VolumeInfo
configuration *Configuration
} }
func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology { func NewTopology(id string, confFile string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{} t := &Topology{}
t.id = NodeId(id) t.id = NodeId(id)
t.nodeType = "Topology" t.nodeType = "Topology"
@ -41,9 +44,19 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin
t.chanRecoveredDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan *storage.VolumeInfo) t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.loadConfiguration(confFile)
return t return t
} }
func (t *Topology) loadConfiguration(configurationFile string)error{
b, e := ioutil.ReadFile(configurationFile);
if e ==nil{
t.configuration, e = NewConfiguration(b)
}
return e
}
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
if t.FreeSpace() <= 0 { if t.FreeSpace() <= 0 {
return false, nil, nil return false, nil, nil
@ -97,8 +110,9 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
} }
func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
dc := t.GetOrCreateDataCenter(ip) dcName, rackName := t.configuration.Locate(ip)
rack := dc.GetOrCreateRack(ip) dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
for _, v := range volumeInfos { for _, v := range volumeInfos {
dn.AddOrUpdateVolume(v) dn.AddOrUpdateVolume(v)
@ -106,14 +120,14 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string,
} }
} }
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() { for _, c := range t.Children() {
dc := c.(*DataCenter) dc := c.(*DataCenter)
if dc.MatchLocationRange(ip) { if string(dc.Id()) == dcName {
return dc return dc
} }
} }
dc := NewDataCenter("DefaultDataCenter") dc := NewDataCenter(dcName)
t.LinkChildNode(dc) t.LinkChildNode(dc)
return dc return dc
} }