From 4846a7232e4c5d980d3f9ac871eb95a7de47f328 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 16 Sep 2012 23:18:47 -0700 Subject: [PATCH] adjusting return messages for growing volumes --- weed-fs/src/cmd/weed/master.go | 22 +++++++-- weed-fs/src/pkg/replication/volume_growth.go | 50 ++++++++++++-------- weed-fs/src/pkg/storage/store.go | 10 ++-- weed-fs/src/pkg/storage/volume_info.go | 22 +++++---- weed-fs/src/pkg/topology/topology.go | 4 +- weed-fs/src/pkg/topology/volume_layout.go | 2 +- 6 files changed, 69 insertions(+), 41 deletions(-) diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 72c0d6f91..a61c65f39 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -69,7 +69,11 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { } func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { c, _ := strconv.Atoi(r.FormValue("count")) - rt := storage.NewReplicationType(r.FormValue("replication")) + rt, err := storage.NewReplicationType(r.FormValue("replication")) + if err!=nil { + writeJson(w, r, map[string]string{"error": err.Error()}) + return + } if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.FreeSpace() <= 0 { writeJson(w, r, map[string]string{"error": "No free volumes left!"}) @@ -107,12 +111,22 @@ func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, topo.ToMap()) } func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - rt := storage.NewReplicationType(r.FormValue("replication")) + rt, err := storage.NewReplicationType(r.FormValue("replication")) + if err!=nil { + writeJson(w, r, map[string]string{"error": err.Error()}) + return + } count, err := strconv.Atoi(r.FormValue("count")) + if topo.FreeSpace() < count * rt.GetCopyCount() { + writeJson(w, r, map[string]string{"error": "Only "+strconv.Itoa(topo.FreeSpace())+" volumes left! Not enough for "+strconv.Itoa(count*rt.GetCopyCount())}) + return + } if err != nil { - vg.GrowByType(rt, topo) + count, err := vg.GrowByType(rt, topo) + writeJson(w, r, map[string]interface{}{"count": count, "error": err}) } else { - vg.GrowByCountAndType(count, rt, topo) + count, err := vg.GrowByCountAndType(count, rt, topo) + writeJson(w, r, map[string]interface{}{"count": count, "error": err}) } } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 97316cb57..aad6ba107 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -31,28 +31,30 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} } -func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) { +func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) { switch repType { case storage.Copy00: - vg.GrowByCountAndType(vg.copy1factor, repType, topo) + return vg.GrowByCountAndType(vg.copy1factor, repType, topo) case storage.Copy10: - vg.GrowByCountAndType(vg.copy2factor, repType, topo) + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) case storage.Copy20: - vg.GrowByCountAndType(vg.copy3factor, repType, topo) + return vg.GrowByCountAndType(vg.copy3factor, repType, topo) case storage.Copy01: - vg.GrowByCountAndType(vg.copy2factor, repType, topo) + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) case storage.Copy11: - vg.GrowByCountAndType(vg.copy3factor, repType, topo) + return vg.GrowByCountAndType(vg.copy3factor, repType, topo) } - + return 0, nil } -func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) { +func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { + counter = 0 switch repType { case storage.Copy00: for i := 0; i < count; i++ { - ret, server, vid := topo.RandomlyReserveOneVolume() - if ret { - vg.grow(topo, *vid, repType, server) + if ok, server, vid := topo.RandomlyReserveOneVolume(); ok { + if err = vg.grow(topo, *vid, repType, server); err == nil { + counter++ + } } } case storage.Copy10: @@ -70,7 +72,9 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if len(servers) == 2 { - vg.grow(topo, vid, repType, servers[0], servers[1]) + if err = vg.grow(topo, vid, repType, servers[0], servers[1]); err == nil { + counter++ + } } } } @@ -89,23 +93,25 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if len(servers) == 3 { - vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]) + if err = vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]); err == nil { + counter++ + } } } } case storage.Copy01: for i := 0; i < count; i++ { //randomly pick one server, and then choose from the same rack - ret, server1, vid := topo.RandomlyReserveOneVolume() - if ret { + if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { rack := server1.Parent() exclusion := make(map[string]topology.Node) exclusion[server1.String()] = server1 newNodeList := topology.NewNodeList(rack.Children(), exclusion) if newNodeList.FreeSpace() > 0 { - ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid) - if ret2 { - vg.grow(topo, *vid, repType, server1, server2) + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { + if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + counter++ + } } } } @@ -113,10 +119,11 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio case storage.Copy11: for i := 0; i < count; i++ { } + err = errors.New("Replication Type Not Implemented Yet!") } - + return } -func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) { +func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := AllocateVolume(server, vid, repType); err == nil { vi := &storage.VolumeInfo{Id: vid, Size: 0} @@ -124,11 +131,12 @@ func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repT topo.RegisterVolumeLayout(vi, server) fmt.Println("added", vid, "to", server) } else { - //TODO: need error handling fmt.Println("Failed to assign", vid, "to", servers) + return errors.New("Failed to assign " + vid.String()) } } fmt.Println("Assigning", vid, "to", servers) + return nil } type AllocateVolumeResult struct { diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index a0238d99d..adb243e13 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -30,7 +30,11 @@ func NewStore(port int, publicUrl, dirname string, maxVolumeCount int, volumeLis log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString) return } -func (s *Store) AddVolume(volumeListString string, replicationType string) (e error) { +func (s *Store) AddVolume(volumeListString string, replicationType string) (error) { + rt, e := NewReplicationType(replicationType) + if e!=nil { + return e + } for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -38,7 +42,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er if err != nil { return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") } - e = s.addVolume(VolumeId(id), NewReplicationType(replicationType)) + e = s.addVolume(VolumeId(id), rt) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -50,7 +54,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil { + if err := s.addVolume(VolumeId(id), rt); err != nil { e = err } } diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index 01a56a30c..ca8e3372e 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -1,6 +1,8 @@ package storage -import () +import ( + "errors" +) type VolumeInfo struct { Id VolumeId @@ -19,20 +21,20 @@ const ( CopyNil = ReplicationType(255) // nil value ) -func NewReplicationType(t string) ReplicationType { +func NewReplicationType(t string) (ReplicationType, error) { switch t { case "00": - return Copy00 + return Copy00, nil case "01": - return Copy01 + return Copy01, nil case "10": - return Copy10 + return Copy10, nil case "11": - return Copy11 + return Copy11, nil case "20": - return Copy20 + return Copy20, nil } - return Copy00 + return Copy00, errors.New("Unknown Replication Type:"+t) } func (r *ReplicationType) String() string { switch *r { @@ -50,7 +52,7 @@ func (r *ReplicationType) String() string { return "00" } -func GetReplicationLevelIndex(repType ReplicationType) int { +func (repType ReplicationType)GetReplicationLevelIndex() int { switch repType { case Copy00: return 0 @@ -65,7 +67,7 @@ func GetReplicationLevelIndex(repType ReplicationType) int { } return -1 } -func GetCopyCount(repType ReplicationType) int { +func (repType ReplicationType)GetCopyCount() int { switch repType { case Copy00: return 1 diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 3a6bce4c3..079fb54d9 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -61,7 +61,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { - replicationTypeIndex := storage.GetReplicationLevelIndex(repType) + replicationTypeIndex := repType.GetReplicationLevelIndex() if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) } @@ -74,7 +74,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str } func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := storage.GetReplicationLevelIndex(repType) + replicationTypeIndex := repType.GetReplicationLevelIndex() if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) } diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 99859668e..031840e88 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -30,7 +30,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewDataNodeLocationList() } if vl.vid2location[v.Id].Add(dn) { - if len(vl.vid2location[v.Id].list) == storage.GetCopyCount(v.RepType) { + if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { if uint64(v.Size) < vl.volumeSizeLimit { vl.writables = append(vl.writables, v.Id) }