mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
store is sort of working
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@3 282b0af5-e82d-9cf1-ede4-77906d7719d0
This commit is contained in:
parent
4f362fe883
commit
8fade18e40
17
weed-fs/.project
Normal file
17
weed-fs/.project
Normal file
|
@ -0,0 +1,17 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>weed-fs</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
<buildSpec>
|
||||
<buildCommand>
|
||||
<name>com.googlecode.goclipse.goBuilder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
</buildSpec>
|
||||
<natures>
|
||||
<nature>goclipse.goNature</nature>
|
||||
</natures>
|
||||
</projectDescription>
|
47
weed-fs/src/cmd/agent.go
Normal file
47
weed-fs/src/cmd/agent.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"http"
|
||||
// "runtime"
|
||||
"log"
|
||||
"os"
|
||||
"exec"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func HelloServer(w http.ResponseWriter, req *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Write([]byte("hello, world!\n"))
|
||||
}
|
||||
func ShellHandler(w http.ResponseWriter, r *http.Request) {
|
||||
name := r.FormValue("cmd")
|
||||
args := r.Form["arg"]
|
||||
dir := r.FormValue("dir")
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
|
||||
cmd := run(w, dir, name, args)
|
||||
cmd.Wait()
|
||||
}
|
||||
|
||||
func run(w http.ResponseWriter, dir string, name string, args []string) *exec.Cmd {
|
||||
cmd := exec.Command(name, args...)
|
||||
cmd.Dir = dir
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Stdin = os.Stdin
|
||||
cmd.Stdout = w
|
||||
cmd.Stderr = w
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
fmt.Fprint(w, "could not execute", args, ":", cmd.Args, "\n", err,"\n")
|
||||
}
|
||||
return cmd
|
||||
}
|
||||
|
||||
func main() {
|
||||
// runtime.GOMAXPROCS(1)
|
||||
http.HandleFunc("/", HelloServer)
|
||||
http.HandleFunc("/shell", ShellHandler)
|
||||
|
||||
log.Println("Serving at http://127.0.0.1:8080/")
|
||||
http.ListenAndServe(":8080", nil)
|
||||
}
|
18
weed-fs/src/cmd/gdir.go
Normal file
18
weed-fs/src/cmd/gdir.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"directory"
|
||||
// "runtime"
|
||||
"log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
m := directory.NewMapper("/tmp", "directory")
|
||||
log.Println("map size", len(m.Virtual2physical))
|
||||
m.Add(10, 11,12,13)
|
||||
m.Add(20, 21,22,23)
|
||||
log.Println("map(10)", m.Get(10))
|
||||
log.Println("map size", len(m.Virtual2physical))
|
||||
m.Save()
|
||||
defer m.Save()
|
||||
}
|
88
weed-fs/src/cmd/gstore.go
Normal file
88
weed-fs/src/cmd/gstore.go
Normal file
|
@ -0,0 +1,88 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"store"
|
||||
"directory"
|
||||
"flag"
|
||||
"fmt"
|
||||
"http"
|
||||
"log"
|
||||
"mime"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
port = flag.Int("port", 8080, "http listen port")
|
||||
chunkFolder = flag.String("dir", "/tmp", "data directory to store files")
|
||||
chunkCount = flag.Int("chunks", 5, "data chunks to store files")
|
||||
chunkEnabled = flag.Bool("data", false, "act as a store server")
|
||||
metaEnabled = flag.Bool("meta", false, "act as a directory server")
|
||||
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
|
||||
)
|
||||
|
||||
type Haystack struct {
|
||||
store *store.Store
|
||||
directory *directory.Mapper
|
||||
}
|
||||
|
||||
func storeHandler(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
server.GetHandler(w, r)
|
||||
case "DELETE":
|
||||
server.DeleteHandler(w, r)
|
||||
case "POST":
|
||||
server.PostHandler(w, r)
|
||||
}
|
||||
}
|
||||
func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
n := new(store.Needle)
|
||||
path := r.URL.Path
|
||||
sepIndex := strings.Index(path[1:], "/") + 1
|
||||
volumeId, _ := strconv.Atoi(path[1:sepIndex])
|
||||
dotIndex := strings.LastIndex(path, ".")
|
||||
n.ParsePath(path[sepIndex+1 : dotIndex])
|
||||
ext := path[dotIndex:]
|
||||
|
||||
s.store.Read(volumeId, n)
|
||||
w.Header().Set("Content-Type", mime.TypeByExtension(ext))
|
||||
w.Write(n.Data)
|
||||
}
|
||||
func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||
volumeId := s.store.Write(store.NewNeedle(r))
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
fmt.Fprint(w, "volumeId=", volumeId, "\n")
|
||||
}
|
||||
func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
func directoryHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
var server *Haystack
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
if !*chunkEnabled && !*metaEnabled {
|
||||
fmt.Fprintf(os.Stderr, "Need to act as either a store server or a directory server, or both\n")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(-1)
|
||||
}
|
||||
server = new(Haystack)
|
||||
if *chunkEnabled {
|
||||
fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder)
|
||||
server.store = store.NewStore(*chunkFolder, *chunkCount)
|
||||
defer server.store.Close()
|
||||
http.HandleFunc("/", storeHandler)
|
||||
}
|
||||
if *metaEnabled {
|
||||
server.directory = directory.NewMapper(*metaFolder, "directory")
|
||||
defer server.directory.Save()
|
||||
http.HandleFunc("/directory", directoryHandler)
|
||||
}
|
||||
|
||||
log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port))
|
||||
http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
||||
}
|
47
weed-fs/src/pkg/directory/volume_mapping.go
Normal file
47
weed-fs/src/pkg/directory/volume_mapping.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package directory
|
||||
|
||||
import (
|
||||
"gob"
|
||||
"os"
|
||||
"log"
|
||||
)
|
||||
|
||||
type Mapper struct {
|
||||
dir string
|
||||
fileName string
|
||||
Virtual2physical map[uint32][]uint32
|
||||
}
|
||||
|
||||
func NewMapper(dirname string, filename string) (m *Mapper) {
|
||||
m = new(Mapper)
|
||||
m.dir = dirname
|
||||
m.fileName = filename
|
||||
log.Println("Loading virtual to physical:", m.dir, "/", m.fileName)
|
||||
dataFile, e := os.OpenFile(m.dir+string(os.PathSeparator)+m.fileName+".map", os.O_RDONLY, 0644)
|
||||
if e != nil {
|
||||
log.Fatalf("Mapping File Read [ERROR] %s\n", e)
|
||||
} else {
|
||||
m.Virtual2physical = make(map[uint32][]uint32)
|
||||
decoder := gob.NewDecoder(dataFile)
|
||||
decoder.Decode(m.Virtual2physical)
|
||||
dataFile.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *Mapper) Get(vid uint32) []uint32 {
|
||||
return m.Virtual2physical[vid]
|
||||
}
|
||||
func (m *Mapper) Add(vid uint32, pids ...uint32) {
|
||||
m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...)
|
||||
}
|
||||
func (m *Mapper) Save() {
|
||||
log.Println("Saving virtual to physical:", m.dir, "/", m.fileName)
|
||||
dataFile, e := os.OpenFile(m.dir+string(os.PathSeparator)+m.fileName+".map", os.O_WRONLY, 0644)
|
||||
if e != nil {
|
||||
log.Fatalf("Mapping File Save [ERROR] %s\n", e)
|
||||
}
|
||||
defer dataFile.Close()
|
||||
m.Virtual2physical = make(map[uint32][]uint32)
|
||||
encoder := gob.NewEncoder(dataFile)
|
||||
encoder.Encode(m.Virtual2physical)
|
||||
}
|
67
weed-fs/src/pkg/store/needle.go
Normal file
67
weed-fs/src/pkg/store/needle.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"http"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Needle struct{
|
||||
Cookie uint8 "random number to mitigate brute force lookups"
|
||||
Key uint64 "file id"
|
||||
AlternateKey uint32 "supplemental id"
|
||||
Size uint32 "Data size"
|
||||
Data []byte "The actual file data"
|
||||
Checksum int32 "CRC32 to check integrity"
|
||||
Padding []byte "Aligned to 8 bytes"
|
||||
}
|
||||
func NewNeedle(r *http.Request)(n *Needle){
|
||||
n = new(Needle)
|
||||
form,fe:=r.MultipartReader()
|
||||
if fe!=nil {
|
||||
log.Fatalf("MultipartReader [ERROR] %s\n", fe)
|
||||
}
|
||||
part,_:=form.NextPart()
|
||||
data,_:=ioutil.ReadAll(part)
|
||||
n.Data = data
|
||||
|
||||
n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")])
|
||||
|
||||
return
|
||||
}
|
||||
func (n *Needle) ParsePath(path string){
|
||||
a := strings.Split(path,"_")
|
||||
log.Println("cookie",a[0],"key",a[1],"altKey",a[2])
|
||||
cookie,_ := strconv.Atoi(a[0])
|
||||
n.Cookie = uint8(cookie)
|
||||
n.Key,_ = strconv.Atoui64(a[1])
|
||||
altKey,_ := strconv.Atoui64(a[2])
|
||||
n.AlternateKey = uint32(altKey)
|
||||
}
|
||||
func (n *Needle) Append(w io.Writer){
|
||||
header := make([]byte,17)
|
||||
header[0] = n.Cookie
|
||||
uint64toBytes(header[1:9],n.Key)
|
||||
uint32toBytes(header[9:13],n.AlternateKey)
|
||||
n.Size = uint32(len(n.Data))
|
||||
uint32toBytes(header[13:17],n.Size)
|
||||
w.Write(header)
|
||||
w.Write(n.Data)
|
||||
rest := 8-((n.Size+17+4)%8)
|
||||
uint32toBytes(header[0:4],uint32(n.Checksum))
|
||||
w.Write(header[0:rest+4])
|
||||
}
|
||||
func (n *Needle) Read(r io.Reader, size uint32){
|
||||
bytes := make([]byte,size+17+4)
|
||||
r.Read(bytes)
|
||||
n.Cookie = bytes[0]
|
||||
n.Key = bytesToUint64(bytes[1:9])
|
||||
n.AlternateKey = bytesToUint32(bytes[9:13])
|
||||
n.Size = bytesToUint32(bytes[13:17])
|
||||
n.Data = bytes[17:17+size]
|
||||
n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4]))
|
||||
}
|
||||
|
53
weed-fs/src/pkg/store/needle_map.go
Normal file
53
weed-fs/src/pkg/store/needle_map.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
type NeedleKey struct{
|
||||
Key uint64 "file id"
|
||||
AlternateKey uint32 "supplemental id"
|
||||
}
|
||||
func (k *NeedleKey) String() string {
|
||||
var tmp [12]byte
|
||||
for i :=uint(0);i<8;i++{
|
||||
tmp[i] = byte(k.Key >> (8*i));
|
||||
}
|
||||
for i :=uint(0);i<4;i++{
|
||||
tmp[i+8] = byte(k.AlternateKey >> (8*i));
|
||||
}
|
||||
return string(tmp[:])
|
||||
}
|
||||
|
||||
type NeedleValue struct{
|
||||
Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
|
||||
Size uint32 "Size of the data portion"
|
||||
}
|
||||
|
||||
type NeedleMap struct{
|
||||
m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue
|
||||
}
|
||||
func NewNeedleMap() (nm *NeedleMap){
|
||||
nm = new(NeedleMap)
|
||||
nm.m = make(map[string]*NeedleValue)
|
||||
return
|
||||
}
|
||||
func (nm *NeedleMap) load(file *os.File){
|
||||
}
|
||||
func makeKey(key uint64, altKey uint32) string {
|
||||
var tmp [12]byte
|
||||
for i :=uint(0);i<8;i++{
|
||||
tmp[i] = byte(key >> (8*i));
|
||||
}
|
||||
for i :=uint(0);i<4;i++{
|
||||
tmp[i+8] = byte(altKey >> (8*i));
|
||||
}
|
||||
return string(tmp[:])
|
||||
}
|
||||
func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){
|
||||
nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size}
|
||||
}
|
||||
func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){
|
||||
element, ok = nm.m[makeKey(key,altKey)]
|
||||
return
|
||||
}
|
39
weed-fs/src/pkg/store/store.go
Normal file
39
weed-fs/src/pkg/store/store.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
)
|
||||
type Store struct{
|
||||
volumes []*Volume
|
||||
dir string
|
||||
|
||||
freeVolumeChannel chan int
|
||||
}
|
||||
func NewStore(dirname string, count int) (s *Store){
|
||||
s = new(Store)
|
||||
s.dir = dirname
|
||||
s.volumes = make([]*Volume,count)
|
||||
s.freeVolumeChannel = make(chan int, count)
|
||||
for i:=0;i<count;i++{
|
||||
s.volumes[i] = NewVolume(s.dir, strconv.Itob(i,16))
|
||||
s.freeVolumeChannel <- i
|
||||
}
|
||||
log.Println("Store started on dir:", dirname, "with", count,"volumes");
|
||||
return
|
||||
}
|
||||
func (s *Store)Close(){
|
||||
close(s.freeVolumeChannel)
|
||||
for _, v := range s.volumes{
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
func (s *Store)Write(n *Needle)(int){
|
||||
i := <- s.freeVolumeChannel
|
||||
s.volumes[i].write(n)
|
||||
s.freeVolumeChannel <- i
|
||||
return i
|
||||
}
|
||||
func (s *Store)Read(i int, n *Needle){
|
||||
s.volumes[i].read(n)
|
||||
}
|
28
weed-fs/src/pkg/store/util.go
Normal file
28
weed-fs/src/pkg/store/util.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package store
|
||||
|
||||
func bytesToUint64(b []byte)(v uint64){
|
||||
for i :=uint(7);i>0;i-- {
|
||||
v += uint64(b[i])
|
||||
v <<= 8
|
||||
}
|
||||
v+=uint64(b[0])
|
||||
return
|
||||
}
|
||||
func bytesToUint32(b []byte)(v uint32){
|
||||
for i :=uint(3);i>0;i-- {
|
||||
v += uint32(b[i])
|
||||
v <<= 8
|
||||
}
|
||||
v+=uint32(b[0])
|
||||
return
|
||||
}
|
||||
func uint64toBytes(b []byte, v uint64){
|
||||
for i :=uint(0);i<8;i++ {
|
||||
b[i] = byte(v>>(i*8))
|
||||
}
|
||||
}
|
||||
func uint32toBytes(b []byte, v uint32){
|
||||
for i :=uint(0);i<4;i++ {
|
||||
b[i] = byte(v>>(i*8))
|
||||
}
|
||||
}
|
63
weed-fs/src/pkg/store/volume.go
Normal file
63
weed-fs/src/pkg/store/volume.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"os"
|
||||
"log"
|
||||
)
|
||||
|
||||
type Volume struct {
|
||||
dir string
|
||||
fileName string
|
||||
dataFile, indexFile *os.File
|
||||
nm *NeedleMap
|
||||
|
||||
accessChannel chan int
|
||||
}
|
||||
|
||||
func NewVolume(dirname string, filename string) (v *Volume) {
|
||||
var e os.Error
|
||||
v = new(Volume)
|
||||
v.dir = dirname
|
||||
v.fileName = filename
|
||||
log.Println("file", v.dir, "/", v.fileName)
|
||||
v.dataFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
|
||||
if e != nil {
|
||||
log.Fatalf("New Volume [ERROR] %s\n", e)
|
||||
}
|
||||
v.indexFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
|
||||
if e != nil {
|
||||
log.Fatalf("New Volume [ERROR] %s\n", e)
|
||||
}
|
||||
v.nm = NewNeedleMap()
|
||||
v.nm.load(v.indexFile)
|
||||
|
||||
v.accessChannel = make(chan int, 1)
|
||||
v.accessChannel <- 0
|
||||
|
||||
return
|
||||
}
|
||||
func (v *Volume) Close() {
|
||||
close(v.accessChannel)
|
||||
v.dataFile.Close()
|
||||
v.indexFile.Close()
|
||||
}
|
||||
|
||||
func (v *Volume) write(n *Needle) {
|
||||
counter := <-v.accessChannel
|
||||
offset, _ := v.dataFile.Seek(0, 2)
|
||||
n.Append(v.dataFile)
|
||||
nv, ok := v.nm.get(n.Key, n.AlternateKey)
|
||||
if !ok || int64(nv.Offset)*8 < offset {
|
||||
v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size)
|
||||
}
|
||||
v.accessChannel <- counter + 1
|
||||
}
|
||||
func (v *Volume) read(n *Needle) {
|
||||
counter := <-v.accessChannel
|
||||
nv, ok := v.nm.get(n.Key, n.AlternateKey)
|
||||
if ok && nv.Offset > 0 {
|
||||
v.dataFile.Seek(int64(nv.Offset)*8, 0)
|
||||
n.Read(v.dataFile, nv.Size)
|
||||
}
|
||||
v.accessChannel <- counter + 1
|
||||
}
|
Loading…
Reference in a new issue