diff --git a/src/command.go b/src/command.go new file mode 100644 index 000000000..c8d86ca66 --- /dev/null +++ b/src/command.go @@ -0,0 +1,54 @@ +package main + +import ( + "flag" + "fmt" + "os" + "strings" +) + +type Command struct { + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool + + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string + + // Short is the short description shown in the 'go help' output. + Short string + + // Long is the long message shown in the 'go help ' output. + Long string + + // Flag is a set of flags specific to this command. + Flag flag.FlagSet + + IsDebug *bool +} + +// Name returns the command's name: the first word in the usage line. +func (c *Command) Name() string { + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name +} + +func (c *Command) Usage() { + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) +} + +// Runnable reports whether the command can be run; otherwise +// it is a documentation pseudo-command such as importpath. +func (c *Command) Runnable() bool { + return c.Run != nil +} diff --git a/src/export.go b/src/export.go new file mode 100644 index 000000000..44cf85c24 --- /dev/null +++ b/src/export.go @@ -0,0 +1,164 @@ +package main + +import ( + "archive/tar" + "bytes" + "fmt" + "log" + "os" + "path" + "code.google.com/p/weed-fs/weed/directory" + "code.google.com/p/weed-fs/weed/storage" + "strconv" + "strings" + "text/template" + "time" +) + +func init() { + cmdExport.Run = runExport // break init cycle + cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode") +} + +const ( + defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}` +) + +var cmdExport = &Command{ + UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}", + Short: "list or export files from one volume data file", + Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified. + + The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}. + + `, +} + +var ( + exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") + exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") + dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") + format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") + tarFh *tar.Writer + tarHeader tar.Header + fnTmpl *template.Template + fnTmplBuf = bytes.NewBuffer(nil) +) + +func runExport(cmd *Command, args []string) bool { + + if *exportVolumeId == -1 { + return false + } + + var err error + if *dest != "" { + if *dest != "-" && !strings.HasSuffix(*dest, ".tar") { + fmt.Println("the output file", *dest, "should be '-' or end with .tar") + return false + } + + if fnTmpl, err = template.New("name").Parse(*format); err != nil { + fmt.Println("cannot parse format " + *format + ": " + err.Error()) + return false + } + + var fh *os.File + if *dest == "-" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Fatalf("cannot open output tar %s: %s", *dest, err) + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + fileName := strconv.Itoa(*exportVolumeId) + vid := storage.VolumeId(*exportVolumeId) + indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644) + if err != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + nm := storage.LoadNeedleMap(indexFile) + + var version storage.Version + + err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { + version = superBlock.Version + return nil + }, func(n *storage.Needle, offset uint32) error { + debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) + nv, ok := nm.Get(n.Id) + if ok && nv.Size > 0 { + return walker(vid, n, version) + } else { + if !ok { + debug("This seems deleted", n.Id) + } else { + debug("Id", n.Id, "size", n.Size) + } + } + return nil + }) + if err != nil { + log.Fatalf("Export Volume File [ERROR] %s\n", err) + } + return true +} + +type nameParams struct { + Name string + Id uint64 + Mime string + Key string +} + +func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { + key := directory.NewFileId(vid, n.Id, n.Cookie).String() + if tarFh != nil { + fnTmplBuf.Reset() + if err = fnTmpl.Execute(fnTmplBuf, + nameParams{Name: string(n.Name), + Id: n.Id, + Mime: string(n.Mime), + Key: key, + }, + ); err != nil { + return err + } + nm := fnTmplBuf.String() + + if n.IsGzipped() && path.Ext(nm) != ".gz" { + nm = nm + ".gz" + } + + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + size := n.DataSize + if version == storage.Version1 { + size = n.Size + } + fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n", + key, + n.Name, + size, + n.IsGzipped(), + n.Mime, + ) + } + return +} diff --git a/src/fix.go b/src/fix.go new file mode 100644 index 000000000..85693d9b1 --- /dev/null +++ b/src/fix.go @@ -0,0 +1,64 @@ +package main + +import ( + "log" + "os" + "path" + "code.google.com/p/weed-fs/weed/storage" + "strconv" +) + +func init() { + cmdFix.Run = runFix // break init cycle + cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdFix = &Command{ + UsageLine: "fix -dir=/tmp -volumeId=234", + Short: "run weed tool fix on index file if corrupted", + Long: `Fix runs the WeedFS fix command to re-create the index .idx file. + + `, +} + +var ( + fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") + fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func runFix(cmd *Command, args []string) bool { + + if *fixVolumeId == -1 { + return false + } + + fileName := strconv.Itoa(*fixVolumeId) + indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + nm := storage.NewNeedleMap(indexFile) + defer nm.Close() + + vid := storage.VolumeId(*fixVolumeId) + err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { + return nil + }, func(n *storage.Needle, offset uint32) error { + debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) + if n.Size > 0 { + count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size) + debug("saved", count, "with error", pe) + } else { + debug("skipping deleted file ...") + nm.Delete(n.Id) + } + return nil + }) + if err != nil { + log.Fatalf("Export Volume File [ERROR] %s\n", err) + } + + return true +} diff --git a/src/master.go b/src/master.go new file mode 100644 index 000000000..78db929f2 --- /dev/null +++ b/src/master.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "code.google.com/p/weed-fs/weed/replication" + "code.google.com/p/weed-fs/weed/storage" + "code.google.com/p/weed-fs/weed/topology" + "runtime" + "strconv" + "strings" + "time" +) + +func init() { + cmdMaster.Run = runMaster // break init cycle + cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdMaster = &Command{ + UsageLine: "master -port=9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service + and sequence number of file ids + + `, +} + +var ( + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") + mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") +) + +var topo *topology.Topology +var vg *replication.VolumeGrowth + +func dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := topo.Lookup(volumeId) + if machines != nil { + ret := []map[string]string{} + for _, dn := range machines { + ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) + } + writeJson(w, r, map[string]interface{}{"locations": ret}) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + } +} + +func dirAssignHandler(w http.ResponseWriter, r *http.Request) { + c, e := strconv.Atoi(r.FormValue("count")) + if e != nil { + c = 1 + } + repType := r.FormValue("replication") + if repType == "" { + repType = *defaultRepType + } + rt, err := storage.NewReplicationTypeFromString(repType) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + return + } + if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { + if topo.FreeSpace() <= 0 { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + return + } else { + vg.GrowByType(rt, topo) + } + } + fid, count, dn, err := topo.PickForWrite(rt, c) + if err == nil { + writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} + +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init") == "true" + ip := r.FormValue("ip") + if ip == "" { + ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + debug(s, "volumes", r.FormValue("volumes")) + topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 + writeJson(w, r, m) +} + +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Topology"] = topo.ToMap() + writeJson(w, r, m) +} + +func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = *garbageThreshold + } + debug("garbageThreshold =", gcThreshold) + topo.Vacuum(gcThreshold) + dirStatusHandler(w, r) +} + +func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if topo.FreeSpace() < count*rt.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = vg.GrowByCountAndType(count, rt, topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]interface{}{"count": count}) + } +} + +func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = topo.ToVolumeMap() + writeJson(w, r, m) +} + +func redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + machines := topo.Lookup(volumeId) + if machines != nil && len(machines) > 0 { + http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } +} + +func runMaster(cmd *Command, args []string) bool { + if *mMaxCpu < 1 { + *mMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*mMaxCpu) + topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + vg = replication.NewDefaultVolumeGrowth() + log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) + http.HandleFunc("/vol/grow", volumeGrowHandler) + http.HandleFunc("/vol/status", volumeStatusHandler) + http.HandleFunc("/vol/vacuum", volumeVacuumHandler) + + http.HandleFunc("/", redirectHandler) + + topo.StartRefreshWritableVolumes(*garbageThreshold) + + log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) + srv := &http.Server{ + Addr: ":" + strconv.Itoa(*mport), + Handler: http.DefaultServeMux, + ReadTimeout: time.Duration(*mReadTimeout) * time.Second, + } + e := srv.ListenAndServe() + if e != nil { + log.Fatalf("Fail to start:%s", e.Error()) + } + return true +} diff --git a/src/shell.go b/src/shell.go new file mode 100644 index 000000000..daf0b7e1f --- /dev/null +++ b/src/shell.go @@ -0,0 +1,53 @@ +package main + +import ( + "bufio" + "fmt" + "os" +) + +func init() { + cmdShell.Run = runShell // break init cycle +} + +var cmdShell = &Command{ + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. + + `, +} + +var () + +func runShell(command *Command, args []string) bool { + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } + + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true +} diff --git a/src/upload.go b/src/upload.go new file mode 100644 index 000000000..e1e296bf2 --- /dev/null +++ b/src/upload.go @@ -0,0 +1,113 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "path" + "code.google.com/p/weed-fs/weed/operation" + "code.google.com/p/weed-fs/weed/util" + "strconv" +) + +var uploadReplication *string + +func init() { + cmdUpload.Run = runUpload // break init cycle + cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") + server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") + uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)") +} + +var cmdUpload = &Command{ + UsageLine: "upload -server=localhost:9333 file1 [file2 file3]", + Short: "upload one or a list of files", + Long: `upload one or a list of files. + It uses consecutive file keys for the list of files. + e.g. If the file1 uses key k, file2 can be read via k_1 + + `, +} + +type AssignResult struct { + Fid string "fid" + Url string "url" + PublicUrl string "publicUrl" + Count int + Error string "error" +} + +func assign(count int) (*AssignResult, error) { + values := make(url.Values) + values.Add("count", strconv.Itoa(count)) + values.Add("replication", *uploadReplication) + jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) + debug("assign result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret AssignResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Count <= 0 { + return nil, errors.New(ret.Error) + } + return &ret, nil +} + +func upload(filename string, server string, fid string) (int, error) { + debug("Start uploading file:", filename) + fh, err := os.Open(filename) + if err != nil { + debug("Failed to open file:", filename) + return 0, err + } + ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh) + if e != nil { + return 0, e + } + return ret.Size, e +} + +type SubmitResult struct { + Fid string "fid" + Size int "size" + Error string "error" +} + +func submit(files []string) []SubmitResult { + ret, err := assign(len(files)) + if err != nil { + fmt.Println(err) + return nil + } + results := make([]SubmitResult, len(files)) + for index, file := range files { + fid := ret.Fid + if index > 0 { + fid = fid + "_" + strconv.Itoa(index) + } + results[index].Size, err = upload(file, ret.PublicUrl, fid) + if err != nil { + fid = "" + results[index].Error = err.Error() + } + results[index].Fid = fid + } + return results +} + +func runUpload(cmd *Command, args []string) bool { + *IsDebug = true + if len(cmdUpload.Flag.Args()) == 0 { + return false + } + results := submit(args) + bytes, _ := json.Marshal(results) + fmt.Print(string(bytes)) + return true +} diff --git a/src/version.go b/src/version.go new file mode 100644 index 000000000..b418126a4 --- /dev/null +++ b/src/version.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "runtime" +) + +const ( + VERSION = "0.28 beta" +) + +var cmdVersion = &Command{ + Run: runVersion, + UsageLine: "version", + Short: "print Weed File System version", + Long: `Version prints the Weed File System version`, +} + +func runVersion(cmd *Command, args []string) bool { + if len(args) != 0 { + cmd.Usage() + } + + fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) + return true +} diff --git a/src/volume.go b/src/volume.go new file mode 100644 index 000000000..8bfc7681a --- /dev/null +++ b/src/volume.go @@ -0,0 +1,378 @@ +package main + +import ( + "bytes" + "log" + "math/rand" + "mime" + "net/http" + "os" + "code.google.com/p/weed-fs/weed/operation" + "code.google.com/p/weed-fs/weed/storage" + "runtime" + "strconv" + "strings" + "time" +) + +func init() { + cmdVolume.Run = runVolume // break init cycle + cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdVolume = &Command{ + UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333", + Short: "start a volume server", + Long: `start a volume server to provide storage spaces + + `, +} + +var ( + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files") + ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") + publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible :") + masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") + maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes") + vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + + store *storage.Store +) + +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") + +func statusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = store.Status() + writeJson(w, r, m) +} +func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) +} +func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { + err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) + if err == nil { + writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) + } else { + writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + } + debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) +} +func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { + err := store.CompactVolume(r.FormValue("volume")) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("compacted volume =", r.FormValue("volume"), ", error =", err) +} +func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { + err := store.CommitCompactVolume(r.FormValue("volume")) + if err == nil { + writeJson(w, r, map[string]interface{}{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("commit compact volume =", r.FormValue("volume"), ", error =", err) +} +func storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + GetHandler(w, r) + case "DELETE": + DeleteHandler(w, r) + case "POST": + PostHandler(w, r) + } +} +func GetHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, ext := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + n.ParsePath(fid) + + debug("volume", volumeId, "reading", n) + if !store.HasVolume(volumeId) { + lookupResult, err := operation.Lookup(*masterNode, volumeId) + debug("volume", volumeId, "found on", lookupResult, "error", err) + if err == nil { + http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + debug("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + return + } + cookie := n.Cookie + count, e := store.Read(volumeId, n) + debug("read bytes", count, "error", e) + if e != nil || count <= 0 { + debug("read error:", e, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if n.Cookie != cookie { + log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(http.StatusNotFound) + return + } + if n.NameSize > 0 { + fname := string(n.Name) + dotIndex := strings.LastIndex(fname, ".") + if dotIndex > 0 { + ext = fname[dotIndex:] + } + } + mtype := "" + if ext != "" { + mtype = mime.TypeByExtension(ext) + } + if n.MimeSize > 0 { + mtype = string(n.Mime) + } + if mtype != "" { + w.Header().Set("Content-Type", mtype) + } + if n.NameSize > 0 { + w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name))) + } + if ext != ".gz" { + if n.IsGzipped() { + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = storage.UnGzipData(n.Data); err != nil { + debug("lookup error:", err, r.URL.Path) + } + } + } + } + w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) + w.Write(n.Data) +} +func PostHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, e := storage.NewVolumeId(vid) + if e != nil { + writeJson(w, r, e) + } else { + needle, filename, ne := storage.NewNeedle(r) + if ne != nil { + writeJson(w, r, ne) + } else { + ret, err := store.Write(volumeId, needle) + errorStatus := "" + needToReplicate := !store.HasVolume(volumeId) + if err != nil { + errorStatus = "Failed to write to local disk (" + err.Error() + ")" + } else if ret > 0 { + needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() + } else { + errorStatus = "Failed to write to local disk" + } + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(volumeId, func(location operation.Location) bool { + _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() + } + } + } + m := make(map[string]interface{}) + if errorStatus == "" { + w.WriteHeader(http.StatusCreated) + } else { + store.Delete(volumeId, needle) + distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) + w.WriteHeader(http.StatusInternalServerError) + m["error"] = errorStatus + } + m["size"] = ret + writeJson(w, r, m) + } + } +} +func DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, _ := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + debug("deleting", n) + + cookie := n.Cookie + count, ok := store.Read(volumeId, n) + + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJson(w, r, m) + return + } + + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + ret, err := store.Delete(volumeId, n) + if err != nil { + log.Printf("delete error: %s\n", err) + return + } + + needToReplicate := !store.HasVolume(volumeId) + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) { + ret = 0 + } + } + } + + if ret != 0 { + w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJson(w, r, m) +} + +func parseURLPath(path string) (vid, fid, ext string) { + + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + if "favicon.ico" != path[sepIndex+1:] { + log.Println("unknown file id", path[sepIndex+1:]) + } + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + return +} + +func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { + if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { + length := 0 + selfUrl := (*ip + ":" + strconv.Itoa(*vport)) + results := make(chan bool) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + length++ + go func(location operation.Location, results chan bool) { + results <- op(location) + }(location, results) + } + } + ret := true + for i := 0; i < length; i++ { + ret = ret && <-results + } + return ret + } else { + log.Println("Failed to lookup for", volumeId, lookupErr.Error()) + } + return false +} + +func runVolume(cmd *Command, args []string) bool { + if *vMaxCpu < 1 { + *vMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*vMaxCpu) + fileInfo, err := os.Stat(*volumeFolder) + if err != nil { + log.Fatalf("No Existing Folder:%s", *volumeFolder) + } + if !fileInfo.IsDir() { + log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) + } + perm := fileInfo.Mode().Perm() + log.Println("Volume Folder permission:", perm) + + if *publicUrl == "" { + *publicUrl = *ip + ":" + strconv.Itoa(*vport) + } + + store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) + defer store.Close() + http.HandleFunc("/", storeHandler) + http.HandleFunc("/status", statusHandler) + http.HandleFunc("/admin/assign_volume", assignVolumeHandler) + http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) + http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) + http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) + + go func() { + connected := true + store.SetMaster(*masterNode) + for { + err := store.Join() + if err == nil { + if !connected { + connected = true + log.Println("Reconnected with master") + } + } else { + if connected { + connected = false + } + } + time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + log.Println("store joined at", *masterNode) + + log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) + srv := &http.Server{ + Addr: ":" + strconv.Itoa(*vport), + Handler: http.DefaultServeMux, + ReadTimeout: (time.Duration(*vReadTimeout) * time.Second), + } + e := srv.ListenAndServe() + if e != nil { + log.Fatalf("Fail to start:%s", e.Error()) + } + return true +} diff --git a/src/weed.go b/src/weed.go new file mode 100644 index 000000000..c03cb68ac --- /dev/null +++ b/src/weed.go @@ -0,0 +1,199 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "math/rand" + "net/http" + "os" + "strings" + "sync" + "text/template" + "time" + "unicode" + "unicode/utf8" +) + +var IsDebug *bool +var server *string + +var commands = []*Command{ + cmdFix, + cmdMaster, + cmdUpload, + cmdShell, + cmdVersion, + cmdVolume, + cmdExport, +} + +var exitStatus = 0 +var exitMu sync.Mutex + +func setExitStatus(n int) { + exitMu.Lock() + if exitStatus < n { + exitStatus = n + } + exitMu.Unlock() +} + +func main() { + rand.Seed(time.Now().UnixNano()) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + if args[0] == "help" { + help(args[1:]) + for _, cmd := range commands { + if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil { + fmt.Fprintf(os.Stderr, "Default Parameters:\n") + cmd.Flag.PrintDefaults() + } + } + return + } + + for _, cmd := range commands { + if cmd.Name() == args[0] && cmd.Run != nil { + cmd.Flag.Usage = func() { cmd.Usage() } + cmd.Flag.Parse(args[1:]) + args = cmd.Flag.Args() + IsDebug = cmd.IsDebug + if !cmd.Run(cmd, args) { + fmt.Fprintf(os.Stderr, "\n") + cmd.Flag.Usage() + fmt.Fprintf(os.Stderr, "Default Parameters:\n") + cmd.Flag.PrintDefaults() + } + exit() + return + } + } + + fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0]) + setExitStatus(2) + exit() +} + +var usageTemplate = `WeedFS is a software to store billions of files and serve them fast! + +Usage: + + weed command [arguments] + +The commands are: +{{range .}}{{if .Runnable}} + {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}} + +Use "weed help [command]" for more information about a command. + +` + +var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}} +{{end}} + {{.Long}} +` + +// tmpl executes the given template text on data, writing the result to w. +func tmpl(w io.Writer, text string, data interface{}) { + t := template.New("top") + t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize}) + template.Must(t.Parse(text)) + if err := t.Execute(w, data); err != nil { + panic(err) + } +} + +func capitalize(s string) string { + if s == "" { + return s + } + r, n := utf8.DecodeRuneInString(s) + return string(unicode.ToTitle(r)) + s[n:] +} + +func printUsage(w io.Writer) { + tmpl(w, usageTemplate, commands) +} + +func usage() { + printUsage(os.Stderr) + os.Exit(2) +} + +// help implements the 'help' command. +func help(args []string) { + if len(args) == 0 { + printUsage(os.Stdout) + // not exit 2: succeeded at 'weed help'. + return + } + if len(args) != 1 { + fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n") + os.Exit(2) // failed at 'weed help' + } + + arg := args[0] + + for _, cmd := range commands { + if cmd.Name() == arg { + tmpl(os.Stdout, helpTemplate, cmd) + // not exit 2: succeeded at 'weed help cmd'. + return + } + } + + fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg) + os.Exit(2) // failed at 'weed help cmd' +} + +var atexitFuncs []func() + +func atexit(f func()) { + atexitFuncs = append(atexitFuncs, f) +} + +func exit() { + for _, f := range atexitFuncs { + f() + } + os.Exit(exitStatus) +} + +func exitIfErrors() { + if exitStatus != 0 { + exit() + } +} +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + w.Header().Set("Content-Type", "application/javascript") + var bytes []byte + if r.FormValue("pretty") != "" { + bytes, _ = json.MarshalIndent(obj, "", " ") + } else { + bytes, _ = json.Marshal(obj) + } + callback := r.FormValue("callback") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + fmt.Fprint(w, string(bytes)) + w.Write([]uint8(")")) + } +} + +func debug(params ...interface{}) { + if *IsDebug { + fmt.Println(params) + } +}