diff --git a/src/command.go b/src/command.go deleted file mode 100644 index c8d86ca66..000000000 --- a/src/command.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "strings" -) - -type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool - - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string - - // Short is the short description shown in the 'go help' output. - Short string - - // Long is the long message shown in the 'go help ' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet - - IsDebug *bool -} - -// Name returns the command's name: the first word in the usage line. -func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name -} - -func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) -} - -// Runnable reports whether the command can be run; otherwise -// it is a documentation pseudo-command such as importpath. -func (c *Command) Runnable() bool { - return c.Run != nil -} diff --git a/src/export.go b/src/export.go deleted file mode 100644 index 44cf85c24..000000000 --- a/src/export.go +++ /dev/null @@ -1,164 +0,0 @@ -package main - -import ( - "archive/tar" - "bytes" - "fmt" - "log" - "os" - "path" - "code.google.com/p/weed-fs/weed/directory" - "code.google.com/p/weed-fs/weed/storage" - "strconv" - "strings" - "text/template" - "time" -) - -func init() { - cmdExport.Run = runExport // break init cycle - cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode") -} - -const ( - defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}` -) - -var cmdExport = &Command{ - UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}", - Short: "list or export files from one volume data file", - Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified. - - The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}. - - `, -} - -var ( - exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") - exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") - dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") - format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") - tarFh *tar.Writer - tarHeader tar.Header - fnTmpl *template.Template - fnTmplBuf = bytes.NewBuffer(nil) -) - -func runExport(cmd *Command, args []string) bool { - - if *exportVolumeId == -1 { - return false - } - - var err error - if *dest != "" { - if *dest != "-" && !strings.HasSuffix(*dest, ".tar") { - fmt.Println("the output file", *dest, "should be '-' or end with .tar") - return false - } - - if fnTmpl, err = template.New("name").Parse(*format); err != nil { - fmt.Println("cannot parse format " + *format + ": " + err.Error()) - return false - } - - var fh *os.File - if *dest == "-" { - fh = os.Stdout - } else { - if fh, err = os.Create(*dest); err != nil { - log.Fatalf("cannot open output tar %s: %s", *dest, err) - } - } - defer fh.Close() - tarFh = tar.NewWriter(fh) - defer tarFh.Close() - t := time.Now() - tarHeader = tar.Header{Mode: 0644, - ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), - Typeflag: tar.TypeReg, - AccessTime: t, ChangeTime: t} - } - - fileName := strconv.Itoa(*exportVolumeId) - vid := storage.VolumeId(*exportVolumeId) - indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - - nm := storage.LoadNeedleMap(indexFile) - - var version storage.Version - - err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { - version = superBlock.Version - return nil - }, func(n *storage.Needle, offset uint32) error { - debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) - nv, ok := nm.Get(n.Id) - if ok && nv.Size > 0 { - return walker(vid, n, version) - } else { - if !ok { - debug("This seems deleted", n.Id) - } else { - debug("Id", n.Id, "size", n.Size) - } - } - return nil - }) - if err != nil { - log.Fatalf("Export Volume File [ERROR] %s\n", err) - } - return true -} - -type nameParams struct { - Name string - Id uint64 - Mime string - Key string -} - -func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { - key := directory.NewFileId(vid, n.Id, n.Cookie).String() - if tarFh != nil { - fnTmplBuf.Reset() - if err = fnTmpl.Execute(fnTmplBuf, - nameParams{Name: string(n.Name), - Id: n.Id, - Mime: string(n.Mime), - Key: key, - }, - ); err != nil { - return err - } - nm := fnTmplBuf.String() - - if n.IsGzipped() && path.Ext(nm) != ".gz" { - nm = nm + ".gz" - } - - tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) - if err = tarFh.WriteHeader(&tarHeader); err != nil { - return err - } - _, err = tarFh.Write(n.Data) - } else { - size := n.DataSize - if version == storage.Version1 { - size = n.Size - } - fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n", - key, - n.Name, - size, - n.IsGzipped(), - n.Mime, - ) - } - return -} diff --git a/src/fix.go b/src/fix.go deleted file mode 100644 index 85693d9b1..000000000 --- a/src/fix.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "log" - "os" - "path" - "code.google.com/p/weed-fs/weed/storage" - "strconv" -) - -func init() { - cmdFix.Run = runFix // break init cycle - cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdFix = &Command{ - UsageLine: "fix -dir=/tmp -volumeId=234", - Short: "run weed tool fix on index file if corrupted", - Long: `Fix runs the WeedFS fix command to re-create the index .idx file. - - `, -} - -var ( - fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") - fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") -) - -func runFix(cmd *Command, args []string) bool { - - if *fixVolumeId == -1 { - return false - } - - fileName := strconv.Itoa(*fixVolumeId) - indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - - nm := storage.NewNeedleMap(indexFile) - defer nm.Close() - - vid := storage.VolumeId(*fixVolumeId) - err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { - return nil - }, func(n *storage.Needle, offset uint32) error { - debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) - if n.Size > 0 { - count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size) - debug("saved", count, "with error", pe) - } else { - debug("skipping deleted file ...") - nm.Delete(n.Id) - } - return nil - }) - if err != nil { - log.Fatalf("Export Volume File [ERROR] %s\n", err) - } - - return true -} diff --git a/src/master.go b/src/master.go deleted file mode 100644 index 78db929f2..000000000 --- a/src/master.go +++ /dev/null @@ -1,217 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "log" - "net/http" - "code.google.com/p/weed-fs/weed/replication" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "runtime" - "strconv" - "strings" - "time" -) - -func init() { - cmdMaster.Run = runMaster // break init cycle - cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdMaster = &Command{ - UsageLine: "master -port=9333", - Short: "start a master server", - Long: `start a master server to provide volume=>location mapping service - and sequence number of file ids - - `, -} - -var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") - mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") -) - -var topo *topology.Topology -var vg *replication.VolumeGrowth - -func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := topo.Lookup(volumeId) - if machines != nil { - ret := []map[string]string{} - for _, dn := range machines { - ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) - } - writeJson(w, r, map[string]interface{}{"locations": ret}) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) - } -} - -func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c, e := strconv.Atoi(r.FormValue("count")) - if e != nil { - c = 1 - } - repType := r.FormValue("replication") - if repType == "" { - repType = *defaultRepType - } - rt, err := storage.NewReplicationTypeFromString(repType) - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - return - } - if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { - if topo.FreeSpace() <= 0 { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "No free volumes left!"}) - return - } else { - vg.GrowByType(rt, topo) - } - } - fid, count, dn, err := topo.PickForWrite(rt, c) - if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - } -} - -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) - m := make(map[string]interface{}) - m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJson(w, r, m) -} - -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Topology"] = topo.ToMap() - writeJson(w, r, m) -} - -func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = *garbageThreshold - } - debug("garbageThreshold =", gcThreshold) - topo.Vacuum(gcThreshold) - dirStatusHandler(w, r) -} - -func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) - } else { - count, err = vg.GrowByCountAndType(count, rt, topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) - } -} - -func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) -} - -func redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - machines := topo.Lookup(volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } -} - -func runMaster(cmd *Command, args []string) bool { - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) - topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) - vg = replication.NewDefaultVolumeGrowth() - log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) - http.HandleFunc("/vol/grow", volumeGrowHandler) - http.HandleFunc("/vol/status", volumeStatusHandler) - http.HandleFunc("/vol/vacuum", volumeVacuumHandler) - - http.HandleFunc("/", redirectHandler) - - topo.StartRefreshWritableVolumes(*garbageThreshold) - - log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) - srv := &http.Server{ - Addr: ":" + strconv.Itoa(*mport), - Handler: http.DefaultServeMux, - ReadTimeout: time.Duration(*mReadTimeout) * time.Second, - } - e := srv.ListenAndServe() - if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) - } - return true -} diff --git a/src/shell.go b/src/shell.go deleted file mode 100644 index daf0b7e1f..000000000 --- a/src/shell.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "os" -) - -func init() { - cmdShell.Run = runShell // break init cycle -} - -var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. - - `, -} - -var () - -func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func() { - o.WriteString("> ") - o.Flush() - } - readLine := func() string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e, err) - os.Exit(1) - } - return ret - } - execCmd := func(cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } - - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true -} diff --git a/src/upload.go b/src/upload.go deleted file mode 100644 index e1e296bf2..000000000 --- a/src/upload.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "os" - "path" - "code.google.com/p/weed-fs/weed/operation" - "code.google.com/p/weed-fs/weed/util" - "strconv" -) - -var uploadReplication *string - -func init() { - cmdUpload.Run = runUpload // break init cycle - cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") - server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") - uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)") -} - -var cmdUpload = &Command{ - UsageLine: "upload -server=localhost:9333 file1 [file2 file3]", - Short: "upload one or a list of files", - Long: `upload one or a list of files. - It uses consecutive file keys for the list of files. - e.g. If the file1 uses key k, file2 can be read via k_1 - - `, -} - -type AssignResult struct { - Fid string "fid" - Url string "url" - PublicUrl string "publicUrl" - Count int - Error string "error" -} - -func assign(count int) (*AssignResult, error) { - values := make(url.Values) - values.Add("count", strconv.Itoa(count)) - values.Add("replication", *uploadReplication) - jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) - debug("assign result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret AssignResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Count <= 0 { - return nil, errors.New(ret.Error) - } - return &ret, nil -} - -func upload(filename string, server string, fid string) (int, error) { - debug("Start uploading file:", filename) - fh, err := os.Open(filename) - if err != nil { - debug("Failed to open file:", filename) - return 0, err - } - ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh) - if e != nil { - return 0, e - } - return ret.Size, e -} - -type SubmitResult struct { - Fid string "fid" - Size int "size" - Error string "error" -} - -func submit(files []string) []SubmitResult { - ret, err := assign(len(files)) - if err != nil { - fmt.Println(err) - return nil - } - results := make([]SubmitResult, len(files)) - for index, file := range files { - fid := ret.Fid - if index > 0 { - fid = fid + "_" + strconv.Itoa(index) - } - results[index].Size, err = upload(file, ret.PublicUrl, fid) - if err != nil { - fid = "" - results[index].Error = err.Error() - } - results[index].Fid = fid - } - return results -} - -func runUpload(cmd *Command, args []string) bool { - *IsDebug = true - if len(cmdUpload.Flag.Args()) == 0 { - return false - } - results := submit(args) - bytes, _ := json.Marshal(results) - fmt.Print(string(bytes)) - return true -} diff --git a/src/version.go b/src/version.go deleted file mode 100644 index b418126a4..000000000 --- a/src/version.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "fmt" - "runtime" -) - -const ( - VERSION = "0.28 beta" -) - -var cmdVersion = &Command{ - Run: runVersion, - UsageLine: "version", - Short: "print Weed File System version", - Long: `Version prints the Weed File System version`, -} - -func runVersion(cmd *Command, args []string) bool { - if len(args) != 0 { - cmd.Usage() - } - - fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) - return true -} diff --git a/src/volume.go b/src/volume.go deleted file mode 100644 index 8bfc7681a..000000000 --- a/src/volume.go +++ /dev/null @@ -1,378 +0,0 @@ -package main - -import ( - "bytes" - "log" - "math/rand" - "mime" - "net/http" - "os" - "code.google.com/p/weed-fs/weed/operation" - "code.google.com/p/weed-fs/weed/storage" - "runtime" - "strconv" - "strings" - "time" -) - -func init() { - cmdVolume.Run = runVolume // break init cycle - cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdVolume = &Command{ - UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333", - Short: "start a volume server", - Long: `start a volume server to provide storage spaces - - `, -} - -var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files") - ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") - publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible :") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") - maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes") - vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - - store *storage.Store -) - -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func statusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = store.Status() - writeJson(w, r, m) -} -func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) - if err == nil { - writeJson(w, r, map[string]string{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) -} -func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { - err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) - if err == nil { - writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) - } else { - writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) - } - debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) -} -func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := store.CompactVolume(r.FormValue("volume")) - if err == nil { - writeJson(w, r, map[string]string{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("compacted volume =", r.FormValue("volume"), ", error =", err) -} -func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - err := store.CommitCompactVolume(r.FormValue("volume")) - if err == nil { - writeJson(w, r, map[string]interface{}{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("commit compact volume =", r.FormValue("volume"), ", error =", err) -} -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetHandler(w, r) - case "DELETE": - DeleteHandler(w, r) - case "POST": - PostHandler(w, r) - } -} -func GetHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, ext := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - n.ParsePath(fid) - - debug("volume", volumeId, "reading", n) - if !store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(*masterNode, volumeId) - debug("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil { - http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - debug("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - } - return - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - debug("read bytes", count, "error", e) - if e != nil || count <= 0 { - debug("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - if n.Cookie != cookie { - log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(http.StatusNotFound) - return - } - if n.NameSize > 0 { - fname := string(n.Name) - dotIndex := strings.LastIndex(fname, ".") - if dotIndex > 0 { - ext = fname[dotIndex:] - } - } - mtype := "" - if ext != "" { - mtype = mime.TypeByExtension(ext) - } - if n.MimeSize > 0 { - mtype = string(n.Mime) - } - if mtype != "" { - w.Header().Set("Content-Type", mtype) - } - if n.NameSize > 0 { - w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name))) - } - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = storage.UnGzipData(n.Data); err != nil { - debug("lookup error:", err, r.URL.Path) - } - } - } - } - w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - w.Write(n.Data) -} -func PostHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := storage.NewVolumeId(vid) - if e != nil { - writeJson(w, r, e) - } else { - needle, filename, ne := storage.NewNeedle(r) - if ne != nil { - writeJson(w, r, ne) - } else { - ret, err := store.Write(volumeId, needle) - errorStatus := "" - needToReplicate := !store.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() - } else { - errorStatus = "Failed to write to local disk" - } - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } - } - } - m := make(map[string]interface{}) - if errorStatus == "" { - w.WriteHeader(http.StatusCreated) - } else { - store.Delete(volumeId, needle) - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) - w.WriteHeader(http.StatusInternalServerError) - m["error"] = errorStatus - } - m["size"] = ret - writeJson(w, r, m) - } - } -} -func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) - - debug("deleting", n) - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJson(w, r, m) - return - } - - if n.Cookie != cookie { - log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - ret, err := store.Delete(volumeId, n) - if err != nil { - log.Printf("delete error: %s\n", err) - return - } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) { - ret = 0 - } - } - } - - if ret != 0 { - w.WriteHeader(http.StatusAccepted) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJson(w, r, m) -} - -func parseURLPath(path string) (vid, fid, ext string) { - - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - if "favicon.ico" != path[sepIndex+1:] { - log.Println("unknown file id", path[sepIndex+1:]) - } - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - return -} - -func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - length := 0 - selfUrl := (*ip + ":" + strconv.Itoa(*vport)) - results := make(chan bool) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan bool) { - results <- op(location) - }(location, results) - } - } - ret := true - for i := 0; i < length; i++ { - ret = ret && <-results - } - return ret - } else { - log.Println("Failed to lookup for", volumeId, lookupErr.Error()) - } - return false -} - -func runVolume(cmd *Command, args []string) bool { - if *vMaxCpu < 1 { - *vMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*vMaxCpu) - fileInfo, err := os.Stat(*volumeFolder) - if err != nil { - log.Fatalf("No Existing Folder:%s", *volumeFolder) - } - if !fileInfo.IsDir() { - log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) - } - perm := fileInfo.Mode().Perm() - log.Println("Volume Folder permission:", perm) - - if *publicUrl == "" { - *publicUrl = *ip + ":" + strconv.Itoa(*vport) - } - - store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) - defer store.Close() - http.HandleFunc("/", storeHandler) - http.HandleFunc("/status", statusHandler) - http.HandleFunc("/admin/assign_volume", assignVolumeHandler) - http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) - http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) - http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) - - go func() { - connected := true - store.SetMaster(*masterNode) - for { - err := store.Join() - if err == nil { - if !connected { - connected = true - log.Println("Reconnected with master") - } - } else { - if connected { - connected = false - } - } - time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - log.Println("store joined at", *masterNode) - - log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) - srv := &http.Server{ - Addr: ":" + strconv.Itoa(*vport), - Handler: http.DefaultServeMux, - ReadTimeout: (time.Duration(*vReadTimeout) * time.Second), - } - e := srv.ListenAndServe() - if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) - } - return true -} diff --git a/src/weed.go b/src/weed.go deleted file mode 100644 index c03cb68ac..000000000 --- a/src/weed.go +++ /dev/null @@ -1,199 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "io" - "math/rand" - "net/http" - "os" - "strings" - "sync" - "text/template" - "time" - "unicode" - "unicode/utf8" -) - -var IsDebug *bool -var server *string - -var commands = []*Command{ - cmdFix, - cmdMaster, - cmdUpload, - cmdShell, - cmdVersion, - cmdVolume, - cmdExport, -} - -var exitStatus = 0 -var exitMu sync.Mutex - -func setExitStatus(n int) { - exitMu.Lock() - if exitStatus < n { - exitStatus = n - } - exitMu.Unlock() -} - -func main() { - rand.Seed(time.Now().UnixNano()) - flag.Usage = usage - flag.Parse() - - args := flag.Args() - if len(args) < 1 { - usage() - } - - if args[0] == "help" { - help(args[1:]) - for _, cmd := range commands { - if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil { - fmt.Fprintf(os.Stderr, "Default Parameters:\n") - cmd.Flag.PrintDefaults() - } - } - return - } - - for _, cmd := range commands { - if cmd.Name() == args[0] && cmd.Run != nil { - cmd.Flag.Usage = func() { cmd.Usage() } - cmd.Flag.Parse(args[1:]) - args = cmd.Flag.Args() - IsDebug = cmd.IsDebug - if !cmd.Run(cmd, args) { - fmt.Fprintf(os.Stderr, "\n") - cmd.Flag.Usage() - fmt.Fprintf(os.Stderr, "Default Parameters:\n") - cmd.Flag.PrintDefaults() - } - exit() - return - } - } - - fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0]) - setExitStatus(2) - exit() -} - -var usageTemplate = `WeedFS is a software to store billions of files and serve them fast! - -Usage: - - weed command [arguments] - -The commands are: -{{range .}}{{if .Runnable}} - {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}} - -Use "weed help [command]" for more information about a command. - -` - -var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}} -{{end}} - {{.Long}} -` - -// tmpl executes the given template text on data, writing the result to w. -func tmpl(w io.Writer, text string, data interface{}) { - t := template.New("top") - t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize}) - template.Must(t.Parse(text)) - if err := t.Execute(w, data); err != nil { - panic(err) - } -} - -func capitalize(s string) string { - if s == "" { - return s - } - r, n := utf8.DecodeRuneInString(s) - return string(unicode.ToTitle(r)) + s[n:] -} - -func printUsage(w io.Writer) { - tmpl(w, usageTemplate, commands) -} - -func usage() { - printUsage(os.Stderr) - os.Exit(2) -} - -// help implements the 'help' command. -func help(args []string) { - if len(args) == 0 { - printUsage(os.Stdout) - // not exit 2: succeeded at 'weed help'. - return - } - if len(args) != 1 { - fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n") - os.Exit(2) // failed at 'weed help' - } - - arg := args[0] - - for _, cmd := range commands { - if cmd.Name() == arg { - tmpl(os.Stdout, helpTemplate, cmd) - // not exit 2: succeeded at 'weed help cmd'. - return - } - } - - fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg) - os.Exit(2) // failed at 'weed help cmd' -} - -var atexitFuncs []func() - -func atexit(f func()) { - atexitFuncs = append(atexitFuncs, f) -} - -func exit() { - for _, f := range atexitFuncs { - f() - } - os.Exit(exitStatus) -} - -func exitIfErrors() { - if exitStatus != 0 { - exit() - } -} -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { - w.Header().Set("Content-Type", "application/javascript") - var bytes []byte - if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") - } else { - bytes, _ = json.Marshal(obj) - } - callback := r.FormValue("callback") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) - } -} - -func debug(params ...interface{}) { - if *IsDebug { - fmt.Println(params) - } -}