mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00

add dataCenter option when assigning file keys

add dataCenter option when starting volume servers; some work related to freezing a volume. Not tested yet.

This commit is contained in:
parent 715d327df0
commit 50269b74ce
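For context on how the new option is exercised end to end: a volume server started with the new flags (e.g. weed volume -dataCenter=dc1 -rack=rack1) reports its location when it joins the master, and a client can then pin a file-key assignment to that data center. A minimal client sketch, assuming a master at the default localhost:9333 and a data center named dc1; the AssignResult type is ours, mirroring the fields dirAssignHandler writes further down:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// AssignResult mirrors the fields dirAssignHandler returns
// (fid, url, publicUrl, count); the struct name is ours.
type AssignResult struct {
    Fid       string `json:"fid"`
    Url       string `json:"url"`
    PublicUrl string `json:"publicUrl"`
    Count     int    `json:"count"`
    Error     string `json:"error"`
}

func main() {
    // the master reads the option via r.FormValue("dataCenter"),
    // so it can be passed as a query parameter
    resp, err := http.Get("http://localhost:9333/dir/assign?dataCenter=dc1")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    var r AssignResult
    if err = json.NewDecoder(resp.Body).Decode(&r); err != nil {
        panic(err)
    }
    fmt.Printf("fid=%s on volume server %s\n", r.Fid, r.Url)
}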
@@ -31,24 +31,24 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
 	return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
 }
 
-func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
+func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (int, error) {
 	switch repType {
 	case storage.Copy000:
-		return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy1factor, repType, dataCenter, topo)
 	case storage.Copy001:
-		return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
 	case storage.Copy010:
-		return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
 	case storage.Copy100:
-		return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
 	case storage.Copy110:
-		return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
 	case storage.Copy200:
-		return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+		return vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
 	}
 	return 0, errors.New("Unknown Replication Type!")
 }
 
-func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
 	vg.accessLock.Lock()
 	defer vg.accessLock.Unlock()
 
@@ -56,16 +56,20 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 	switch repType {
 	case storage.Copy000:
 		for i := 0; i < count; i++ {
-			if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
+			if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
 				if err = vg.grow(topo, *vid, repType, server); err == nil {
 					counter++
+				} else {
+					return counter, err
 				}
+			} else {
+				return counter, fmt.Errorf("Failed to grow volume for data center %s", dataCenter)
 			}
 		}
 	case storage.Copy001:
 		for i := 0; i < count; i++ {
-			//randomly pick one server, and then choose from the same rack
-			if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+			//randomly pick one server from the datacenter, and then choose from the same rack
+			if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
 				rack := server1.Parent()
 				exclusion := make(map[string]topology.Node)
 				exclusion[server1.String()] = server1
@@ -81,8 +85,8 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 		}
 	case storage.Copy010:
 		for i := 0; i < count; i++ {
-			//randomly pick one server, and then choose from the same rack
-			if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+			//randomly pick one server from the datacenter, and then choose from a different rack
+			if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
 				rack := server1.Parent()
 				dc := rack.Parent()
 				exclusion := make(map[string]topology.Node)
@@ -100,28 +104,32 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 	case storage.Copy100:
 		for i := 0; i < count; i++ {
 			nl := topology.NewNodeList(topo.Children(), nil)
-			picked, ret := nl.RandomlyPickN(2, 1)
+			picked, ret := nl.RandomlyPickN(2, 1, dataCenter)
 			vid := topo.NextVolumeId()
+			println("growing on picked servers", picked)
 			if ret {
 				var servers []*topology.DataNode
 				for _, n := range picked {
 					if n.FreeSpace() > 0 {
-						if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+						if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
 							servers = append(servers, server)
 						}
 					}
 				}
+				println("growing on servers", servers)
 				if len(servers) == 2 {
 					if err = vg.grow(topo, vid, repType, servers...); err == nil {
 						counter++
 					}
 				}
+			} else {
+				return counter, fmt.Errorf("Failed to grow volume on data center %s and another data center", dataCenter)
 			}
 		}
 	case storage.Copy110:
 		for i := 0; i < count; i++ {
 			nl := topology.NewNodeList(topo.Children(), nil)
-			picked, ret := nl.RandomlyPickN(2, 2)
+			picked, ret := nl.RandomlyPickN(2, 2, dataCenter)
 			vid := topo.NextVolumeId()
 			if ret {
 				var servers []*topology.DataNode
@@ -130,7 +138,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 					dc1, dc2 = dc2, dc1
 				}
 				if dc1.FreeSpace() > 0 {
-					if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
+					if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok {
 						servers = append(servers, server1)
 						rack := server1.Parent()
 						exclusion := make(map[string]topology.Node)
@@ -144,7 +152,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 					}
 				}
 				if dc2.FreeSpace() > 0 {
-					if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
+					if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok {
 						servers = append(servers, server)
 					}
 				}
@@ -158,13 +166,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
 	case storage.Copy200:
 		for i := 0; i < count; i++ {
 			nl := topology.NewNodeList(topo.Children(), nil)
-			picked, ret := nl.RandomlyPickN(3, 1)
+			picked, ret := nl.RandomlyPickN(3, 1, dataCenter)
 			vid := topo.NextVolumeId()
 			if ret {
 				var servers []*topology.DataNode
 				for _, n := range picked {
 					if n.FreeSpace() > 0 {
-						if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+						if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
 							servers = append(servers, server)
 						}
 					}
@@ -5,9 +5,7 @@ import (
 	"code.google.com/p/weed-fs/go/topology"
 	"encoding/json"
 	"fmt"
-	"math/rand"
 	"testing"
-	"time"
 )
 
 var topologyLayout = `
@@ -80,7 +78,7 @@ func setup(topologyLayout string) *topology.Topology {
 	fmt.Println("data:", data)
 
 	//need to connect all nodes first before server adding volumes
-	topo, err := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf",
+	topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
 		"/tmp", "testing", 32*1024, 5)
 	if err != nil {
 		panic("error: " + err.Error())
@@ -125,12 +123,3 @@ func TestRemoveDataCenter(t *testing.T) {
 		t.Fail()
 	}
 }
-
-func TestReserveOneVolume(t *testing.T) {
-	topo := setup(topologyLayout)
-	rand.Seed(time.Now().UnixNano())
-	vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
-	if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
-		t.Log("reserved", c)
-	}
-}
@@ -52,14 +52,7 @@ const (
 
 func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
 	nm := NewNeedleMap(file)
-	bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024)
-	bytes := make([]byte, 16*RowsToRead)
-	count, e := bufferReader.Read(bytes)
-	for count > 0 && e == nil {
-		for i := 0; i < count; i += 16 {
-			key := util.BytesToUint64(bytes[i : i+8])
-			offset := util.BytesToUint32(bytes[i+8 : i+12])
-			size := util.BytesToUint32(bytes[i+12 : i+16])
+	e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
 		nm.FileCounter++
 		nm.FileByteCounter = nm.FileByteCounter + uint64(size)
 		if offset > 0 {
@@ -75,14 +68,45 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
 			nm.DeletionCounter++
 			nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
 		}
+		return nil
+	})
+	return nm, e
 	}
 
-		count, e = bufferReader.Read(bytes)
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
+	br := bufio.NewReaderSize(r, 1024*1024)
+	bytes := make([]byte, 16*RowsToRead)
+	count, e := br.Read(bytes)
+	var (
+		key          uint64
+		offset, size uint32
+		i            int
+	)
+
+	for count > 0 && e == nil {
+		for i = 0; i+16 <= count; i += 16 {
+			key = util.BytesToUint64(bytes[i : i+8])
+			offset = util.BytesToUint32(bytes[i+8 : i+12])
+			size = util.BytesToUint32(bytes[i+12 : i+16])
+			if e = fn(key, offset, size); e != nil {
+				return e
+			}
+		}
+		if count%16 != 0 {
+			copy(bytes[:count-i], bytes[i:count])
+			i = count - i
+			count, e = br.Read(bytes[i:])
+			count += i
+		} else {
+			count, e = br.Read(bytes)
+		}
 	}
 	if e == io.EOF {
-		e = nil
+		return nil
 	}
-	return nm, e
+	return e
 }
 
 func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
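The rewrite above factors the raw index scan out of LoadNeedleMap into walkIndexFile, which reads into a fixed buffer, hands each 16-byte record (8-byte key, 4-byte offset, 4-byte size) to a callback, and copies any trailing partial record to the front of the buffer before refilling. A self-contained sketch of the same pattern over an in-memory reader; big-endian encoding here is our stand-in for util.BytesToUint64/BytesToUint32:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

// walkRecords reads fixed 16-byte records from r and calls fn for each,
// mirroring the walkIndexFile callback style above.
func walkRecords(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
    buf := make([]byte, 16*1024)
    count, e := r.Read(buf)
    i := 0
    for count > 0 && e == nil {
        for i = 0; i+16 <= count; i += 16 {
            key := binary.BigEndian.Uint64(buf[i : i+8])
            offset := binary.BigEndian.Uint32(buf[i+8 : i+12])
            size := binary.BigEndian.Uint32(buf[i+12 : i+16])
            if err := fn(key, offset, size); err != nil {
                return err
            }
        }
        if count%16 != 0 {
            // keep the partial record at the front, then refill after it
            copy(buf[:count-i], buf[i:count])
            i = count - i
            count, e = r.Read(buf[i:])
            count += i
        } else {
            count, e = r.Read(buf)
        }
    }
    if e == io.EOF {
        return nil
    }
    return e
}

func main() {
    // two fake records
    rec := make([]byte, 32)
    binary.BigEndian.PutUint64(rec[0:8], 42)
    binary.BigEndian.PutUint32(rec[8:12], 8)
    binary.BigEndian.PutUint32(rec[12:16], 100)
    binary.BigEndian.PutUint64(rec[16:24], 43)
    binary.BigEndian.PutUint32(rec[24:28], 108)
    binary.BigEndian.PutUint32(rec[28:32], 200)
    _ = walkRecords(bytes.NewReader(rec), func(k uint64, off, sz uint32) error {
        fmt.Println("key", k, "offset", off, "size", sz)
        return nil
    })
}

The callback style lets the same scanner back both counting (as in LoadNeedleMap) and conversion to other formats, which the cdb work below relies on.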
@@ -20,6 +20,8 @@ type Store struct {
 	MaxVolumeCount int
 
 	masterNode string
+	dataCenter string //optional information, overwriting master setting if exists
+	rack       string //optional information, overwriting master setting if exists
 	connected bool
 	volumeSizeLimit uint64 //read from the master
 
@@ -99,6 +101,16 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
 	}
 	return s.volumes[vid].commitCompact()
 }
+func (s *Store) FreezeVolume(volumeIdString string) error {
+	vid, err := NewVolumeId(volumeIdString)
+	if err != nil {
+		return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+	}
+	if s.volumes[vid].readOnly {
+		return errors.New("Volume " + volumeIdString + " is already read-only")
+	}
+	return s.volumes[vid].freeze()
+}
 func (s *Store) loadExistingVolumes() {
 	if dirs, err := ioutil.ReadDir(s.dir); err == nil {
 		for _, dir := range dirs {
@@ -138,6 +150,12 @@ type JoinResult struct {
 func (s *Store) SetMaster(mserver string) {
 	s.masterNode = mserver
 }
+func (s *Store) SetDataCenter(dataCenter string) {
+	s.dataCenter = dataCenter
+}
+func (s *Store) SetRack(rack string) {
+	s.rack = rack
+}
 func (s *Store) Join() error {
 	stats := new([]*VolumeInfo)
 	for k, v := range s.volumes {
@@ -159,6 +177,8 @@ func (s *Store) Join() error {
 	values.Add("publicUrl", s.PublicUrl)
 	values.Add("volumes", string(bytes))
 	values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+	values.Add("dataCenter", s.dataCenter)
+	values.Add("rack", s.rack)
 	jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
 	if err != nil {
 		return err
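With SetDataCenter and SetRack in place, Store.Join advertises the server's location to the master as plain form values alongside the existing fields. A small sketch of the payload being assembled (the values here are illustrative):

package main

import (
    "fmt"
    "net/url"
    "strconv"
)

func main() {
    // mirrors the values.Add calls in Store.Join above
    values := make(url.Values)
    values.Add("publicUrl", "localhost:8080")
    values.Add("maxVolumeCount", strconv.Itoa(7))
    values.Add("dataCenter", "dc1")
    values.Add("rack", "rack1")
    fmt.Println(values.Encode())
    // dataCenter=dc1&maxVolumeCount=7&publicUrl=localhost%3A8080&rack=rack1
}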
@@ -70,10 +70,29 @@ func (v *Volume) load(alsoLoadIndex bool) error {
 		e = v.maybeWriteSuperBlock()
 	}
 	if e == nil && alsoLoadIndex {
-		indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
-		if ie != nil {
+		var indexFile *os.File
+		if v.readOnly {
+			if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) {
+				return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e)
+			}
+			if indexFile != nil {
+				log.Printf("converting %s.idx to %s.cdb", fileName, fileName)
+				if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
+					log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName, e)
+				} else {
+					indexFile.Close()
+					os.Remove(indexFile.Name())
+					indexFile = nil
+				}
+			}
+			v.nm, e = OpenCdbMap(fileName + ".cdb")
+			return e
+		} else {
+			indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+			if e != nil {
 				return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
 			}
+		}
 		v.nm, e = LoadNeedleMap(indexFile)
 	}
 	return e
@@ -224,6 +243,31 @@ func (v *Volume) commitCompact() error {
 	}
 	return nil
 }
+func (v *Volume) freeze() error {
+	if v.readOnly {
+		return nil
+	}
+	nm, ok := v.nm.(*NeedleMap)
+	if !ok {
+		return nil
+	}
+	v.accessLock.Lock()
+	defer v.accessLock.Unlock()
+	bn, _ := nakeFilename(v.dataFile.Name())
+	cdbFn := bn + ".cdb"
+	log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn)
+	err := DumpNeedleMapToCdb(cdbFn, nm)
+	if err != nil {
+		return err
+	}
+	if v.nm, err = OpenCdbMap(cdbFn); err != nil {
+		return err
+	}
+	nm.indexFile.Close()
+	os.Remove(nm.indexFile.Name())
+	v.readOnly = true
+	return nil
+}
+
 func ScanVolumeFile(dirname string, id VolumeId,
 	visitSuperBlock func(SuperBlock) error,
@@ -46,11 +46,20 @@ func (c *Configuration) String() string {
 	return ""
 }
 
-func (c *Configuration) Locate(ip string) (dc string, rack string) {
+func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
+	if dcName == "" {
 		if c != nil && c.ip2location != nil {
 			if loc, ok := c.ip2location[ip]; ok {
 				return loc.dcName, loc.rackName
 			}
 		}
+	} else {
+		if rackName == "" {
+			return dcName, "DefaultRack"
+		} else {
+			return dcName, rackName
+		}
+	}
+
 	return "DefaultDataCenter", "DefaultRack"
 }
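The reworked Locate gives an explicitly reported data center and rack precedence over the master's ip-to-location table, with the defaults as a last resort. A standalone sketch of that precedence, using a plain map as a stand-in for c.ip2location:

package main

import "fmt"

type location struct{ dcName, rackName string }

// locate applies the precedence introduced above: an explicit dcName wins,
// then the configured ip table, then the defaults.
func locate(ip2location map[string]location, ip, dcName, rackName string) (string, string) {
    if dcName == "" {
        if loc, ok := ip2location[ip]; ok {
            return loc.dcName, loc.rackName
        }
    } else {
        if rackName == "" {
            return dcName, "DefaultRack"
        }
        return dcName, rackName
    }
    return "DefaultDataCenter", "DefaultRack"
}

func main() {
    table := map[string]location{"10.0.0.5": {"dc2", "rack9"}}
    fmt.Println(locate(table, "10.0.0.5", "", ""))    // dc2 rack9 (from the table)
    fmt.Println(locate(table, "10.0.0.5", "dc1", "")) // dc1 DefaultRack (explicit dc wins)
    fmt.Println(locate(table, "10.0.0.9", "", ""))    // DefaultDataCenter DefaultRack
}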
@@ -34,8 +34,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
 		dn.volumes[v.Id] = v
 	}
 }
+func (dn *DataNode) GetDataCenter() *DataCenter {
+	return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+}
 func (dn *DataNode) GetTopology() *Topology {
-	p := dn.parent
+	p := dn.Parent()
 	for p.Parent() != nil {
 		p = p.Parent()
 	}
@@ -10,7 +10,7 @@ type Node interface {
 	Id() NodeId
 	String() string
 	FreeSpace() int
-	ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+	ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
 	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
 	UpAdjustVolumeCountDelta(volumeCountDelta int)
 	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -26,6 +26,8 @@ type Node interface {
 	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
 
 	IsDataNode() bool
+	IsRack() bool
+	IsDataCenter() bool
 	Children() map[NodeId]Node
 	Parent() Node
 
@@ -78,7 +80,7 @@ func (n *NodeImpl) Parent() Node {
 func (n *NodeImpl) GetValue() interface{} {
 	return n.value
 }
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
 	ret := false
 	var assignedNode *DataNode
 	for _, node := range n.children {
@@ -87,6 +89,9 @@ func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNod
 		if freeSpace <= 0 {
 			continue
 		}
+		if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
+			continue
+		}
 		if r >= freeSpace {
 			r -= freeSpace
 		} else {
@@ -94,7 +99,7 @@ func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNod
 			// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
 			return true, node.(*DataNode)
 		}
-		ret, assignedNode = node.ReserveOneVolume(r, vid)
+		ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
 		if ret {
 			break
 		}
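The r that ReserveOneVolume threads down the tree is a random index into the topology's total free volume slots: each child consumes freeSpace positions, so children are chosen with probability proportional to their free space, and the new guard simply skips data-center children whose id does not match before that accounting. The weighting idea in isolation:

package main

import (
    "fmt"
    "math/rand"
)

type child struct {
    name      string
    freeSpace int
}

// pickWeighted mirrors the r -= freeSpace walk in ReserveOneVolume:
// r is uniform over the total free space, so each child is picked
// with probability freeSpace/total.
func pickWeighted(children []child, r int) string {
    for _, c := range children {
        if c.freeSpace <= 0 {
            continue
        }
        if r >= c.freeSpace {
            r -= c.freeSpace
        } else {
            return c.name
        }
    }
    return ""
}

func main() {
    children := []child{{"dc1", 3}, {"dc2", 6}, {"dc3", 1}}
    total := 10
    counts := map[string]int{}
    for i := 0; i < 10000; i++ {
        counts[pickWeighted(children, rand.Intn(total))]++
    }
    fmt.Println(counts) // roughly a 3:6:1 split
}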
@@ -30,23 +30,37 @@ func (nl *NodeList) FreeSpace() int {
 	return freeSpace
 }
 
-func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
+func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) {
 	var list []Node
+	var preferredNode *Node
+	if firstNodeName != "" {
 		for _, n := range nl.nodes {
-			if n.FreeSpace() >= min {
+			if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace {
+				preferredNode = &n
+				break
+			}
+		}
+		if preferredNode == nil {
+			return list, false
+		}
+	}
+
+	for _, n := range nl.nodes {
+		if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) {
 			list = append(list, n)
 		}
 	}
-	if n > len(list) {
+	if count > len(list) || count == len(list) && firstNodeName != "" {
 		return nil, false
 	}
-	for i := n; i > 0; i-- {
+	for i := len(list); i > 0; i-- {
 		r := rand.Intn(i)
-		t := list[r]
-		list[r] = list[i-1]
-		list[i-1] = t
+		list[r], list[i-1] = list[i-1], list[r]
 	}
-	return list[len(list)-n:], true
+	if firstNodeName != "" {
+		list[0] = *preferredNode
+	}
+	return list[:count], true
 }
 
 func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
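RandomlyPickN now shuffles the whole candidate list in place with a Fisher-Yates pass and slices off the first count entries, optionally forcing the named node into slot 0 (and refusing the case where count equals the list length with a preferred node set, since that node was excluded from the candidate pool). A reduced sketch of the shuffle-then-slice move over plain strings:

package main

import (
    "fmt"
    "math/rand"
)

// pickN shuffles candidates with Fisher-Yates and takes the first n,
// the same move RandomlyPickN makes over topology nodes.
func pickN(candidates []string, n int) ([]string, bool) {
    if n > len(candidates) {
        return nil, false
    }
    for i := len(candidates); i > 0; i-- {
        r := rand.Intn(i)
        candidates[r], candidates[i-1] = candidates[i-1], candidates[r]
    }
    return candidates[:n], true
}

func main() {
    nodes := []string{"dc1", "dc2", "dc3", "dc4", "dc5"}
    if picked, ok := pickN(nodes, 2); ok {
        fmt.Println("picked:", picked)
    }
}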
@@ -20,22 +20,38 @@ func TestXYZ(t *testing.T) {
 	}
 	nl := NewNodeList(topo.Children(), nil)
 
-	picked, ret := nl.RandomlyPickN(1, 0)
+	picked, ret := nl.RandomlyPickN(1, 0, "")
 	if !ret || len(picked) != 1 {
 		t.Error("need to randomly pick 1 node")
 	}
 
-	picked, ret = nl.RandomlyPickN(4, 0)
+	picked, ret = nl.RandomlyPickN(1, 0, "dc1")
+	if !ret || len(picked) != 1 {
+		t.Error("need to randomly pick 1 node")
+	}
+	if picked[0].Id() != "dc1" {
+		t.Error("need to randomly pick 1 dc1 node")
+	}
+
+	picked, ret = nl.RandomlyPickN(2, 0, "dc1")
+	if !ret || len(picked) != 2 {
+		t.Error("need to randomly pick 2 nodes")
+	}
+	if picked[0].Id() != "dc1" {
+		t.Error("need to randomly pick 2 with one dc1 node")
+	}
+
+	picked, ret = nl.RandomlyPickN(4, 0, "")
 	if !ret || len(picked) != 4 {
 		t.Error("need to randomly pick 4 nodes")
 	}
 
-	picked, ret = nl.RandomlyPickN(5, 0)
+	picked, ret = nl.RandomlyPickN(5, 0, "")
 	if !ret || len(picked) != 5 {
 		t.Error("need to randomly pick 5 nodes")
 	}
 
-	picked, ret = nl.RandomlyPickN(6, 0)
+	picked, ret = nl.RandomlyPickN(6, 0, "")
 	if ret || len(picked) != 0 {
 		t.Error("can not randomly pick 6 nodes:", ret, picked)
 	}
@@ -127,7 +127,10 @@ func TestReserveOneVolume(t *testing.T) {
 	topo := setup(topologyLayout)
 	rand.Seed(time.Now().UnixNano())
 	rand.Seed(1)
-	ret, node, vid := topo.RandomlyReserveOneVolume()
+	ret, node, vid := topo.RandomlyReserveOneVolume("dc1")
+	if node.Parent().Parent().Id() != NodeId("dc1") {
+		t.Fail()
+	}
 	fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
 
 }
@@ -4,6 +4,7 @@ import (
 	"code.google.com/p/weed-fs/go/sequence"
 	"code.google.com/p/weed-fs/go/storage"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"math/rand"
@@ -71,25 +72,13 @@ func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
 	return nil
 }
 
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) {
 	if t.FreeSpace() <= 0 {
+		fmt.Println("Topology does not have free space left!")
 		return false, nil, nil
 	}
 	vid := t.NextVolumeId()
-	ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
-	return ret, node, &vid
-}
-
-func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
-	freeSpace := t.FreeSpace()
-	for _, node := range except {
-		freeSpace -= node.FreeSpace()
-	}
-	if freeSpace <= 0 {
-		return false, nil, nil
-	}
-	vid := t.NextVolumeId()
-	ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+	ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter)
 	return ret, node, &vid
 }
 
@@ -98,12 +87,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
 	return vid.Next()
 }
 
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
 	replicationTypeIndex := repType.GetReplicationLevelIndex()
 	if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
 		t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
 	}
-	vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
+	vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
 	if err != nil || datanodes.Length() == 0 {
 		return "", 0, nil, errors.New("No writable volumes available!")
 	}
@@ -114,6 +103,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str
 func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
 	replicationTypeIndex := repType.GetReplicationLevelIndex()
 	if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+		fmt.Println("adding replication type", repType)
 		t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
 	}
 	return t.replicaType2VolumeLayout[replicationTypeIndex]
@@ -123,8 +113,8 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
 	t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
 }
 
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
-	dcName, rackName := t.configuration.Locate(ip)
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
+	dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
 	dc := t.GetOrCreateDataCenter(dcName)
 	rack := dc.GetOrCreateRack(rackName)
 	dn := rack.FindDataNode(ip, port)
@@ -51,23 +51,53 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
 	return nil
 }
 
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
 	len_writers := len(vl.writables)
 	if len_writers <= 0 {
 		fmt.Println("No more writable volumes!")
 		return nil, 0, nil, errors.New("No more writable volumes!")
 	}
+	if dataCenter == "" {
 		vid := vl.writables[rand.Intn(len_writers)]
 		locationList := vl.vid2location[vid]
 		if locationList != nil {
 			return &vid, count, locationList, nil
 		}
 		return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+	} else {
+		var vid storage.VolumeId
+		var locationList *VolumeLocationList
+		counter := 0
+		for _, v := range vl.writables {
+			volumeLocationList := vl.vid2location[v]
+			for _, dn := range volumeLocationList.list {
+				if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+					counter++
+					if rand.Intn(counter) < 1 {
+						vid, locationList = v, volumeLocationList
+					}
+				}
+			}
+		}
+		return &vid, count, locationList, nil
+	}
+	return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
 }
 
-func (vl *VolumeLayout) GetActiveVolumeCount() int {
+func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
+	if dataCenter == "" {
 		return len(vl.writables)
 	}
+	counter := 0
+	for _, v := range vl.writables {
+		for _, dn := range vl.vid2location[v].list {
+			if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+				counter++
+			}
+		}
+	}
+	return counter
+}
 
 func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
 	for i, v := range vl.writables {
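The data-center branch of PickForWrite is single-pass reservoir sampling: counter counts matching replicas seen so far, and rand.Intn(counter) < 1 replaces the running choice with probability 1/counter, so every matching writable ends up equally likely without materializing a filtered slice first. The same trick in isolation:

package main

import (
    "fmt"
    "math/rand"
)

// pickUniform returns a uniformly random element of xs in one pass,
// the reservoir-sampling pattern used by PickForWrite above.
func pickUniform(xs []string) (string, bool) {
    var chosen string
    counter := 0
    for _, x := range xs {
        counter++
        if rand.Intn(counter) < 1 { // true with probability 1/counter
            chosen = x
        }
    }
    return chosen, counter > 0
}

func main() {
    counts := map[string]int{}
    for i := 0; i < 9000; i++ {
        v, _ := pickUniform([]string{"v1", "v2", "v3"})
        counts[v]++
    }
    fmt.Println(counts) // each count near 3000
}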
@@ -77,25 +77,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
 	if repType == "" {
 		repType = *defaultRepType
 	}
+	dataCenter := r.FormValue("dataCenter")
 	rt, err := storage.NewReplicationTypeFromString(repType)
 	if err != nil {
 		w.WriteHeader(http.StatusNotAcceptable)
 		writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
 		return
 	}
-	if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+
+	if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
 		if topo.FreeSpace() <= 0 {
 			w.WriteHeader(http.StatusNotFound)
 			writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
 			return
 		} else {
-			if _, err = vg.GrowByType(rt, topo); err != nil {
+			if _, err = vg.GrowByType(rt, dataCenter, topo); err != nil {
 				writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
 				return
 			}
 		}
 	}
-	fid, count, dn, err := topo.PickForWrite(rt, c)
+	fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter)
 	if err == nil {
 		writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
 	} else {
@@ -120,7 +122,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	debug(s, "volumes", r.FormValue("volumes"))
-	topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
+	topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
 	m := make(map[string]interface{})
 	m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
 	writeJsonQuiet(w, r, m)
@@ -151,7 +153,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
 		if topo.FreeSpace() < count*rt.GetCopyCount() {
 			err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
 		} else {
-			count, err = vg.GrowByCountAndType(count, rt, topo)
+			count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCenter"), topo)
 		}
 	} else {
 		err = errors.New("parameter count is not found")
@@ -38,6 +38,8 @@ var (
 	maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
 	vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
 	vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+	dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
+	rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
 
 	store *storage.Store
 )
@@ -86,6 +88,16 @@ func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
 }
+func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
+	//TODO: notify master that this volume will be read-only
+	err := store.FreezeVolume(r.FormValue("volume"))
+	if err == nil {
+		writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
+	} else {
+		writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+	}
+	debug("freeze volume =", r.FormValue("volume"), ", error =", err)
+}
func storeHandler(w http.ResponseWriter, r *http.Request) {
 	switch r.Method {
 	case "GET":
@@ -289,10 +301,13 @@ func runVolume(cmd *Command, args []string) bool {
 	http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
 	http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
 	http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
+	http.HandleFunc("/admin/freeze_volume", freezeVolumeHandler)
 
 	go func() {
 		connected := true
 		store.SetMaster(*masterNode)
+		store.SetDataCenter(*dataCenter)
+		store.SetRack(*rack)
 		for {
 			err := store.Join()
 			if err == nil {
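Putting the volume-server pieces together: freezing converts a volume's .idx needle map to a read-only .cdb and marks the volume read-only, and the new /admin/freeze_volume route exposes that over HTTP. A hedged client sketch, assuming a volume server at localhost:8080 and a volume id of 3:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // freezeVolumeHandler reads r.FormValue("volume"), so the id can be
    // passed as a query parameter; host, port and volume id are assumptions
    resp, err := http.Get("http://localhost:8080/admin/freeze_volume?volume=3")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body)) // {"error":""} on success
}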