diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go index e1d5da02d..dfb6969eb 100644 --- a/weed-fs/src/cmd/weed/fix.go +++ b/weed-fs/src/cmd/weed/fix.go @@ -10,6 +10,7 @@ import ( func init() { cmdFix.Run = runFix // break init cycle + IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") } var cmdFix = &Command{ @@ -23,9 +24,6 @@ var cmdFix = &Command{ var ( dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.") - IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") - - store *storage.Store ) func runFix(cmd *Command, args []string) bool { diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go new file mode 100644 index 000000000..e6af3809a --- /dev/null +++ b/weed-fs/src/cmd/weed/master.go @@ -0,0 +1,88 @@ +package main + +import ( + "pkg/directory" + "encoding/json" + "log" + "net/http" + "pkg/storage" + "strconv" + "strings" +) + +func init() { + cmdMaster.Run = runMaster // break init cycle + IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") + port = cmdMaster.Flag.Int("port", 8080, "http listen port") +} + +var cmdMaster = &Command{ + UsageLine: "master -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service + and sequence number of file ids + + `, +} + +var ( + metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") + capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") + mapper *directory.Mapper + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") +) + +func dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, _ := strconv.ParseUint(vid, 10, 64) + machine, e := mapper.Get(uint32(volumeId)) + if e == nil { + writeJson(w, r, machine.Server) + } else { + log.Println("Invalid volume id", volumeId) + writeJson(w, r, map[string]string{"error": "volume id " + strconv.FormatUint(volumeId, 10) + " not found"}) + } +} +func dirAssignHandler(w http.ResponseWriter, r *http.Request) { + c:=r.FormValue("count") + fid, count, machine, err := mapper.PickForWrite(c) + if err == nil { + writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)}) + } else { + log.Println(err) + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + if *IsDebug { + log.Println(s, "volumes", r.FormValue("volumes")) + } + mapper.Add(*directory.NewMachine(s, publicUrl, *volumes)) +} +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + writeJson(w, r, mapper) +} + +func runMaster(cmd *Command, args []string) bool { + log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") + mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024) + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) + + log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port)) + e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if e != nil { + log.Fatal("Fail to start:", e) + } + return true +} diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go new file mode 100644 index 000000000..5cfb6ee97 --- /dev/null +++ b/weed-fs/src/cmd/weed/upload.go @@ -0,0 +1,124 @@ +package main + +import ( + "bytes" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/url" + "os" + "pkg/util" + "strconv" +) + +func init() { + cmdUpload.Run = runUpload // break init cycle + IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") + server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") +} + +var cmdUpload = &Command{ + UsageLine: "upload -server=localhost:9333 file1 file2 file2", + Short: "upload a set of files, using consecutive file keys", + Long: `upload a set of files, using consecutive file keys. + e.g. If the file1 uses key k, file2 can be read via k_1 + + `, +} + +type AssignResult struct { + Fid string "fid" + Url string "url" + PublicUrl string "publicUrl" + Count int `json:",string"` + Error string "error" +} + +func assign(count int) (AssignResult, error) { + values := make(url.Values) + values.Add("count", strconv.Itoa(count)) + jsonBlob := util.Post("http://"+*server+"/dir/assign", values) + var ret AssignResult + err := json.Unmarshal(jsonBlob, &ret) + if err != nil { + return ret, err + } + if ret.Count <= 0 { + return ret, errors.New(ret.Error) + } + return ret, nil +} + +type UploadResult struct { + Size int +} + +func upload(filename string, uploadUrl string) (int, string) { + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + file_writer, err := body_writer.CreateFormFile("file", filename) + if err != nil { + panic(err.Error()) + } + fh, err := os.Open(filename) + if err != nil { + panic(err.Error()) + } + io.Copy(file_writer, fh) + content_type := body_writer.FormDataContentType() + body_writer.Close() + resp, err := http.Post(uploadUrl, content_type, body_buf) + if err != nil { + panic(err.Error()) + } + defer resp.Body.Close() + resp_body, err := ioutil.ReadAll(resp.Body) + if err != nil { + panic(err.Error()) + } + var ret UploadResult + err = json.Unmarshal(resp_body, &ret) + if err != nil { + panic(err.Error()) + } + //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) + return ret.Size, uploadUrl +} + +type SubmitResult struct { + Fid string "fid" + Size int "size" +} + +func submit(files []string)([]SubmitResult) { + ret, err := assign(len(files)) + if err != nil { + panic(err) + } + results := make([]SubmitResult, len(files)) + for index, file := range files { + fid := ret.Fid + if index > 0 { + fid = fid + "_" + strconv.Itoa(index) + } + uploadUrl := "http://" + ret.PublicUrl + "/" + fid + results[index].Size, _ = upload(file, uploadUrl) + results[index].Fid = fid + } + return results +} + +func runUpload(cmd *Command, args []string) bool { + if len(cmdUpload.Flag.Args()) == 0 { + return false + } + results := submit(flag.Args()) + bytes, _ := json.Marshal(results) + fmt.Print(string(bytes)) + return true +} diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go new file mode 100644 index 000000000..45261f017 --- /dev/null +++ b/weed-fs/src/cmd/weed/volume.go @@ -0,0 +1,177 @@ +package main + +import ( + "log" + "math/rand" + "mime" + "net/http" + "pkg/storage" + "strconv" + "strings" + "time" +) + +func init() { + cmdVolume.Run = runVolume // break init cycle + IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") + port = cmdVolume.Flag.Int("port", 8080, "http listen port") +} + +var cmdVolume = &Command{ + UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333", + Short: "start a volume server", + Long: `start a volume server to provide storage spaces + + `, +} + +var ( + chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") + volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") + publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") + metaServer = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") + pulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + + store *storage.Store + +) + +func statusHandler(w http.ResponseWriter, r *http.Request) { + writeJson(w, r, store.Status()) +} +func addVolumeHandler(w http.ResponseWriter, r *http.Request) { + store.AddVolume(r.FormValue("volume")) + writeJson(w, r, store.Status()) +} +func storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + GetHandler(w, r) + case "DELETE": + DeleteHandler(w, r) + case "POST": + PostHandler(w, r) + } +} +func GetHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, ext := parseURLPath(r.URL.Path) + volumeId, _ := strconv.ParseUint(vid, 10, 64) + n.ParsePath(fid) + + if *IsDebug { + log.Println("volume", volumeId, "reading", n) + } + cookie := n.Cookie + count, e := store.Read(volumeId, n) + if *IsDebug { + log.Println("read bytes", count, "error", e) + } + if n.Cookie != cookie { + log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + if ext != "" { + mtype := mime.TypeByExtension(ext) + w.Header().Set("Content-Type", mtype) + if storage.IsCompressable(ext, mtype){ + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip"){ + w.Header().Set("Content-Encoding", "gzip") + }else{ + n.Data = storage.UnGzipData(n.Data) + } + } + } + w.Write(n.Data) +} +func PostHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, e := strconv.ParseUint(vid, 10, 64) + if e != nil { + writeJson(w, r, e) + } else { + needle, ne := storage.NewNeedle(r) + if ne != nil { + writeJson(w, r, ne) + } else { + ret := store.Write(volumeId, needle) + m := make(map[string]uint32) + m["size"] = ret + writeJson(w, r, m) + } + } +} +func DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, _ := parseURLPath(r.URL.Path) + volumeId, _ := strconv.ParseUint(vid, 10, 64) + n.ParsePath(fid) + + if *IsDebug { + log.Println("deleting", n) + } + + cookie := n.Cookie + count, ok := store.Read(volumeId, n) + + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJson(w, r, m) + return + } + + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + store.Delete(volumeId, n) + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJson(w, r, m) +} +func parseURLPath(path string) (vid, fid, ext string) { + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + if "favicon.ico" != path[sepIndex+1:] { + log.Println("unknown file id", path[sepIndex+1:]) + } + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + return +} + +func runVolume(cmd *Command, args []string) bool { + //TODO: now default to 1G, this value should come from server? + store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes) + defer store.Close() + http.HandleFunc("/", storeHandler) + http.HandleFunc("/status", statusHandler) + http.HandleFunc("/add_volume", addVolumeHandler) + + go func() { + for { + store.Join(*metaServer) + time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + log.Println("store joined at", *metaServer) + + log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl) + e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if e != nil { + log.Fatalf("Fail to start:", e) + } + return true +} diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index b9122d8e6..72ce4c09c 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -1,8 +1,10 @@ package main import ( + "encoding/json" "flag" "fmt" + "net/http" "io" "log" "os" @@ -13,9 +15,16 @@ import ( "unicode/utf8" ) +var IsDebug *bool +var server *string +var port *int + var commands = []*Command{ cmdFix, + cmdMaster, + cmdUpload, cmdVersion, + cmdVolume, } var exitStatus = 0 @@ -163,3 +172,16 @@ func exitIfErrors() { exit() } } +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + w.Header().Set("Content-Type", "application/javascript") + bytes, _ := json.Marshal(obj) + callback := r.FormValue("callback") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + fmt.Fprint(w, string(bytes)) + w.Write([]uint8(")")) + } +}