replication related work on data nodes

This commit is contained in:
Chris Lu 2012-09-10 17:08:52 -07:00
parent 6daf221937
commit e4c0693b03
7 changed files with 204 additions and 111 deletions

View file

@ -1,14 +1,15 @@
1. each file can choose the replication factor 1. each file can choose the replication factor
2. replication granularity is in volume level 2. replication granularity is in volume level
3. if not enough spaces, we can automatically decrease some volume's the replication factor, especially for cold data 3. if not enough spaces, we can automatically decrease some volume's the replication factor, especially for cold data
4. support migrating data to cheaper storage 4. plan to support migrating data to cheaper storage
5. manual volume placement, access-based volume placement, auction based volume placement 5. plan to manual volume placement, access-based volume placement, auction based volume placement
When a new volume server is started, it reports When a new volume server is started, it reports
1. how many volumes it can hold 1. how many volumes it can hold
2. current list of existing volumes 2. current list of existing volumes and each volume's replication type
Each volume server remembers: Each volume server remembers:
1. current volume ids, replica locations 1. current volume ids
2. replica locations are read from the master
The master assign volume ids based on The master assign volume ids based on
1. replication factor 1. replication factor
@ -17,12 +18,13 @@ The master assign volume ids based on
On master, stores the replication configuration On master, stores the replication configuration
{ {
replication:{ replication:{
{factor:1, min_volume_count:3, weight:10}, {type:"00", min_volume_count:3, weight:10},
{factor:2, min_volume_count:2, weight:20}, {type:"01", min_volume_count:2, weight:20},
{factor:3, min_volume_count:3, weight:30} {type:"10", min_volume_count:2, weight:20},
{type:"11", min_volume_count:3, weight:30},
{type:"20", min_volume_count:2, weight:20}
}, },
port:9333, port:9333,
} }
Or manually via command line Or manually via command line
1. add volume with specified replication factor 1. add volume with specified replication factor
@ -35,8 +37,6 @@ if less than the replication factor, the volume is in readonly mode
if more than the replication factor, the volume will purge the smallest/oldest volume if more than the replication factor, the volume will purge the smallest/oldest volume
if equal, the volume will function as usual if equal, the volume will function as usual
maybe use gossip to send the volumeServer~volumes information
Use cases: Use cases:
on volume server on volume server
@ -47,13 +47,33 @@ Use cases:
Bootstrap Bootstrap
1. at the very beginning, the system has no volumes at all. 1. at the very beginning, the system has no volumes at all.
2. if maxReplicationFactor==1, always initialize volumes right away When data node starts:
3. if nServersHasFreeSpaces >= maxReplicationFactor, auto initialize 1. each data node send to master its existing volumes and max volume blocks
4. if maxReplicationFactor>1 2. master remembers the topology/data_center/rack/data_node/volumes
weed shell for each replication level, stores
> disable_auto_initialize volume id ~ data node
> enable_auto_initialize writable volume ids
> assign_free_volume vid "server1:port","server2:port","server3:port" If any "assign" request comes in
> status 1. find a writable volume with the right replicationLevel
5. 2. if not found, grow the volumes with the right replication level
3. return a writable volume to the user
For the above operations, here are the todo list:
for data node:
1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE
2. accept command to grow a volume( id + replication level) DONE
/admin/assign_volume?volume=some_id&replicationType=01
3. accept status for a volumeLocationList if replication > 1 DONE
/admin/set_volume_locations?volumeLocations=[{Vid:xxx,Locations:[loc1,loc2,loc3]}]
4. for each write, pass the write to the next location
POST method should accept an index, like ttl, get decremented every hop
for master:
1. accept data node's report of existing volumes and maxVolumeCount
2. periodically refresh for active data nodes, and adjust writable volumes
3. send command to grow a volume(id + replication level)
4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info
to other data nodes. BECAUSE the master will stop sending writes to these data nodes

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/json"
"log" "log"
"math/rand" "math/rand"
"mime" "mime"
@ -38,9 +39,23 @@ var (
func statusHandler(w http.ResponseWriter, r *http.Request) { func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, store.Status()) writeJson(w, r, store.Status())
} }
func addVolumeHandler(w http.ResponseWriter, r *http.Request) { func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
store.AddVolume(r.FormValue("volume")) err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
writeJson(w, r, store.Status()) if err == nil {
writeJson(w, r, map[string]string{"error": ""})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
}
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
volumeLocationsList := new([]storage.VolumeLocations)
json.Unmarshal([]byte(r.FormValue("volumeLocations")), volumeLocationsList)
err := store.SetVolumeLocations(*volumeLocationsList)
if err == nil {
writeJson(w, r, map[string]string{"error": ""})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
} }
func storeHandler(w http.ResponseWriter, r *http.Request) { func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
@ -157,7 +172,8 @@ func runVolume(cmd *Command, args []string) bool {
defer store.Close() defer store.Close()
http.HandleFunc("/", storeHandler) http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler) http.HandleFunc("/status", statusHandler)
http.HandleFunc("/add_volume", addVolumeHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/set_volume_locations", setVolumeLocationsHandler)
go func() { go func() {
for { for {

View file

@ -5,9 +5,9 @@ import (
"errors" "errors"
"log" "log"
"net/url" "net/url"
"pkg/util"
"strconv" "strconv"
"strings" "strings"
"pkg/util"
) )
type Store struct { type Store struct {
@ -22,12 +22,12 @@ func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *
s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname}
s.volumes = make(map[VolumeId]*Volume) s.volumes = make(map[VolumeId]*Volume)
s.AddVolume(volumeListString) s.AddVolume(volumeListString, "00")
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
return return
} }
func (s *Store) AddVolume(volumeListString string) error { func (s *Store) AddVolume(volumeListString string, replicationType string) error {
for _, range_string := range strings.Split(volumeListString, ",") { for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 { if strings.Index(range_string, "-") < 0 {
id_string := range_string id_string := range_string
@ -35,7 +35,7 @@ func (s *Store) AddVolume(volumeListString string) error {
if err != nil { if err != nil {
return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
} }
s.addVolume(VolumeId(id)) s.addVolume(VolumeId(id), NewReplicationType(replicationType))
} else { } else {
pair := strings.Split(range_string, "-") pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64) start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -47,24 +47,24 @@ func (s *Store) AddVolume(volumeListString string) error {
return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
s.addVolume(VolumeId(id)) s.addVolume(VolumeId(id), NewReplicationType(replicationType))
} }
} }
} }
return nil return nil
} }
func (s *Store) addVolume(vid VolumeId) error { func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
if s.volumes[vid] != nil { if s.volumes[vid] != nil {
return errors.New("Volume Id " + vid.String() + " already exists!") return errors.New("Volume Id " + vid.String() + " already exists!")
} }
s.volumes[vid] = NewVolume(s.dir, vid) s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil return nil
} }
func (s *Store) Status() *[]*VolumeInfo { func (s *Store) Status() *[]*VolumeInfo {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size = VolumeId(k), v.Size() s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType
*stats = append(*stats, s) *stats = append(*stats, s)
} }
return stats return stats
@ -73,7 +73,7 @@ func (s *Store) Join(mserver string) {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size = VolumeId(k), v.Size() s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType
*stats = append(*stats, s) *stats = append(*stats, s)
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)
@ -90,22 +90,38 @@ func (s *Store) Close() {
} }
func (s *Store) Write(i VolumeId, n *Needle) uint32 { func (s *Store) Write(i VolumeId, n *Needle) uint32 {
v := s.volumes[i] v := s.volumes[i]
if v!=nil{ if v != nil {
return v.write(n) return v.write(n)
} }
return 0 return 0
} }
func (s *Store) Delete(i VolumeId, n *Needle) uint32 { func (s *Store) Delete(i VolumeId, n *Needle) uint32 {
v := s.volumes[i] v := s.volumes[i]
if v!=nil{ if v != nil {
return v.delete(n) return v.delete(n)
} }
return 0 return 0
} }
func (s *Store) Read(i VolumeId, n *Needle) (int, error) { func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
v := s.volumes[i] v := s.volumes[i]
if v!=nil{ if v != nil {
return v.read(n) return v.read(n)
} }
return 0, errors.New("Not Found") return 0, errors.New("Not Found")
} }
type VolumeLocations struct {
Vid VolumeId
Locations []string
}
func (s *Store) SetVolumeLocations(volumeLocationList []VolumeLocations) error {
for _, volumeLocations := range volumeLocationList {
vid := volumeLocations.Vid
v := s.volumes[vid]
if v != nil {
v.locations = volumeLocations.Locations
}
}
return nil
}

