From 6ce41e30a47666792fa3c4346f5c33d360d25930 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 30 Sep 2012 02:20:33 -0700 Subject: [PATCH] change to 3-digit replication types --- weed-fs/src/cmd/weed/master.go | 11 +- weed-fs/src/cmd/weed/upload.go | 2 +- weed-fs/src/cmd/weed/volume.go | 5 +- weed-fs/src/pkg/replication/volume_growth.go | 161 ++++++++++++------ .../src/pkg/replication/volume_growth_test.go | 19 +-- weed-fs/src/pkg/storage/compact_map_test.go | 2 - weed-fs/src/pkg/storage/store.go | 2 +- weed-fs/src/pkg/storage/volume.go | 4 +- weed-fs/src/pkg/storage/volume_info.go | 126 +++++++++----- weed-fs/src/pkg/topology/node_list.go | 4 +- 10 files changed, 214 insertions(+), 122 deletions(-) diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 7751a4819..a87c8f312 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -33,7 +33,7 @@ var ( volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "00", "Default replication type if not specified.") + defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") mReadTimeout = cmdMaster.Flag.Int("readTimeout", 5, "connection read timeout in seconds") ) @@ -70,7 +70,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { if repType == "" { repType = *defaultRepType } - rt, err := storage.NewReplicationType(repType) + rt, err := storage.NewReplicationTypeFromString(repType) if err != nil { writeJson(w, r, map[string]string{"error": err.Error()}) return @@ -107,12 +107,15 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, topo.ToMap()) + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Topology"] = topo.ToMap() + writeJson(w, r, m) } func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { count := 0 - rt, err := storage.NewReplicationType(r.FormValue("replication")) + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { if topo.FreeSpace() < count*rt.GetCopyCount() { diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index 882459c05..e25930b5d 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -17,7 +17,7 @@ func init() { cmdUpload.Run = runUpload // break init cycle IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") - uploadReplication = cmdUpload.Flag.String("replication", "00", "replication type(00,01,10,11)") + uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)") } var cmdUpload = &Command{ diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index a9ab85c98..38946396e 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -41,7 +41,10 @@ var ( ) func statusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, store.Status()) + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = store.Status() + writeJson(w, r, m) } func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 39597f2a6..3942c5743 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" "math/rand" + "pkg/operation" "pkg/storage" "pkg/topology" - "pkg/operation" ) /* @@ -30,15 +30,17 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) { switch repType { - case storage.Copy00: + case storage.Copy000: return vg.GrowByCountAndType(vg.copy1factor, repType, topo) - case storage.Copy10: + case storage.Copy001: return vg.GrowByCountAndType(vg.copy2factor, repType, topo) - case storage.Copy20: + case storage.Copy010: + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy100: + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy110: return vg.GrowByCountAndType(vg.copy3factor, repType, topo) - case storage.Copy01: - return vg.GrowByCountAndType(vg.copy2factor, repType, topo) - case storage.Copy11: + case storage.Copy200: return vg.GrowByCountAndType(vg.copy3factor, repType, topo) } return 0, errors.New("Unknown Replication Type!") @@ -46,7 +48,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { counter = 0 switch repType { - case storage.Copy00: + case storage.Copy000: for i := 0; i < count; i++ { if ok, server, vid := topo.RandomlyReserveOneVolume(); ok { if err = vg.grow(topo, *vid, repType, server); err == nil { @@ -54,49 +56,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } } - case storage.Copy10: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 2 { - if err = vg.grow(topo, vid, repType, servers[0], servers[1]); err == nil { - counter++ - } - } - } - } - case storage.Copy20: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(3) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 3 { - if err = vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]); err == nil { - counter++ - } - } - } - } - case storage.Copy01: + case storage.Copy001: for i := 0; i < count; i++ { //randomly pick one server, and then choose from the same rack if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { @@ -113,17 +73,110 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } } - case storage.Copy11: + case storage.Copy010: for i := 0; i < count; i++ { + //randomly pick one server, and then choose from the same rack + if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { + rack := server1.Parent() + dc := rack.Parent() + exclusion := make(map[string]topology.Node) + exclusion[rack.String()] = rack + newNodeList := topology.NewNodeList(dc.Children(), exclusion) + if newNodeList.FreeSpace() > 0 { + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { + if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + counter++ + } + } + } + } + } + case storage.Copy100: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(2, 1) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + for _, n := range picked { + if n.FreeSpace() > 0 { + if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + } + if len(servers) == 2 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } + } + case storage.Copy110: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(2, 2) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + dc1, dc2 := picked[0], picked[1] + if dc2.FreeSpace() > dc1.FreeSpace() { + dc1, dc2 = dc2, dc1 + } + if dc1.FreeSpace() > 0 { + if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok { + servers = append(servers, server1) + rack := server1.Parent() + exclusion := make(map[string]topology.Node) + exclusion[rack.String()] = rack + newNodeList := topology.NewNodeList(dc1.Children(), exclusion) + if newNodeList.FreeSpace() > 0 { + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 { + servers = append(servers, server2) + } + } + } + } + if dc2.FreeSpace() > 0 { + if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + if len(servers) == 3 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } + } + case storage.Copy200: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(3, 1) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + for _, n := range picked { + if n.FreeSpace() > 0 { + if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + } + if len(servers) == 3 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } } - err = errors.New("Replication Type Not Implemented Yet!") } return } func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType:repType} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 5068577b1..3cebc62db 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork","/tmp","testing",32*1024, 5) + topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -104,8 +104,6 @@ func setup(topologyLayout string) *topology.Topology { } } - fmt.Println("topology:", *topo) - return topo } @@ -125,15 +123,8 @@ func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) rand.Seed(time.Now().UnixNano()) vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - vg.GrowByType(storage.Copy20,topo) + if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ + t.Log("reserved", c) + } } - -func TestXYZ(t *testing.T) { - dn := topology.NewDataNode("server1") - dn.Ip = "localhost" - dn.Port = 8080 - vid, _:= storage.NewVolumeId("600") - out := AllocateVolume(dn,vid,storage.Copy00) - fmt.Println(out) -} diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index 6c3bb6e96..c05515b29 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -44,8 +44,6 @@ func TestXYZ(t *testing.T) { } } - //println("cm.list =", len(m.list)) - for i := uint32(10 * batch); i < 100*batch; i++ { v, ok := m.Get(Key(i)) if i%37 == 0 { diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index b8337d51f..7ebc028ba 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -29,7 +29,7 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S return } func (s *Store) AddVolume(volumeListString string, replicationType string) error { - rt, e := NewReplicationType(replicationType) + rt, e := NewReplicationTypeFromString(replicationType) if e != nil { return e } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 40b322787..0e6095089 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -61,7 +61,7 @@ func (v *Volume) maybeWriteSuperBlock() { if stat.Size() == 0 { header := make([]byte, SuperBlockSize) header[0] = 1 - header[1] = byte(v.replicaType) + header[1] = v.replicaType.Byte() v.dataFile.Write(header) } } @@ -69,7 +69,7 @@ func (v *Volume) readSuperBlock() { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, error := v.dataFile.Read(header); error == nil { - v.replicaType = ReplicationType(header[1]) + v.replicaType, _ = NewReplicationTypeFromByte(header[1]) } } diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index ca8e3372e..3f96db21b 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -9,75 +9,119 @@ type VolumeInfo struct { Size int64 RepType ReplicationType } -type ReplicationType byte +type ReplicationType string const ( - Copy00 = ReplicationType(00) // single copy - Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center - Copy10 = ReplicationType(10) // 2 copies, each on different data center - Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center - LengthRelicationType = 5 + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 CopyNil = ReplicationType(255) // nil value ) -func NewReplicationType(t string) (ReplicationType, error) { +func NewReplicationTypeFromString(t string) (ReplicationType, error) { switch t { - case "00": - return Copy00, nil - case "01": - return Copy01, nil - case "10": - return Copy10, nil - case "11": - return Copy11, nil - case "20": - return Copy20, nil + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil } - return Copy00, errors.New("Unknown Replication Type:"+t) + return Copy000, errors.New("Unknown Replication Type:"+t) } +func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:"+string(b)) +} + func (r *ReplicationType) String() string { switch *r { - case Copy00: - return "00" - case Copy01: - return "01" - case Copy10: - return "10" - case Copy11: - return "11" - case Copy20: - return "20" + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" } - return "00" + return "000" +} +func (r *ReplicationType) Byte() byte { + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } func (repType ReplicationType)GetReplicationLevelIndex() int { switch repType { - case Copy00: + case Copy000: return 0 - case Copy01: + case Copy001: return 1 - case Copy10: - return 2 - case Copy11: + case Copy010: + return 2 + case Copy100: return 3 - case Copy20: + case Copy110: return 4 + case Copy200: + return 5 } return -1 } func (repType ReplicationType)GetCopyCount() int { switch repType { - case Copy00: + case Copy000: return 1 - case Copy01: + case Copy001: return 2 - case Copy10: + case Copy010: + return 2 + case Copy100: return 2 - case Copy11: + case Copy110: return 3 - case Copy20: + case Copy200: return 3 } return 0 diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 396f1b16a..1d9e1891a 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -30,10 +30,10 @@ func (nl *NodeList) FreeSpace() int { return freeSpace } -func (nl *NodeList) RandomlyPickN(n int) ([]Node, bool) { +func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { var list []Node for _, n := range nl.nodes { - if n.FreeSpace() > 0 { + if n.FreeSpace() >= min { list = append(list, n) } }