mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@12 282b0af5-e82d-9cf1-ede4-77906d7719d0
This commit is contained in:
parent
25d0caa901
commit
10930ce6b6
|
@ -57,7 +57,8 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
store = storage.NewStore(*port, *publicServer, *chunkFolder)
|
//TODO: now default to 1G, this value should come from server?
|
||||||
|
store = storage.NewStore(*port, *publicServer, *chunkFolder, 1024*1024*1024, *chunkCount)
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
http.HandleFunc("/", storeHandler)
|
http.HandleFunc("/", storeHandler)
|
||||||
|
|
||||||
|
|
|
@ -10,17 +10,19 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"rand"
|
"rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
port = flag.Int("port", 9333, "http listen port")
|
port = flag.Int("port", 9333, "http listen port")
|
||||||
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
|
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
|
||||||
|
capacity = flag.Int("capacity", 100, "maximum number of volumes to hold")
|
||||||
mapper *directory.Mapper
|
mapper *directory.Mapper
|
||||||
)
|
)
|
||||||
|
|
||||||
func dirReadHandler(w http.ResponseWriter, r *http.Request) {
|
func dirReadHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
|
volumeId, _ := strconv.Atoi(r.FormValue("volumeId"))
|
||||||
machineList := mapper.Get((uint32)(volumeId))
|
machineList := mapper.Get(volumeId)
|
||||||
x := rand.Intn(len(machineList))
|
x := rand.Intn(len(machineList))
|
||||||
machine := machineList[x]
|
machine := machineList[x]
|
||||||
bytes, _ := json.Marshal(machine)
|
bytes, _ := json.Marshal(machine)
|
||||||
|
@ -37,9 +39,27 @@ func dirReadHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
|
func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
machineList := mapper.PickForWrite()
|
machineList := mapper.PickForWrite()
|
||||||
bytes, _ := json.Marshal(machineList)
|
writeJson(w, r, machineList)
|
||||||
callback := r.FormValue("callback")
|
}
|
||||||
|
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
|
||||||
|
publicServer := r.FormValue("publicServer")
|
||||||
|
volumes := new([]storage.VolumeStat)
|
||||||
|
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
|
||||||
|
capacity, _ := strconv.Atoi(r.FormValue("capacity"))
|
||||||
|
log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes"))
|
||||||
|
vids := mapper.Add(*directory.NewMachine(s, publicServer), *volumes, capacity)
|
||||||
|
writeJson(w, r, vids)
|
||||||
|
}
|
||||||
|
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
bytes, _ := json.Marshal(mapper)
|
||||||
|
fmt.Fprint(w, bytes)
|
||||||
|
}
|
||||||
|
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
|
||||||
w.Header().Set("Content-Type", "application/javascript")
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
|
bytes, _ := json.Marshal(obj)
|
||||||
|
callback := r.FormValue("callback")
|
||||||
if callback == "" {
|
if callback == "" {
|
||||||
w.Write(bytes)
|
w.Write(bytes)
|
||||||
} else {
|
} else {
|
||||||
|
@ -48,23 +68,12 @@ func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write(bytes)
|
w.Write(bytes)
|
||||||
w.Write([]uint8(")"))
|
w.Write([]uint8(")"))
|
||||||
}
|
}
|
||||||
}
|
log.Println("JSON Response", string(bytes))
|
||||||
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
s := r.FormValue("server")
|
|
||||||
publicServer := r.FormValue("publicServer")
|
|
||||||
volumes := make([]storage.VolumeStat, 0)
|
|
||||||
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
|
|
||||||
mapper.Add(directory.NewMachine(s, publicServer), volumes)
|
|
||||||
}
|
|
||||||
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
|
||||||
bytes, _ := json.Marshal(mapper)
|
|
||||||
fmt.Fprint(w, bytes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
mapper = directory.NewMapper(*metaFolder, "directory")
|
mapper = directory.NewMapper(*metaFolder, "directory", *capacity)
|
||||||
defer mapper.Save()
|
defer mapper.Save()
|
||||||
http.HandleFunc("/dir/read", dirReadHandler)
|
http.HandleFunc("/dir/read", dirReadHandler)
|
||||||
http.HandleFunc("/dir/write", dirWriteHandler)
|
http.HandleFunc("/dir/write", dirWriteHandler)
|
||||||
|
@ -73,8 +82,8 @@ func main() {
|
||||||
|
|
||||||
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port))
|
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port))
|
||||||
e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
||||||
if e!=nil {
|
if e != nil {
|
||||||
log.Fatalf("Fail to start:",e.String())
|
log.Fatalf("Fail to start:", e.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,12 +12,14 @@ import (
|
||||||
type Machine struct {
|
type Machine struct {
|
||||||
Server string //<server name/ip>[:port]
|
Server string //<server name/ip>[:port]
|
||||||
PublicServer string
|
PublicServer string
|
||||||
|
CanWrite bool
|
||||||
}
|
}
|
||||||
type Mapper struct {
|
type Mapper struct {
|
||||||
dir string
|
dir string
|
||||||
FileName string
|
fileName string
|
||||||
Id2Machine map[uint32][]*Machine
|
capacity int
|
||||||
LastId uint32
|
Machines [][]Machine //initial version only support one copy per machine
|
||||||
|
writers [][]Machine // transient value to lookup writers fast
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMachine(server, publicServer string) (m *Machine) {
|
func NewMachine(server, publicServer string) (m *Machine) {
|
||||||
|
@ -26,57 +28,88 @@ func NewMachine(server, publicServer string) (m *Machine) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMapper(dirname string, filename string) (m *Mapper) {
|
func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
|
||||||
m = new(Mapper)
|
m = new(Mapper)
|
||||||
m.dir = dirname
|
m.dir, m.fileName, m.capacity = dirname, filename, capacity
|
||||||
m.FileName = filename
|
log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
|
||||||
log.Println("Loading virtual to physical:", path.Join(m.dir,m.FileName+".map"))
|
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
|
||||||
dataFile, e := os.OpenFile(path.Join(m.dir,m.FileName+".map"), os.O_RDONLY, 0644)
|
m.Machines = *new([][]Machine)
|
||||||
m.Id2Machine = make(map[uint32][]*Machine)
|
m.writers = *new([][]Machine)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Println("Mapping File Read", e)
|
log.Println("Mapping File Read", e)
|
||||||
} else {
|
} else {
|
||||||
decoder := gob.NewDecoder(dataFile)
|
decoder := gob.NewDecoder(dataFile)
|
||||||
decoder.Decode(m.LastId)
|
decoder.Decode(m.Machines)
|
||||||
decoder.Decode(m.Id2Machine)
|
for _, list := range m.Machines {
|
||||||
|
//TODO: what if a list has mixed readers and writers? Now it's treated as readonly
|
||||||
|
allCanWrite := false
|
||||||
|
for _, entry := range list {
|
||||||
|
allCanWrite = allCanWrite && entry.CanWrite
|
||||||
|
}
|
||||||
|
if allCanWrite {
|
||||||
|
m.writers = append(m.writers, list)
|
||||||
|
}
|
||||||
|
}
|
||||||
dataFile.Close()
|
dataFile.Close()
|
||||||
|
log.Println("Loaded mapping size", len(m.Machines))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (m *Mapper) PickForWrite() []*Machine {
|
func (m *Mapper) PickForWrite() []Machine {
|
||||||
vid := uint32(rand.Intn(len(m.Id2Machine)))
|
vid := rand.Intn(len(m.Machines))
|
||||||
return m.Id2Machine[vid]
|
return m.Machines[vid]
|
||||||
}
|
}
|
||||||
func (m *Mapper) Get(vid uint32) []*Machine {
|
func (m *Mapper) Get(vid int) []Machine {
|
||||||
return m.Id2Machine[vid]
|
return m.Machines[vid]
|
||||||
}
|
}
|
||||||
func (m *Mapper) Add(machine *Machine, volumes []storage.VolumeStat) {
|
func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int {
|
||||||
log.Println("Adding store node", machine.Server)
|
log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines))
|
||||||
|
log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines))
|
||||||
|
maxId := len(m.Machines)-1
|
||||||
|
for _, v := range volumes {
|
||||||
|
if maxId < int(v.Id) {
|
||||||
|
maxId = int(v.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := len(m.Machines); i <= maxId; i++ {
|
||||||
|
m.Machines = append(m.Machines, nil)
|
||||||
|
}
|
||||||
|
log.Println("Machine list now is", len(m.Machines))
|
||||||
for _, v := range volumes {
|
for _, v := range volumes {
|
||||||
existing := m.Id2Machine[uint32(v.Id)]
|
|
||||||
found := false
|
found := false
|
||||||
|
existing := m.Machines[v.Id]
|
||||||
for _, entry := range existing {
|
for _, entry := range existing {
|
||||||
if machine == entry {
|
if machine.Server == entry.Server {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
m.Id2Machine[uint32(v.Id)] = append(existing, machine)
|
m.Machines[v.Id] = append(existing, machine)
|
||||||
|
log.Println("Setting volume", v.Id, "to", machine.Server)
|
||||||
}
|
}
|
||||||
log.Println(v.Id, "=>", machine.Server)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vids := new([]int)
|
||||||
|
for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 {
|
||||||
|
list := new([]Machine)
|
||||||
|
*list = append(*list, machine)
|
||||||
|
m.Machines = append(m.Machines, *list)
|
||||||
|
log.Println("Adding volume", vid, "from", machine.Server)
|
||||||
|
*vids = append(*vids, vid)
|
||||||
|
}
|
||||||
|
|
||||||
m.Save()
|
m.Save()
|
||||||
|
log.Println("Dir size =>", len(m.Machines))
|
||||||
|
return *vids
|
||||||
}
|
}
|
||||||
func (m *Mapper) Save() {
|
func (m *Mapper) Save() {
|
||||||
log.Println("Saving virtual to physical:", path.Join(m.dir,m.FileName+".map"))
|
log.Println("Saving virtual to physical:", path.Join(m.dir, m.fileName+".map"))
|
||||||
dataFile, e := os.OpenFile(path.Join(m.dir,m.FileName+".map"), os.O_CREATE|os.O_WRONLY, 0644)
|
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Fatalf("Mapping File Save [ERROR] %s\n", e)
|
log.Fatalf("Mapping File Save [ERROR] %s\n", e)
|
||||||
}
|
}
|
||||||
defer dataFile.Close()
|
defer dataFile.Close()
|
||||||
m.Id2Machine = make(map[uint32][]*Machine)
|
|
||||||
encoder := gob.NewEncoder(dataFile)
|
encoder := gob.NewEncoder(dataFile)
|
||||||
encoder.Encode(m.LastId)
|
encoder.Encode(m.Machines)
|
||||||
encoder.Encode(m.Id2Machine)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,50 +11,64 @@ import (
|
||||||
|
|
||||||
type Store struct {
|
type Store struct {
|
||||||
volumes map[uint64]*Volume
|
volumes map[uint64]*Volume
|
||||||
|
capacity int
|
||||||
dir string
|
dir string
|
||||||
Port int
|
Port int
|
||||||
PublicServer string
|
PublicServer string
|
||||||
}
|
}
|
||||||
type VolumeStat struct {
|
type VolumeStat struct {
|
||||||
Id uint64 "id"
|
Id uint64 "id"
|
||||||
Status int "status" //0:read, 1:write
|
CanWrite bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(port int, publicServer, dirname string) (s *Store) {
|
func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) {
|
||||||
s = new(Store)
|
s = new(Store)
|
||||||
s.Port, s.PublicServer, s.dir = port, publicServer, dirname
|
s.Port, s.PublicServer, s.dir, s.capacity = port, publicServer, dirname, capacity
|
||||||
s.volumes = make(map[uint64]*Volume)
|
s.volumes = make(map[uint64]*Volume)
|
||||||
|
|
||||||
counter := uint64(0)
|
|
||||||
files, _ := ioutil.ReadDir(dirname)
|
files, _ := ioutil.ReadDir(dirname)
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") {
|
if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
id, err := strconv.Atoui64(f.Name[:-4])
|
id, err := strconv.Atoui64(f.Name[0:(strings.LastIndex(f.Name, ".dat"))])
|
||||||
if err == nil {
|
log.Println("Loading data file name:", f.Name)
|
||||||
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.volumes[counter] = NewVolume(s.dir, id)
|
s.volumes[id] = NewVolume(s.dir, id)
|
||||||
counter++
|
|
||||||
}
|
}
|
||||||
log.Println("Store started on dir:", dirname, "with", counter, "existing volumes")
|
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "existing volumes")
|
||||||
|
log.Println("Expected capacity=", s.capacity, "volumes")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Join(mserver string) {
|
func (s *Store) Join(mserver string) {
|
||||||
stats := make([]*VolumeStat, len(s.volumes))
|
stats := new([]*VolumeStat)
|
||||||
for k, _ := range s.volumes {
|
for k, _ := range s.volumes {
|
||||||
s := new(VolumeStat)
|
s := new(VolumeStat)
|
||||||
s.Id, s.Status = k, 1
|
s.Id, s.CanWrite = k, true
|
||||||
stats = append(stats, s)
|
*stats = append(*stats, s)
|
||||||
}
|
}
|
||||||
bytes, _ := json.Marshal(stats)
|
bytes, _ := json.Marshal(stats)
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("port", strconv.Itoa(s.Port))
|
values.Add("port", strconv.Itoa(s.Port))
|
||||||
values.Add("publicServer", s.PublicServer)
|
values.Add("publicServer", s.PublicServer)
|
||||||
values.Add("volumes", string(bytes))
|
values.Add("volumes", string(bytes))
|
||||||
post("http://"+mserver+"/dir/join", values)
|
log.Println("Registering exiting volumes", string(bytes))
|
||||||
|
values.Add("capacity", strconv.Itoa(s.capacity))
|
||||||
|
retString := post("http://"+mserver+"/dir/join", values)
|
||||||
|
if retString != nil {
|
||||||
|
newVids := new([]int)
|
||||||
|
log.Println("Instructed to create volume",string(retString))
|
||||||
|
e := json.Unmarshal(retString, newVids)
|
||||||
|
if e == nil {
|
||||||
|
for _, vid := range *newVids {
|
||||||
|
s.volumes[uint64(vid)] = NewVolume(s.dir, uint64(vid))
|
||||||
|
log.Println("Adding volume", vid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
func (s *Store) Close() {
|
func (s *Store) Close() {
|
||||||
for _, v := range s.volumes {
|
for _, v := range s.volumes {
|
||||||
|
|
|
@ -34,17 +34,17 @@ func uint32toBytes(b []byte, v uint32){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func post(url string, values url.Values)string{
|
func post(url string, values url.Values)[]byte{
|
||||||
r, err := http.PostForm(url, values)
|
r, err := http.PostForm(url, values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("post:", err)
|
log.Println("post:", err)
|
||||||
return ""
|
return nil
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
b, err := ioutil.ReadAll(r.Body)
|
b, err := ioutil.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("post:", err)
|
log.Println("post:", err)
|
||||||
return ""
|
return nil
|
||||||
}
|
}
|
||||||
return string(b)
|
return b
|
||||||
}
|
}
|
|
@ -22,7 +22,6 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
|
||||||
v.dir = dirname
|
v.dir = dirname
|
||||||
v.Id = id
|
v.Id = id
|
||||||
fileName := strconv.Uitoa64(v.Id)
|
fileName := strconv.Uitoa64(v.Id)
|
||||||
log.Println("file", v.dir, "/", fileName)
|
|
||||||
v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
|
v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Fatalf("New Volume [ERROR] %s\n", e)
|
log.Fatalf("New Volume [ERROR] %s\n", e)
|
||||||
|
@ -39,6 +38,13 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
func (v *Volume) CanWrite(limit int64) bool {
|
||||||
|
stat, e:=v.dataFile.Stat()
|
||||||
|
if e!=nil{
|
||||||
|
return stat.Size < limit
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
func (v *Volume) Close() {
|
func (v *Volume) Close() {
|
||||||
close(v.accessChannel)
|
close(v.accessChannel)
|
||||||
v.dataFile.Close()
|
v.dataFile.Close()
|
||||||
|
|
Loading…
Reference in a new issue