View file

@ -18,12 +18,17 @@ type Volume struct {
dataFile *os.File dataFile *os.File
nm *NeedleMap nm *NeedleMap
replicaType ReplicationType
accessLock sync.Mutex accessLock sync.Mutex
//transient
locations []string
} }
func NewVolume(dirname string, id VolumeId) (v *Volume) { func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
var e error var e error
v = &Volume{dir: dirname, Id: id} v = &Volume{dir: dirname, Id: id, replicaType:replicationType}
fileName := id.String() fileName := id.String()
v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil { if e != nil {
@ -53,7 +58,7 @@ func (v *Volume) maybeWriteSuperBlock() {
stat, _ := v.dataFile.Stat() stat, _ := v.dataFile.Stat()
if stat.Size() == 0 { if stat.Size() == 0 {
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
header[0] = 1 //number of copies header[0] = byte(v.replicaType)
v.dataFile.Write(header) v.dataFile.Write(header)
} }
} }

View file

@ -5,9 +5,9 @@ import ()
type VolumeInfo struct { type VolumeInfo struct {
Id VolumeId Id VolumeId
Size int64 Size int64
ReplicationType ReplicationType RepType ReplicationType
} }
type ReplicationType int type ReplicationType byte
const ( const (
Copy00 = ReplicationType(00) // single copy Copy00 = ReplicationType(00) // single copy
@ -18,8 +18,24 @@ const (
LengthRelicationType = 5 LengthRelicationType = 5
) )
func NewReplicationType(t string) ReplicationType {
switch t {
case "00":
return Copy00
case "01":
return Copy01
case "10":
return Copy10
case "11":
return Copy11
case "20":
return Copy20
}
return Copy00
}
func GetReplicationLevelIndex(v *VolumeInfo) int { func GetReplicationLevelIndex(v *VolumeInfo) int {
switch v.ReplicationType { switch v.RepType {
case Copy00: case Copy00:
return 0 return 0
case Copy01: case Copy01:
@ -34,7 +50,7 @@ func GetReplicationLevelIndex(v *VolumeInfo) int {
return -1 return -1
} }
func GetCopyCount(v *VolumeInfo) int { func GetCopyCount(v *VolumeInfo) int {
switch v.ReplicationType { switch v.RepType {
case Copy00: case Copy00:
return 1 return 1
case Copy01: case Copy01:

View file

@ -29,7 +29,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
} }
vl.vid2location[v.Id].Add(dn) vl.vid2location[v.Id].Add(dn)
if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) { if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) {
vl.writables = append(vl.writables,v.Id) if uint64(v.Size) < vl.volumeSizeLimit {
vl.writables = append(vl.writables, v.Id)
}
} }
} }

View file

@ -1,7 +1,6 @@
package topology package topology
import ( import ()
)
type DataNodeLocationList struct { type DataNodeLocationList struct {
list []*DataNode list []*DataNode
@ -11,7 +10,7 @@ func NewDataNodeLocationList() *DataNodeLocationList {
return &DataNodeLocationList{} return &DataNodeLocationList{}
} }
func (dnll *DataNodeLocationList) Add(loc *DataNode){ func (dnll *DataNodeLocationList) Add(loc *DataNode) {
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if loc.ip == dnl.ip && loc.port == dnl.port { if loc.ip == dnl.ip && loc.port == dnl.port {
break break
@ -19,3 +18,22 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode){
} }
dnll.list = append(dnll.list, loc) dnll.list = append(dnll.list, loc)
} }
func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
var changed bool
for _, dnl := range dnll.list {
if dnl.lastSeen < freshThreshHold {
changed = true
break
}
}
if changed {
var l []*DataNode
for _, dnl := range dnll.list {
if dnl.lastSeen >= freshThreshHold {
l = append(l, dnl)
}
}
dnll.list = l
}
}