From 50269b74ce615ab02f6bf64a2bc0fc9e71122267 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Wed, 19 Jun 2013 18:10:38 -0700
Subject: [PATCH] add dataCenter option when assigning file keys

add dataCenter option when starting volume servers
some work related to freezing a volume. Not tested yet.
---
 go/replication/store_replicate.go    |  2 +-
 go/replication/volume_growth.go      | 48 +++++++++++--------
 go/replication/volume_growth_test.go | 13 +-----
 go/storage/needle_map.go             | 70 +++++++++++++++++++---------
 go/storage/store.go                  | 20 ++++++++
 go/storage/volume.go                 | 52 +++++++++++++++++++--
 go/topology/configuration.go         | 17 +++++--
 go/topology/data_node.go             |  5 +-
 go/topology/node.go                  | 15 ++++--
 go/topology/node_list.go             | 30 ++++++++----
 go/topology/node_list_test.go        | 24 ++++++++--
 go/topology/topo_test.go             |  5 +-
 go/topology/topology.go              | 28 ++++-------
 go/topology/volume_layout.go         | 44 ++++++++++++++---
 go/weed/master.go                    | 12 +++--
 go/weed/volume.go                    | 17 ++++++-
 16 files changed, 287 insertions(+), 115 deletions(-)

diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go
index cc5d806d2..8306a31b4 100644
--- a/go/replication/store_replicate.go
+++ b/go/replication/store_replicate.go
@@ -3,7 +3,7 @@ package replication
 import (
     "bytes"
     "code.google.com/p/weed-fs/go/operation"
-    "code.google.com/p/weed-fs/go/storage"
+    "code.google.com/p/weed-fs/go/storage"
     "log"
     "net/http"
     "strconv"
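The bulk of the change is in volume_growth.go below: every replication case now threads a dataCenter argument down through GrowByCountAndType, with "" meaning "anywhere". As a sketch of how a caller might use the new signature (the topo value and the logging are assumed for illustration, not part of the patch):

    // grow volumes for 001 replication, restricted to data center "dc1";
    // an empty string keeps the old pick-anywhere behavior
    vg := replication.NewDefaultVolumeGrowth()
    if n, err := vg.GrowByType(storage.Copy001, "dc1", topo); err != nil {
        log.Printf("grew %d volumes before failing: %v", n, err)
    }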
topo.RandomlyReserveOneVolume(); ok { + if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { if err = vg.grow(topo, *vid, repType, server); err == nil { counter++ + } else { + return counter, err } + } else { + return counter, fmt.Errorf("Failed to grown volume for data center %s", dataCenter) } } case storage.Copy001: for i := 0; i < count; i++ { - //randomly pick one server, and then choose from the same rack - if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { + //randomly pick one server from the datacenter, and then choose from the same rack + if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { rack := server1.Parent() exclusion := make(map[string]topology.Node) exclusion[server1.String()] = server1 @@ -81,8 +85,8 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } case storage.Copy010: for i := 0; i < count; i++ { - //randomly pick one server, and then choose from the same rack - if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { + //randomly pick one server from the datacenter, and then choose from the a different rack + if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { rack := server1.Parent() dc := rack.Parent() exclusion := make(map[string]topology.Node) @@ -100,28 +104,32 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio case storage.Copy100: for i := 0; i < count; i++ { nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 1) + picked, ret := nl.RandomlyPickN(2, 1, dataCenter) vid := topo.NextVolumeId() + println("growing on picked servers", picked) if ret { var servers []*topology.DataNode for _, n := range picked { if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { + if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok { servers = append(servers, server) } } } + println("growing on servers", servers) if len(servers) == 2 { if err = vg.grow(topo, vid, repType, servers...); err == nil { counter++ } } + } else { + return counter, fmt.Errorf("Failed to grown volume on data center %s and another data center", dataCenter) } } case storage.Copy110: for i := 0; i < count; i++ { nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 2) + picked, ret := nl.RandomlyPickN(2, 2, dataCenter) vid := topo.NextVolumeId() if ret { var servers []*topology.DataNode @@ -130,7 +138,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio dc1, dc2 = dc2, dc1 } if dc1.FreeSpace() > 0 { - if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok { + if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok { servers = append(servers, server1) rack := server1.Parent() exclusion := make(map[string]topology.Node) @@ -144,7 +152,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if dc2.FreeSpace() > 0 { - if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok { + if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok { servers = append(servers, server) } } @@ -158,13 +166,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio case storage.Copy200: for i := 0; i < count; i++ { nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(3, 1) + picked, ret := nl.RandomlyPickN(3, 1, dataCenter) vid := topo.NextVolumeId() if ret { 
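Every case above reserves capacity through ReserveOneVolume(rand.Intn(FreeSpace()), vid, dc): a random index is drawn over the total free volume slots and the topology tree is walked child by child, discounting each child's free space until the index lands inside one, so each child is chosen with probability proportional to its free space. The same idea in isolation (a sketch assuming math/rand; the names are illustrative only, not from the patch):

    // weightedPick mirrors the descent in ReserveOneVolume.
    // total must equal the sum of free and be > 0.
    func weightedPick(free []int, total int) int {
        r := rand.Intn(total)
        for i, f := range free {
            if r < f {
                return i // chosen with probability f/total
            }
            r -= f // skip this child; r stays uniform over the remainder
        }
        return -1 // not reached while total == sum(free)
    }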
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
index a4104716e..031972bee 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/replication/volume_growth_test.go
@@ -5,9 +5,7 @@ import (
     "code.google.com/p/weed-fs/go/topology"
     "encoding/json"
     "fmt"
-    "math/rand"
     "testing"
-    "time"
 )
 
 var topologyLayout = `
@@ -80,7 +78,7 @@ func setup(topologyLayout string) *topology.Topology {
     fmt.Println("data:", data)
 
     //need to connect all nodes first before server adding volumes
-    topo, err := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf",
+    topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
         "/tmp", "testing", 32*1024, 5)
     if err != nil {
         panic("error: " + err.Error())
     }
@@ -125,12 +123,3 @@ func TestRemoveDataCenter(t *testing.T) {
         t.Fail()
     }
 }
-
-func TestReserveOneVolume(t *testing.T) {
-    topo := setup(topologyLayout)
-    rand.Seed(time.Now().UnixNano())
-    vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
-    if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
-        t.Log("reserved", c)
-    }
-}
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index c836a87fb..89773b341 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -52,37 +52,61 @@ const (
 
 func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
     nm := NewNeedleMap(file)
-    bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024)
-    bytes := make([]byte, 16*RowsToRead)
-    count, e := bufferReader.Read(bytes)
-    for count > 0 && e == nil {
-        for i := 0; i < count; i += 16 {
-            key := util.BytesToUint64(bytes[i : i+8])
-            offset := util.BytesToUint32(bytes[i+8 : i+12])
-            size := util.BytesToUint32(bytes[i+12 : i+16])
-            nm.FileCounter++
-            nm.FileByteCounter = nm.FileByteCounter + uint64(size)
-            if offset > 0 {
-                oldSize := nm.m.Set(Key(key), offset, size)
-                //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
-                if oldSize > 0 {
-                    nm.DeletionCounter++
-                    nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-                }
-            } else {
-                oldSize := nm.m.Delete(Key(key))
-                //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+    e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
+        nm.FileCounter++
+        nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+        if offset > 0 {
+            oldSize := nm.m.Set(Key(key), offset, size)
+            //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+            if oldSize > 0 {
                 nm.DeletionCounter++
                 nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
             }
+        } else {
+            oldSize := nm.m.Delete(Key(key))
+            //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+            nm.DeletionCounter++
+            nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
         }
+        return nil
+    })
+    return nm, e
+}
 
-        count, e = bufferReader.Read(bytes)
+// walkIndexFile walks through the index file, calling fn with each
+// (key, offset, size) tuple; it stops at the first error returned by fn.
+func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
+    br := bufio.NewReaderSize(r, 1024*1024)
+    bytes := make([]byte, 16*RowsToRead)
+    count, e := br.Read(bytes)
+    var (
+        key          uint64
+        offset, size uint32
+        i            int
+    )
+
+    for count > 0 && e == nil {
+        for i = 0; i+16 <= count; i += 16 {
+            key = util.BytesToUint64(bytes[i : i+8])
+            offset = util.BytesToUint32(bytes[i+8 : i+12])
+            size = util.BytesToUint32(bytes[i+12 : i+16])
+            if e = fn(key, offset, size); e != nil {
+                return e
+            }
+        }
+        if count%16 != 0 {
+            copy(bytes[:count-i], bytes[i:count])
+            i = count - i
+            count, e = br.Read(bytes[i:])
+            count += i
+        } else {
+            count, e = br.Read(bytes)
+        }
     }
     if e == io.EOF {
-        e = nil
+        return nil
     }
-    return nm, e
+    return e
 }
 
 func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
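Each index record is a fixed 16 bytes - an 8-byte key, a 4-byte offset, and a 4-byte size - and an offset of 0 marks a deletion. The new walkIndexFile hides the buffered reads and the partial-record carry-over, so callers only see decoded tuples. A hedged usage sketch, callable from inside the storage package (the path and counters are illustrative):

    // count live vs. deleted entries in an index file
    f, err := os.Open("/data/weed/1.idx") // illustrative path
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    live, deleted := 0, 0
    err = walkIndexFile(f, func(key uint64, offset, size uint32) error {
        if offset > 0 {
            live++
        } else {
            deleted++
        }
        return nil // a non-nil return would stop the walk early
    })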
diff --git a/go/storage/store.go b/go/storage/store.go
index 954bae0ae..0889c9330 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -20,6 +20,8 @@ type Store struct {
     MaxVolumeCount int
 
     masterNode string
+    dataCenter string //optional information, overwriting the master's setting if it exists
+    rack       string //optional information, overwriting the master's setting if it exists
     connected  bool
     volumeSizeLimit uint64 //read from the master
@@ -99,6 +101,16 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
     }
     return s.volumes[vid].commitCompact()
 }
+func (s *Store) FreezeVolume(volumeIdString string) error {
+    vid, err := NewVolumeId(volumeIdString)
+    if err != nil {
+        return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+    }
+    if s.volumes[vid].readOnly {
+        return errors.New("Volume " + volumeIdString + " is already read-only")
+    }
+    return s.volumes[vid].freeze()
+}
 func (s *Store) loadExistingVolumes() {
     if dirs, err := ioutil.ReadDir(s.dir); err == nil {
         for _, dir := range dirs {
@@ -138,6 +150,12 @@ type JoinResult struct {
 func (s *Store) SetMaster(mserver string) {
     s.masterNode = mserver
 }
+func (s *Store) SetDataCenter(dataCenter string) {
+    s.dataCenter = dataCenter
+}
+func (s *Store) SetRack(rack string) {
+    s.rack = rack
+}
 func (s *Store) Join() error {
     stats := new([]*VolumeInfo)
     for k, v := range s.volumes {
@@ -159,6 +177,8 @@ func (s *Store) Join() error {
     values.Add("publicUrl", s.PublicUrl)
     values.Add("volumes", string(bytes))
     values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+    values.Add("dataCenter", s.dataCenter)
+    values.Add("rack", s.rack)
     jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
     if err != nil {
         return err
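The two new setters feed the join request, so the master learns each volume server's claimed location; dataCenter and rack are posted alongside the existing fields to /dir/join. A sketch of the call order, matching how runVolume wires it up later in this patch (the addresses are illustrative):

    store.SetMaster("localhost:9333") // master address
    store.SetDataCenter("dc1")        // optional; takes precedence over the master's ip-based config
    store.SetRack("rack1")            // optional
    if err := store.Join(); err != nil {
        log.Printf("join failed: %v", err)
    }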
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 98f712433..4e6db3634 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -70,10 +70,29 @@ func (v *Volume) load(alsoLoadIndex bool) error {
         e = v.maybeWriteSuperBlock()
     }
     if e == nil && alsoLoadIndex {
-        indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
-        if ie != nil {
-            return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
-        }
+        var indexFile *os.File
+        if v.readOnly {
+            if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) {
+                return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e)
+            }
+            if indexFile != nil {
+                log.Printf("converting %s.idx to %s.cdb", fileName, fileName)
+                if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
+                    log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName, e)
+                } else {
+                    indexFile.Close()
+                    os.Remove(indexFile.Name())
+                    indexFile = nil
+                }
+            }
+            v.nm, e = OpenCdbMap(fileName + ".cdb")
+            return e
+        } else {
+            indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+            if e != nil {
+                return fmt.Errorf("cannot create Volume Index %s.idx: %s", fileName, e)
+            }
+        }
         v.nm, e = LoadNeedleMap(indexFile)
     }
     return e
@@ -224,6 +243,31 @@ func (v *Volume) commitCompact() error {
     }
     return nil
 }
+func (v *Volume) freeze() error {
+    if v.readOnly {
+        return nil
+    }
+    nm, ok := v.nm.(*NeedleMap)
+    if !ok {
+        return nil
+    }
+    v.accessLock.Lock()
+    defer v.accessLock.Unlock()
+    bn, _ := nakeFilename(v.dataFile.Name())
+    cdbFn := bn + ".cdb"
+    log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn)
+    err := DumpNeedleMapToCdb(cdbFn, nm)
+    if err != nil {
+        return err
+    }
+    if v.nm, err = OpenCdbMap(cdbFn); err != nil {
+        return err
+    }
+    nm.indexFile.Close()
+    os.Remove(nm.indexFile.Name())
+    v.readOnly = true
+    return nil
+}
 
 func ScanVolumeFile(dirname string, id VolumeId,
     visitSuperBlock func(SuperBlock) error,
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 4c8424214..058600a7c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -46,11 +46,20 @@ func (c *Configuration) String() string {
     return ""
 }
 
-func (c *Configuration) Locate(ip string) (dc string, rack string) {
-    if c != nil && c.ip2location != nil {
-        if loc, ok := c.ip2location[ip]; ok {
-            return loc.dcName, loc.rackName
+func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
+    if dcName == "" {
+        if c != nil && c.ip2location != nil {
+            if loc, ok := c.ip2location[ip]; ok {
+                return loc.dcName, loc.rackName
+            }
+        }
+    } else {
+        if rackName == "" {
+            return dcName, "DefaultRack"
+        } else {
+            return dcName, rackName
         }
     }
+    return "DefaultDataCenter", "DefaultRack"
 }
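The reworked Locate gives an explicitly reported data center priority over the master's ip-to-location table, falling back to defaults as a last resort. Traced from the code above:

    dc, rack := c.Locate(ip, "", "")        // ip in config -> configured dc and rack
                                            // ip not found -> "DefaultDataCenter", "DefaultRack"
    dc, rack = c.Locate(ip, "dc1", "")      // -> "dc1", "DefaultRack"
    dc, rack = c.Locate(ip, "dc1", "rack7") // -> "dc1", "rack7"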
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ea4ea5d39..3a6edb447 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -34,8 +34,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
         dn.volumes[v.Id] = v
     }
 }
+func (dn *DataNode) GetDataCenter() *DataCenter {
+    return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+}
 func (dn *DataNode) GetTopology() *Topology {
-    p := dn.parent
+    p := dn.Parent()
     for p.Parent() != nil {
         p = p.Parent()
     }
diff --git a/go/topology/node.go b/go/topology/node.go
index 786f76702..d61f01244 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -10,7 +10,7 @@ type Node interface {
     Id() NodeId
     String() string
     FreeSpace() int
-    ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+    ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
     UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
     UpAdjustVolumeCountDelta(volumeCountDelta int)
     UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -26,6 +26,8 @@ type Node interface {
     CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
 
     IsDataNode() bool
+    IsRack() bool
+    IsDataCenter() bool
     Children() map[NodeId]Node
     Parent() Node
@@ -78,23 +80,26 @@ func (n *NodeImpl) Parent() Node {
     return n.value
 }
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
     ret := false
     var assignedNode *DataNode
     for _, node := range n.children {
         freeSpace := node.FreeSpace()
-        //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+        // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
         if freeSpace <= 0 {
             continue
         }
+        if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
+            continue
+        }
         if r >= freeSpace {
             r -= freeSpace
         } else {
             if node.IsDataNode() && node.FreeSpace() > 0 {
-                //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+                // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
                 return true, node.(*DataNode)
             }
-            ret, assignedNode = node.ReserveOneVolume(r, vid)
+            ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
             if ret {
                 break
             }
diff --git a/go/topology/node_list.go b/go/topology/node_list.go
index db7723714..2be90b123 100644
--- a/go/topology/node_list.go
+++ b/go/topology/node_list.go
@@ -30,23 +30,37 @@ func (nl *NodeList) FreeSpace() int {
     return freeSpace
 }
 
-func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
+func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) {
     var list []Node
+    var preferredNode *Node
+    if firstNodeName != "" {
+        for _, n := range nl.nodes {
+            if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace {
+                preferredNode = &n
+                break
+            }
+        }
+        if preferredNode == nil {
+            return list, false
+        }
+    }
+
     for _, n := range nl.nodes {
-        if n.FreeSpace() >= min {
+        if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) {
             list = append(list, n)
         }
     }
-    if n > len(list) {
+    if count > len(list) || count == len(list) && firstNodeName != "" {
         return nil, false
     }
-    for i := n; i > 0; i-- {
+    for i := len(list); i > 0; i-- {
         r := rand.Intn(i)
-        t := list[r]
-        list[r] = list[i-1]
-        list[i-1] = t
+        list[r], list[i-1] = list[i-1], list[r]
     }
-    return list[len(list)-n:], true
+    if firstNodeName != "" {
+        list[0] = *preferredNode
+    }
+    return list[:count], true
 }
 
 func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
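RandomlyPickN now Fisher-Yates-shuffles every qualifying node and returns the first count of them; when firstNodeName is set, that node is looked up first, excluded from the shuffle, and pinned into slot 0 of the result. A usage sketch mirroring the new tests below (topo is assumed):

    nl := topology.NewNodeList(topo.Children(), nil)
    // pick two data centers with free space, guaranteeing "dc1" is picked[0]
    if picked, ok := nl.RandomlyPickN(2, 1, "dc1"); ok {
        fmt.Println("placing copies on", picked[0].Id(), "and", picked[1].Id())
    }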
diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go
index c6e530724..0037cbaa9 100644
--- a/go/topology/node_list_test.go
+++ b/go/topology/node_list_test.go
@@ -20,22 +20,38 @@ func TestXYZ(t *testing.T) {
     }
 
     nl := NewNodeList(topo.Children(), nil)
-    picked, ret := nl.RandomlyPickN(1, 0)
+    picked, ret := nl.RandomlyPickN(1, 0, "")
     if !ret || len(picked) != 1 {
         t.Error("need to randomly pick 1 node")
     }
 
-    picked, ret = nl.RandomlyPickN(4, 0)
+    picked, ret = nl.RandomlyPickN(1, 0, "dc1")
+    if !ret || len(picked) != 1 {
+        t.Error("need to randomly pick 1 node")
+    }
+    if picked[0].Id() != "dc1" {
+        t.Error("need to randomly pick 1 dc1 node")
+    }
+
+    picked, ret = nl.RandomlyPickN(2, 0, "dc1")
+    if !ret || len(picked) != 2 {
+        t.Error("need to randomly pick 2 nodes")
+    }
+    if picked[0].Id() != "dc1" {
+        t.Error("need to randomly pick 2 with one dc1 node")
+    }
+
+    picked, ret = nl.RandomlyPickN(4, 0, "")
     if !ret || len(picked) != 4 {
         t.Error("need to randomly pick 4 nodes")
     }
 
-    picked, ret = nl.RandomlyPickN(5, 0)
+    picked, ret = nl.RandomlyPickN(5, 0, "")
     if !ret || len(picked) != 5 {
         t.Error("need to randomly pick 5 nodes")
     }
 
-    picked, ret = nl.RandomlyPickN(6, 0)
+    picked, ret = nl.RandomlyPickN(6, 0, "")
     if ret || len(picked) != 0 {
         t.Error("can not randomly pick 6 nodes:", ret, picked)
     }
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index 99e570821..d5ea08086 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -127,7 +127,10 @@ func TestReserveOneVolume(t *testing.T) {
     topo := setup(topologyLayout)
     rand.Seed(time.Now().UnixNano())
     rand.Seed(1)
-    ret, node, vid := topo.RandomlyReserveOneVolume()
+    ret, node, vid := topo.RandomlyReserveOneVolume("dc1")
+    if node.Parent().Parent().Id() != NodeId("dc1") {
+        t.Fail()
+    }
     fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
 }
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 5dcc56204..e488319d1 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -4,6 +4,7 @@ import (
     "code.google.com/p/weed-fs/go/sequence"
     "code.google.com/p/weed-fs/go/storage"
     "errors"
+    "fmt"
     "io/ioutil"
     "log"
     "math/rand"
@@ -71,25 +72,13 @@ func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
     return nil
 }
 
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) {
     if t.FreeSpace() <= 0 {
+        fmt.Println("Topology does not have free space left!")
         return false, nil, nil
     }
     vid := t.NextVolumeId()
-    ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
-    return ret, node, &vid
-}
-
-func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
-    freeSpace := t.FreeSpace()
-    for _, node := range except {
-        freeSpace -= node.FreeSpace()
-    }
-    if freeSpace <= 0 {
-        return false, nil, nil
-    }
-    vid := t.NextVolumeId()
-    ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+    ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter)
     return ret, node, &vid
 }
 
@@ -98,12 +87,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
     return vid.Next()
 }
 
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
     replicationTypeIndex := repType.GetReplicationLevelIndex()
     if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
         t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
     }
-    vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
+    vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
     if err != nil || datanodes.Length() == 0 {
         return "", 0, nil, errors.New("No writable volumes available!")
     }
@@ -114,6 +103,7 @@ func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
     replicationTypeIndex := repType.GetReplicationLevelIndex()
     if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+        fmt.Println("adding replication type", repType)
         t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
     }
     return t.replicaType2VolumeLayout[replicationTypeIndex]
@@ -123,8 +113,8 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
     t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
 }
 
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
-    dcName, rackName := t.configuration.Locate(ip)
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
+    dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
     dc := t.GetOrCreateDataCenter(dcName)
     rack := dc.GetOrCreateRack(rackName)
     dn := rack.FindDataNode(ip, port)
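With the plumbing above in place, restricting an assignment to one data center is a single call on the topology; the master handler in go/weed/master.go does exactly this with values taken from the request. A sketch (rt and c as in dirAssignHandler):

    fid, count, dn, err := topo.PickForWrite(rt, c, "dc1")
    if err == nil {
        fmt.Println("write", count, "copies starting at", fid, "via", dn.Url())
    }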
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index cd725c132..d8ed49b0b 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -51,22 +51,52 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
     return nil
 }
 
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
     len_writers := len(vl.writables)
     if len_writers <= 0 {
         fmt.Println("No more writable volumes!")
         return nil, 0, nil, errors.New("No more writable volumes!")
     }
-    vid := vl.writables[rand.Intn(len_writers)]
-    locationList := vl.vid2location[vid]
-    if locationList != nil {
+    if dataCenter == "" {
+        vid := vl.writables[rand.Intn(len_writers)]
+        locationList := vl.vid2location[vid]
+        if locationList != nil {
+            return &vid, count, locationList, nil
+        }
+        return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+    } else {
+        var vid storage.VolumeId
+        var locationList *VolumeLocationList
+        counter := 0
+        for _, v := range vl.writables {
+            volumeLocationList := vl.vid2location[v]
+            for _, dn := range volumeLocationList.list {
+                if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+                    counter++
+                    if rand.Intn(counter) < 1 {
+                        vid, locationList = v, volumeLocationList
+                    }
+                }
+            }
+        }
         return &vid, count, locationList, nil
     }
-    return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+    return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
 }
 
-func (vl *VolumeLayout) GetActiveVolumeCount() int {
-    return len(vl.writables)
+func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
+    if dataCenter == "" {
+        return len(vl.writables)
+    }
+    counter := 0
+    for _, v := range vl.writables {
+        for _, dn := range vl.vid2location[v].list {
+            if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+                counter++
+            }
+        }
+    }
+    return counter
 }
 
 func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
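The dataCenter branch of PickForWrite is single-slot reservoir sampling: it streams over every (writable volume, replica) pair in the requested data center and, on the counter-th match, replaces the current choice with probability 1/counter, which leaves every qualifying pair equally likely without building a candidate slice. The pattern in isolation (an illustrative sketch assuming math/rand, not part of the patch):

    // pickUniform returns one id chosen uniformly from a stream in O(1) memory.
    func pickUniform(ids []storage.VolumeId) (chosen storage.VolumeId) {
        counter := 0
        for _, v := range ids {
            counter++
            if rand.Intn(counter) < 1 { // true with probability 1/counter
                chosen = v
            }
        }
        return
    }

One side effect worth noting: a volume with two replicas in the same data center is counted once per replica, so it is proportionally more likely to be picked.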
" + err.Error()}) return } } } - fid, count, dn, err := topo.PickForWrite(rt, c) + fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter) if err == nil { writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { @@ -120,7 +122,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { return } debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) + topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) m := make(map[string]interface{}) m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 writeJsonQuiet(w, r, m) @@ -151,7 +153,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { if topo.FreeSpace() < count*rt.GetCopyCount() { err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) } else { - count, err = vg.GrowByCountAndType(count, rt, topo) + count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo) } } else { err = errors.New("parameter count is not found") diff --git a/go/weed/volume.go b/go/weed/volume.go index 33121388e..6cbbceaef 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -2,7 +2,7 @@ package main import ( "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/storage" "log" "math/rand" @@ -38,6 +38,8 @@ var ( maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes") vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") + rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") store *storage.Store ) @@ -86,6 +88,16 @@ func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { } debug("commit compact volume =", r.FormValue("volume"), ", error =", err) } +func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { + //TODO: notify master that this volume will be read-only + err := store.FreezeVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + debug("freeze volume =", r.FormValue("volume"), ", error =", err) +} func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": @@ -289,10 +301,13 @@ func runVolume(cmd *Command, args []string) bool { http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) + http.HandleFunc("/admin/freeze_volume", freezeVolumeHandler) go func() { connected := true store.SetMaster(*masterNode) + store.SetDataCenter(*dataCenter) + store.SetRack(*rack) for { err := store.Join() if err == nil {