git-svn-id: https://weed-fs.googlecode.com/svn/trunk@16 282b0af5-e82d-9cf1-ede4-77906d7719d0
chris.lu@gmail.com 2011-12-19 05:59:37 +00:00
parent 041a93887c
commit b39d1a77b4
6 changed files with 129 additions and 137 deletions


@@ -22,11 +22,15 @@ var (
 func dirReadHandler(w http.ResponseWriter, r *http.Request) {
 	volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
 	machine := mapper.Get(volumeId)
-	writeJson(w, r, machine)
+	writeJson(w, r, machine.Server)
 }
 func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
-	machineList := mapper.PickForWrite()
-	writeJson(w, r, machineList)
+	machine := mapper.PickForWrite()
+	writeJson(w, r, machine)
+}
+func dirPickHandler(w http.ResponseWriter, r *http.Request) {
+	machine := mapper.PickForWrite()
+	writeJson(w, r, machine)
 }
 func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
 	s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
@@ -64,6 +68,7 @@ func main() {
 	defer mapper.Save()
 	http.HandleFunc("/dir/read", dirReadHandler)
 	http.HandleFunc("/dir/write", dirWriteHandler)
+	http.HandleFunc("/dir/pick", dirPickHandler)
 	http.HandleFunc("/dir/join", dirJoinHandler)
 	http.HandleFunc("/dir/status", dirStatusHandler)

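For context on the new /dir/pick route: the handler serializes the picked MachineInfo with writeJson, so a caller should get back a JSON object carrying the Url and PublicUrl fields introduced below. A minimal client sketch, written in current Go syntax rather than the pre-Go 1 style of the repository; the master address and the exact JSON shape produced by writeJson are assumptions, not part of this commit:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// MachineInfo mirrors the struct introduced in this commit;
// without struct tags, the JSON field names are assumed to be "Url" and "PublicUrl".
type MachineInfo struct {
	Url       string
	PublicUrl string
}

func main() {
	// Hypothetical master address; /dir/pick is the endpoint added in this commit.
	resp, err := http.Get("http://localhost:9333/dir/pick")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var m MachineInfo
	if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
		panic(err)
	}
	fmt.Println("write to volume server:", m.Url, "public:", m.PublicUrl)
}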

@@ -15,15 +15,13 @@ const (
 )
 type MachineInfo struct {
-	Server string //<server name/ip>[:port]
-	PublicServer string
+	Url       string //<server name/ip>[:port]
+	PublicUrl string
 }
 type Machine struct {
-	MachineInfo
-	Server string //<server name/ip>[:port]
-	PublicServer string
-	Volumes []storage.VolumeInfo
-	Capacity int
+	Server   MachineInfo
+	Volumes  []storage.VolumeInfo
+	Capacity int
 }
 type Mapper struct {
@@ -39,19 +37,16 @@ type Mapper struct {
 	GlobalVolumeSequence uint64
 }
-func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) {
-	m = new(Machine)
-	m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity
-	return
+func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine {
+	return &Machine{Server:MachineInfo{Url:server,PublicUrl:publicServer},Volumes:volumes,Capacity:capacity}
 }
 func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
-	m = new(Mapper)
-	m.dir, m.fileName, m.capacity = dirname, filename, capacity
+	m = &Mapper{dir:dirname,fileName:filename,capacity:capacity}
 	log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
 	dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
 	m.vid2machineId = make(map[uint64]int)
 	m.Writers = *new([]int)
 	if e != nil {
 		log.Println("Mapping File Read", e)
 		m.Machines = *new([]*Machine)
@@ -64,7 +59,7 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
 	//add to vid2machineId map, and Writers array
 	for machine_index, machine := range m.Machines {
 		for _, v := range machine.Volumes {
 			m.vid2machineId[v.Id] = machine_index
 			if v.Size < ChunkSizeLimit {
 				m.Writers = append(m.Writers, machine_index)
 			}
@@ -74,24 +69,21 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
 	}
 	return
 }
-func (m *Mapper) PickForWrite() map[string]string {
+func (m *Mapper) PickForWrite() MachineInfo {
 	vid := rand.Intn(len(m.Writers))
-	return map[string]string{
-		"server":m.Machines[m.Writers[vid]].Server,
-		"url":m.Machines[m.Writers[vid]].PublicServer,
-	}
+	return m.Machines[m.Writers[vid]].Server
 }
 func (m *Mapper) Get(vid uint64) *Machine {
 	return m.Machines[m.vid2machineId[vid]]
 }
 func (m *Mapper) Add(machine Machine) []uint64 {
-	log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines))
-	log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines))
+	log.Println("Adding existing", machine.Server.Url, len(machine.Volumes), "volumes to dir", len(m.Machines))
+	log.Println("Adding new ", machine.Server.Url, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines))
 	//check existing machine, linearly
 	m.lock.Lock()
 	foundExistingMachineId := -1
 	for index, entry := range m.Machines {
-		if machine.Server == entry.Server {
+		if machine.Server.Url == entry.Server.Url {
 			foundExistingMachineId = index
 			break
 		}
@@ -105,11 +97,10 @@ func (m *Mapper) Add(machine Machine) []uint64 {
 		//generate new volumes
 		vids := new([]uint64)
 		for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 {
-			newVolume := *new(storage.VolumeInfo)
-			newVolume.Id, newVolume.Size = vid, 0
+			newVolume := storage.VolumeInfo{Id: vid, Size: 0}
 			machine.Volumes = append(machine.Volumes, newVolume)
 			m.vid2machineId[vid] = machineId
-			log.Println("Adding volume", vid, "from", machine.Server)
+			log.Println("Adding volume", vid, "from", machine.Server.Url)
 			*vids = append(*vids, vid)
 			m.GlobalVolumeSequence = vid + 1
 		}
@@ -119,7 +110,7 @@ func (m *Mapper) Add(machine Machine) []uint64 {
 	//add to vid2machineId map, and Writers array
 	for _, v := range machine.Volumes {
-		log.Println("Setting volume", v.Id, "to", machine.Server)
+		log.Println("Setting volume", v.Id, "to", machine.Server.Url)
 		m.vid2machineId[v.Id] = machineId
 		if v.Size < ChunkSizeLimit {
 			m.Writers = append(m.Writers, machineId)
@@ -127,14 +118,14 @@ func (m *Mapper) Add(machine Machine) []uint64 {
 	}
 	//setting Writers, copy-on-write because of possible updating
 	var Writers []int
 	for machine_index, machine_entry := range m.Machines {
 		for _, v := range machine_entry.Volumes {
 			if v.Size < ChunkSizeLimit {
 				Writers = append(Writers, machine_index)
 			}
 		}
 	}
 	m.Writers = Writers
 	log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.Writers))
 	return *vids


@@ -1,7 +1,7 @@
 package storage
 import (
 	"io"
 	"io/ioutil"
 	"http"
 	"log"
@@ -9,59 +9,59 @@ import (
 	"strings"
 )
 type Needle struct {
 	Cookie       uint8  "random number to mitigate brute force lookups"
 	Key          uint64 "file id"
 	AlternateKey uint32 "supplemental id"
 	Size         uint32 "Data size"
 	Data         []byte "The actual file data"
 	Checksum     int32  "CRC32 to check integrity"
 	Padding      []byte "Aligned to 8 bytes"
 }
 func NewNeedle(r *http.Request) (n *Needle) {
 	n = new(Needle)
 	form, fe := r.MultipartReader()
 	if fe != nil {
 		log.Fatalf("MultipartReader [ERROR] %s\n", fe)
 	}
 	part, _ := form.NextPart()
 	data, _ := ioutil.ReadAll(part)
 	n.Data = data
 	n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path, ".")])
 	return
 }
 func (n *Needle) ParsePath(path string) {
 	a := strings.Split(path, "_")
 	log.Println("cookie", a[0], "key", a[1], "altKey", a[2])
 	cookie, _ := strconv.Atoi(a[0])
 	n.Cookie = uint8(cookie)
 	n.Key, _ = strconv.Atoui64(a[1])
 	altKey, _ := strconv.Atoui64(a[2])
 	n.AlternateKey = uint32(altKey)
 }
 func (n *Needle) Append(w io.Writer) {
 	header := make([]byte, 17)
 	header[0] = n.Cookie
 	uint64toBytes(header[1:9], n.Key)
 	uint32toBytes(header[9:13], n.AlternateKey)
 	n.Size = uint32(len(n.Data))
 	uint32toBytes(header[13:17], n.Size)
 	w.Write(header)
 	w.Write(n.Data)
 	rest := 8 - ((n.Size + 17 + 4) % 8)
 	uint32toBytes(header[0:4], uint32(n.Checksum))
 	w.Write(header[0 : rest+4])
 }
 func (n *Needle) Read(r io.Reader, size uint32) {
 	bytes := make([]byte, size+17+4)
 	r.Read(bytes)
 	n.Cookie = bytes[0]
 	n.Key = bytesToUint64(bytes[1:9])
 	n.AlternateKey = bytesToUint32(bytes[9:13])
 	n.Size = bytesToUint32(bytes[13:17])
 	n.Data = bytes[17 : 17+size]
 	n.Checksum = int32(bytesToUint32(bytes[17+size : 17+size+4]))
 }

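For reference, the record layout that Needle.Append writes, as read off the code above: a 17-byte header (1-byte Cookie, 8-byte Key, 4-byte AlternateKey, 4-byte Size), the data bytes, then a 4-byte CRC32 followed by padding bytes that normally bring the record to an 8-byte boundary. A small sketch of that size arithmetic in current Go (recordSize is an illustrative helper, not a function in this commit):

package main

import "fmt"

// recordSize mirrors the arithmetic in Needle.Append:
// 17-byte header + data + 4-byte checksum, plus the padding bytes Append computes.
func recordSize(dataSize uint32) uint32 {
	rest := 8 - ((dataSize + 17 + 4) % 8) // padding, exactly as computed in Append
	return 17 + dataSize + 4 + rest
}

func main() {
	for _, n := range []uint32{1, 100, 1003} {
		fmt.Printf("data %4d bytes -> record %4d bytes\n", n, recordSize(n))
	}
}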

@@ -1,53 +1,53 @@
 package storage
 import (
 	"os"
 )
 type NeedleKey struct {
 	Key          uint64 "file id"
 	AlternateKey uint32 "supplemental id"
 }
 func (k *NeedleKey) String() string {
 	var tmp [12]byte
 	for i := uint(0); i < 8; i++ {
 		tmp[i] = byte(k.Key >> (8 * i))
 	}
 	for i := uint(0); i < 4; i++ {
 		tmp[i+8] = byte(k.AlternateKey >> (8 * i))
 	}
 	return string(tmp[:])
 }
 type NeedleValue struct {
 	Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
 	Size   uint32 "Size of the data portion"
 }
 type NeedleMap struct {
 	m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue
 }
-func NewNeedleMap() (nm *NeedleMap){
-	nm = new(NeedleMap)
-	nm.m = make(map[string]*NeedleValue)
-	return
+func NewNeedleMap() *NeedleMap {
+	return &NeedleMap{m: make(map[string]*NeedleValue)}
 }
 func (nm *NeedleMap) load(file *os.File) {
 }
 func makeKey(key uint64, altKey uint32) string {
 	var tmp [12]byte
 	for i := uint(0); i < 8; i++ {
 		tmp[i] = byte(key >> (8 * i))
 	}
 	for i := uint(0); i < 4; i++ {
 		tmp[i+8] = byte(altKey >> (8 * i))
 	}
 	return string(tmp[:])
 }
 func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32) {
 	nm.m[makeKey(key, altKey)] = &NeedleValue{Offset: offset, Size: size}
 }
 func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool) {
 	element, ok = nm.m[makeKey(key, altKey)]
 	return
 }

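The map above is keyed by a 12-byte string that packs the 8-byte file id and the 4-byte alternate key little-endian, which is what lets a composite key be used with a plain Go map. A standalone sketch of the same packing in current Go (illustrative only; it simply mirrors makeKey from the diff):

package main

import "fmt"

// makeKey mirrors the packing above: 8 little-endian bytes of key,
// followed by 4 little-endian bytes of altKey, returned as a 12-byte string.
func makeKey(key uint64, altKey uint32) string {
	var tmp [12]byte
	for i := uint(0); i < 8; i++ {
		tmp[i] = byte(key >> (8 * i))
	}
	for i := uint(0); i < 4; i++ {
		tmp[i+8] = byte(altKey >> (8 * i))
	}
	return string(tmp[:])
}

func main() {
	k := makeKey(0x0102030405060708, 0xAABBCCDD)
	fmt.Printf("% x\n", []byte(k)) // 08 07 06 05 04 03 02 01 dd cc bb aa
}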

@@ -17,13 +17,12 @@ type Store struct {
 	PublicServer string
 }
 type VolumeInfo struct {
 	Id   uint64
 	Size int64
 }
 func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) {
-	s = new(Store)
-	s.Port, s.PublicServer, s.dir, s.capacity = port, publicServer, dirname, capacity
+	s = &Store{Port: port, PublicServer: publicServer, dir: dirname, capacity: capacity}
 	s.volumes = make(map[uint64]*Volume)
 	files, _ := ioutil.ReadDir(dirname)
@@ -60,7 +59,7 @@ func (s *Store) Join(mserver string) {
 	retString := post("http://"+mserver+"/dir/join", values)
 	if retString != nil {
 		newVids := new([]int)
 		log.Println("Instructed to create volume", string(retString))
 		e := json.Unmarshal(retString, newVids)
 		if e == nil {
 			for _, vid := range *newVids {


@@ -18,9 +18,7 @@ type Volume struct {
 func NewVolume(dirname string, id uint64) (v *Volume) {
 	var e os.Error
-	v = new(Volume)
-	v.dir = dirname
-	v.Id = id
+	v = &Volume{dir:dirname,Id:id, nm:NewNeedleMap()}
 	fileName := strconv.Uitoa64(v.Id)
 	v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
 	if e != nil {
@@ -30,7 +28,6 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
 	if e != nil {
 		log.Fatalf("New Volume [ERROR] %s\n", e)
 	}
-	v.nm = NewNeedleMap()
 	v.nm.load(v.indexFile)
 	v.accessChannel = make(chan int, 1)