From 23ecd7bb33e8c22b07f630fb963cbdc5c072adad Mon Sep 17 00:00:00 2001 From: "chris.lu@gmail.com" Date: Fri, 16 Dec 2011 14:51:26 +0000 Subject: [PATCH] split into server and clients git-svn-id: https://weed-fs.googlecode.com/svn/trunk@9 282b0af5-e82d-9cf1-ede4-77906d7719d0 --- weed-fs/src/cmd/gstore.go | 145 -------------------- weed-fs/src/cmd/weedc.go | 73 ++++++++++ weed-fs/src/cmd/weeds.go | 80 +++++++++++ weed-fs/src/pkg/directory/volume_mapping.go | 17 ++- weed-fs/src/pkg/storage/needle.go | 67 +++++++++ weed-fs/src/pkg/storage/needle_map.go | 53 +++++++ weed-fs/src/pkg/storage/store.go | 69 ++++++++++ weed-fs/src/pkg/storage/util.go | 50 +++++++ weed-fs/src/pkg/storage/volume.go | 66 +++++++++ 9 files changed, 466 insertions(+), 154 deletions(-) delete mode 100644 weed-fs/src/cmd/gstore.go create mode 100644 weed-fs/src/cmd/weedc.go create mode 100644 weed-fs/src/cmd/weeds.go create mode 100644 weed-fs/src/pkg/storage/needle.go create mode 100644 weed-fs/src/pkg/storage/needle_map.go create mode 100644 weed-fs/src/pkg/storage/store.go create mode 100644 weed-fs/src/pkg/storage/util.go create mode 100644 weed-fs/src/pkg/storage/volume.go diff --git a/weed-fs/src/cmd/gstore.go b/weed-fs/src/cmd/gstore.go deleted file mode 100644 index 5f92d3950..000000000 --- a/weed-fs/src/cmd/gstore.go +++ /dev/null @@ -1,145 +0,0 @@ -package main - -import ( - "store" - "directory" - "flag" - "fmt" - "http" - "json" - "log" - "mime" - "rand" - "strconv" - "strings" - "time" -) - -var ( - port = flag.Int("port", 8080, "http listen port") - chunkFolder = flag.String("dir", "/tmp", "data directory to store files") - chunkCount = flag.Int("chunks", 5, "data chunks to store files") - chunkEnabled = flag.Bool("data", false, "act as a store server") - chunkServer = flag.String("cserver", "localhost:8080", "chunk server to store data") - publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read") - metaServer = flag.String("mserver", "localhost:8080", "metadata server to store mappings") - - metaEnabled = flag.Bool("meta", false, "act as a directory server") - metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") -) - -type Haystack struct { - store *store.Store - directory *directory.Mapper -} - -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - server.GetHandler(w, r) - case "DELETE": - server.DeleteHandler(w, r) - case "POST": - server.PostHandler(w, r) - } -} -func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) { - n := new(store.Needle) - path := r.URL.Path - sepIndex := strings.Index(path[1:], "/") + 1 - volumeId, _ := strconv.Atoui64(path[1:sepIndex]) - dotIndex := strings.LastIndex(path, ".") - n.ParsePath(path[sepIndex+1 : dotIndex]) - ext := path[dotIndex:] - - s.store.Read(volumeId, n) - w.Header().Set("Content-Type", mime.TypeByExtension(ext)) - w.Write(n.Data) -} -func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) { - volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) - s.store.Write(volumeId, store.NewNeedle(r)) - w.Header().Set("Content-Type", "text/plain") - fmt.Fprint(w, "volumeId=", volumeId, "\n") -} -func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) { - -} -func dirReadHandler(w http.ResponseWriter, r *http.Request) { - volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) - machineList := server.directory.Get((uint32)(volumeId)) - x := rand.Intn(len(machineList)) - machine := machineList[x] - bytes, _ := json.Marshal(machine) - callback := r.FormValue("callback") - w.Header().Set("Content-Type", "application/javascript") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - w.Write(bytes) - w.Write([]uint8(")")) - } -} -func dirWriteHandler(w http.ResponseWriter, r *http.Request) { - machineList := server.directory.PickForWrite() - bytes, _ := json.Marshal(machineList) - callback := r.FormValue("callback") - w.Header().Set("Content-Type", "application/javascript") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - w.Write(bytes) - w.Write([]uint8(")")) - } -} -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - s := r.FormValue("server") - publicServer := r.FormValue("publicServer") - volumes := make([]store.VolumeStat, 0) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - server.directory.Add(directory.NewMachine(s, publicServer), volumes) -} -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain") - bytes, _ := json.Marshal(server.directory) - fmt.Fprint(w, bytes) -} - -var server *Haystack - -func main() { - flag.Parse() - bothEnabled := false - if !*chunkEnabled && !*metaEnabled { - bothEnabled = true - log.Println("Act as both a store server and a directory server") - } - server = new(Haystack) - if *chunkEnabled || bothEnabled { - log.Println("data stored in", *chunkFolder) - server.store = store.NewStore(*chunkServer, *publicServer, *chunkFolder) - defer server.store.Close() - http.HandleFunc("/", storeHandler) - } - if *metaEnabled || bothEnabled { - server.directory = directory.NewMapper(*metaFolder, "directory") - defer server.directory.Save() - http.HandleFunc("/dir/read", dirReadHandler) - http.HandleFunc("/dir/write", dirWriteHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) - } - go func() { - time.Sleep(3000 * 1000) - server.store.Join(*metaServer) - log.Println("store joined at", *metaServer) - }() - - log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port)) - http.ListenAndServe(":"+strconv.Itoa(*port), nil) - -} diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go new file mode 100644 index 000000000..94032a971 --- /dev/null +++ b/weed-fs/src/cmd/weedc.go @@ -0,0 +1,73 @@ +package main + +import ( + "storage" + "flag" + "fmt" + "http" + "log" + "mime" + "strconv" + "strings" +) + +var ( + port = flag.Int("port", 8080, "http listen port") + chunkFolder = flag.String("dir", "/tmp", "data directory to store files") + chunkCount = flag.Int("chunks", 5, "data chunks to store files") + publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read") + metaServer = flag.String("mserver", "localhost:9333", "metadata server to store mappings") + + store *storage.Store +) + + +func storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + GetHandler(w, r) + case "DELETE": + DeleteHandler(w, r) + case "POST": + PostHandler(w, r) + } +} +func GetHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + path := r.URL.Path + sepIndex := strings.Index(path[1:], "/") + 1 + volumeId, _ := strconv.Atoui64(path[1:sepIndex]) + dotIndex := strings.LastIndex(path, ".") + n.ParsePath(path[sepIndex+1 : dotIndex]) + ext := path[dotIndex:] + + store.Read(volumeId, n) + w.Header().Set("Content-Type", mime.TypeByExtension(ext)) + w.Write(n.Data) +} +func PostHandler(w http.ResponseWriter, r *http.Request) { + volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) + store.Write(volumeId, storage.NewNeedle(r)) + w.Header().Set("Content-Type", "text/plain") + fmt.Fprint(w, "volumeId=", volumeId, "\n") +} +func DeleteHandler(w http.ResponseWriter, r *http.Request) { + +} + +func main() { + flag.Parse() + store = storage.NewStore(*port, *publicServer, *chunkFolder) + defer store.Close() + http.HandleFunc("/", storeHandler) + + store.Join(*metaServer) + log.Println("store joined at", *metaServer) + + log.Println("Start storage service at http://127.0.0.1:" + strconv.Itoa(*port)) + e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if e!=nil { + log.Fatalf("Fail to start:",e.String()) + } + +} diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go new file mode 100644 index 000000000..ad423b71e --- /dev/null +++ b/weed-fs/src/cmd/weeds.go @@ -0,0 +1,80 @@ +package main + +import ( + "storage" + "directory" + "flag" + "fmt" + "http" + "json" + "log" + "rand" + "strconv" +) + +var ( + port = flag.Int("port", 9333, "http listen port") + metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") + mapper *directory.Mapper +) + +func dirReadHandler(w http.ResponseWriter, r *http.Request) { + volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) + machineList := mapper.Get((uint32)(volumeId)) + x := rand.Intn(len(machineList)) + machine := machineList[x] + bytes, _ := json.Marshal(machine) + callback := r.FormValue("callback") + w.Header().Set("Content-Type", "application/javascript") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + w.Write(bytes) + w.Write([]uint8(")")) + } +} +func dirWriteHandler(w http.ResponseWriter, r *http.Request) { + machineList := mapper.PickForWrite() + bytes, _ := json.Marshal(machineList) + callback := r.FormValue("callback") + w.Header().Set("Content-Type", "application/javascript") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + w.Write(bytes) + w.Write([]uint8(")")) + } +} +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + s := r.FormValue("server") + publicServer := r.FormValue("publicServer") + volumes := make([]storage.VolumeStat, 0) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + mapper.Add(directory.NewMachine(s, publicServer), volumes) +} +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + bytes, _ := json.Marshal(mapper) + fmt.Fprint(w, bytes) +} + +func main() { + flag.Parse() + mapper = directory.NewMapper(*metaFolder, "directory") + defer mapper.Save() + http.HandleFunc("/dir/read", dirReadHandler) + http.HandleFunc("/dir/write", dirWriteHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) + + log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port)) + e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if e!=nil { + log.Fatalf("Fail to start:",e.String()) + } + +} diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 95ec57d2e..459d7f2b1 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -6,13 +6,19 @@ import ( "path" "rand" "log" - "store" + "storage" ) type Machine struct { Server string //[:port] PublicServer string } +type Mapper struct { + dir string + FileName string + Id2Machine map[uint32][]*Machine + LastId uint32 +} func NewMachine(server, publicServer string) (m *Machine) { m = new(Machine) @@ -20,13 +26,6 @@ func NewMachine(server, publicServer string) (m *Machine) { return } -type Mapper struct { - dir string - FileName string - Id2Machine map[uint32][]*Machine - LastId uint32 -} - func NewMapper(dirname string, filename string) (m *Mapper) { m = new(Mapper) m.dir = dirname @@ -51,7 +50,7 @@ func (m *Mapper) PickForWrite() []*Machine { func (m *Mapper) Get(vid uint32) []*Machine { return m.Id2Machine[vid] } -func (m *Mapper) Add(machine *Machine, volumes []store.VolumeStat) { +func (m *Mapper) Add(machine *Machine, volumes []storage.VolumeStat) { log.Println("Adding store node", machine.Server) for _, v := range volumes { existing := m.Id2Machine[uint32(v.Id)] diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go new file mode 100644 index 000000000..7fde63953 --- /dev/null +++ b/weed-fs/src/pkg/storage/needle.go @@ -0,0 +1,67 @@ +package storage + +import ( + "io" + "io/ioutil" + "http" + "log" + "strconv" + "strings" +) + +type Needle struct{ + Cookie uint8 "random number to mitigate brute force lookups" + Key uint64 "file id" + AlternateKey uint32 "supplemental id" + Size uint32 "Data size" + Data []byte "The actual file data" + Checksum int32 "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" +} +func NewNeedle(r *http.Request)(n *Needle){ + n = new(Needle) + form,fe:=r.MultipartReader() + if fe!=nil { + log.Fatalf("MultipartReader [ERROR] %s\n", fe) + } + part,_:=form.NextPart() + data,_:=ioutil.ReadAll(part) + n.Data = data + + n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")]) + + return +} +func (n *Needle) ParsePath(path string){ + a := strings.Split(path,"_") + log.Println("cookie",a[0],"key",a[1],"altKey",a[2]) + cookie,_ := strconv.Atoi(a[0]) + n.Cookie = uint8(cookie) + n.Key,_ = strconv.Atoui64(a[1]) + altKey,_ := strconv.Atoui64(a[2]) + n.AlternateKey = uint32(altKey) +} +func (n *Needle) Append(w io.Writer){ + header := make([]byte,17) + header[0] = n.Cookie + uint64toBytes(header[1:9],n.Key) + uint32toBytes(header[9:13],n.AlternateKey) + n.Size = uint32(len(n.Data)) + uint32toBytes(header[13:17],n.Size) + w.Write(header) + w.Write(n.Data) + rest := 8-((n.Size+17+4)%8) + uint32toBytes(header[0:4],uint32(n.Checksum)) + w.Write(header[0:rest+4]) +} +func (n *Needle) Read(r io.Reader, size uint32){ + bytes := make([]byte,size+17+4) + r.Read(bytes) + n.Cookie = bytes[0] + n.Key = bytesToUint64(bytes[1:9]) + n.AlternateKey = bytesToUint32(bytes[9:13]) + n.Size = bytesToUint32(bytes[13:17]) + n.Data = bytes[17:17+size] + n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4])) +} + diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go new file mode 100644 index 000000000..9669536ac --- /dev/null +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -0,0 +1,53 @@ +package storage + +import ( + "os" +) + +type NeedleKey struct{ + Key uint64 "file id" + AlternateKey uint32 "supplemental id" +} +func (k *NeedleKey) String() string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(k.Key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(k.AlternateKey >> (8*i)); + } + return string(tmp[:]) +} + +type NeedleValue struct{ + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" +} + +type NeedleMap struct{ + m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue +} +func NewNeedleMap() (nm *NeedleMap){ + nm = new(NeedleMap) + nm.m = make(map[string]*NeedleValue) + return +} +func (nm *NeedleMap) load(file *os.File){ +} +func makeKey(key uint64, altKey uint32) string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(altKey >> (8*i)); + } + return string(tmp[:]) +} +func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){ + nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size} +} +func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){ + element, ok = nm.m[makeKey(key,altKey)] + return +} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go new file mode 100644 index 000000000..201f50528 --- /dev/null +++ b/weed-fs/src/pkg/storage/store.go @@ -0,0 +1,69 @@ +package storage + +import ( + "log" + "io/ioutil" + "json" + "strings" + "strconv" + "url" +) + +type Store struct { + volumes map[uint64]*Volume + dir string + Port int + PublicServer string +} +type VolumeStat struct { + Id uint64 "id" + Status int "status" //0:read, 1:write +} + +func NewStore(port int, publicServer, dirname string) (s *Store) { + s = new(Store) + s.Port, s.PublicServer, s.dir = port, publicServer, dirname + s.volumes = make(map[uint64]*Volume) + + counter := uint64(0) + files, _ := ioutil.ReadDir(dirname) + for _, f := range files { + if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { + continue + } + id, err := strconv.Atoui64(f.Name[:-4]) + if err == nil { + continue + } + s.volumes[counter] = NewVolume(s.dir, id) + counter++ + } + log.Println("Store started on dir:", dirname, "with", counter, "existing volumes") + return +} + +func (s *Store) Join(mserver string) { + stats := make([]*VolumeStat, len(s.volumes)) + for k, _ := range s.volumes { + s := new(VolumeStat) + s.Id, s.Status = k, 1 + stats = append(stats, s) + } + bytes, _ := json.Marshal(stats) + values := make(url.Values) + values.Add("port", strconv.Itoa(s.Port)) + values.Add("publicServer", s.PublicServer) + values.Add("volumes", string(bytes)) + post("http://"+mserver+"/dir/join", values) +} +func (s *Store) Close() { + for _, v := range s.volumes { + v.Close() + } +} +func (s *Store) Write(i uint64, n *Needle) { + s.volumes[i].write(n) +} +func (s *Store) Read(i uint64, n *Needle) { + s.volumes[i].read(n) +} diff --git a/weed-fs/src/pkg/storage/util.go b/weed-fs/src/pkg/storage/util.go new file mode 100644 index 000000000..3315dbe01 --- /dev/null +++ b/weed-fs/src/pkg/storage/util.go @@ -0,0 +1,50 @@ +package storage + +import ( + "http" + "io/ioutil" + "url" + "log" +) + +func bytesToUint64(b []byte)(v uint64){ + for i :=uint(7);i>0;i-- { + v += uint64(b[i]) + v <<= 8 + } + v+=uint64(b[0]) + return +} +func bytesToUint32(b []byte)(v uint32){ + for i :=uint(3);i>0;i-- { + v += uint32(b[i]) + v <<= 8 + } + v+=uint32(b[0]) + return +} +func uint64toBytes(b []byte, v uint64){ + for i :=uint(0);i<8;i++ { + b[i] = byte(v>>(i*8)) + } +} +func uint32toBytes(b []byte, v uint32){ + for i :=uint(0);i<4;i++ { + b[i] = byte(v>>(i*8)) + } +} + +func post(url string, values url.Values)string{ + r, err := http.PostForm(url, values) + if err != nil { + log.Println("post:", err) + return "" + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("post:", err) + return "" + } + return string(b) +} \ No newline at end of file diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go new file mode 100644 index 000000000..6b6db1ba2 --- /dev/null +++ b/weed-fs/src/pkg/storage/volume.go @@ -0,0 +1,66 @@ +package storage + +import ( + "os" + "path" + "strconv" + "log" +) + +type Volume struct { + Id uint64 + dir string + dataFile, indexFile *os.File + nm *NeedleMap + + accessChannel chan int +} + +func NewVolume(dirname string, id uint64) (v *Volume) { + var e os.Error + v = new(Volume) + v.dir = dirname + v.Id = id + fileName := strconv.Uitoa64(v.Id) + log.Println("file", v.dir, "/", fileName) + v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.nm = NewNeedleMap() + v.nm.load(v.indexFile) + + v.accessChannel = make(chan int, 1) + v.accessChannel <- 0 + + return +} +func (v *Volume) Close() { + close(v.accessChannel) + v.dataFile.Close() + v.indexFile.Close() +} + +func (v *Volume) write(n *Needle) { + counter := <-v.accessChannel + offset, _ := v.dataFile.Seek(0, 2) + n.Append(v.dataFile) + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if !ok || int64(nv.Offset)*8 < offset { + v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size) + } + v.accessChannel <- counter + 1 +} +func (v *Volume) read(n *Needle) { + counter := <-v.accessChannel + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if ok && nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*8, 0) + n.Read(v.dataFile, nv.Size) + } + v.accessChannel <- counter + 1 +}