mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
7743ddd7db
54
src/command.go
Normal file
54
src/command.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Command struct {
|
||||||
|
// Run runs the command.
|
||||||
|
// The args are the arguments after the command name.
|
||||||
|
Run func(cmd *Command, args []string) bool
|
||||||
|
|
||||||
|
// UsageLine is the one-line usage message.
|
||||||
|
// The first word in the line is taken to be the command name.
|
||||||
|
UsageLine string
|
||||||
|
|
||||||
|
// Short is the short description shown in the 'go help' output.
|
||||||
|
Short string
|
||||||
|
|
||||||
|
// Long is the long message shown in the 'go help <this-command>' output.
|
||||||
|
Long string
|
||||||
|
|
||||||
|
// Flag is a set of flags specific to this command.
|
||||||
|
Flag flag.FlagSet
|
||||||
|
|
||||||
|
IsDebug *bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the command's name: the first word in the usage line.
|
||||||
|
func (c *Command) Name() string {
|
||||||
|
name := c.UsageLine
|
||||||
|
i := strings.Index(name, " ")
|
||||||
|
if i >= 0 {
|
||||||
|
name = name[:i]
|
||||||
|
}
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Command) Usage() {
|
||||||
|
fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
|
||||||
|
fmt.Fprintf(os.Stderr, "Default Usage:\n")
|
||||||
|
c.Flag.PrintDefaults()
|
||||||
|
fmt.Fprintf(os.Stderr, "Description:\n")
|
||||||
|
fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runnable reports whether the command can be run; otherwise
|
||||||
|
// it is a documentation pseudo-command such as importpath.
|
||||||
|
func (c *Command) Runnable() bool {
|
||||||
|
return c.Run != nil
|
||||||
|
}
|
164
src/export.go
Normal file
164
src/export.go
Normal file
|
@ -0,0 +1,164 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"code.google.com/p/weed-fs/weed/directory"
|
||||||
|
"code.google.com/p/weed-fs/weed/storage"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"text/template"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdExport.Run = runExport // break init cycle
|
||||||
|
cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}`
|
||||||
|
)
|
||||||
|
|
||||||
|
var cmdExport = &Command{
|
||||||
|
UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}",
|
||||||
|
Short: "list or export files from one volume data file",
|
||||||
|
Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified.
|
||||||
|
|
||||||
|
The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}.
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
|
||||||
|
exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
|
||||||
|
dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
|
||||||
|
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
|
||||||
|
tarFh *tar.Writer
|
||||||
|
tarHeader tar.Header
|
||||||
|
fnTmpl *template.Template
|
||||||
|
fnTmplBuf = bytes.NewBuffer(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
func runExport(cmd *Command, args []string) bool {
|
||||||
|
|
||||||
|
if *exportVolumeId == -1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if *dest != "" {
|
||||||
|
if *dest != "-" && !strings.HasSuffix(*dest, ".tar") {
|
||||||
|
fmt.Println("the output file", *dest, "should be '-' or end with .tar")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if fnTmpl, err = template.New("name").Parse(*format); err != nil {
|
||||||
|
fmt.Println("cannot parse format " + *format + ": " + err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var fh *os.File
|
||||||
|
if *dest == "-" {
|
||||||
|
fh = os.Stdout
|
||||||
|
} else {
|
||||||
|
if fh, err = os.Create(*dest); err != nil {
|
||||||
|
log.Fatalf("cannot open output tar %s: %s", *dest, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer fh.Close()
|
||||||
|
tarFh = tar.NewWriter(fh)
|
||||||
|
defer tarFh.Close()
|
||||||
|
t := time.Now()
|
||||||
|
tarHeader = tar.Header{Mode: 0644,
|
||||||
|
ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
|
||||||
|
Typeflag: tar.TypeReg,
|
||||||
|
AccessTime: t, ChangeTime: t}
|
||||||
|
}
|
||||||
|
|
||||||
|
fileName := strconv.Itoa(*exportVolumeId)
|
||||||
|
vid := storage.VolumeId(*exportVolumeId)
|
||||||
|
indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Create Volume Index [ERROR] %s\n", err)
|
||||||
|
}
|
||||||
|
defer indexFile.Close()
|
||||||
|
|
||||||
|
nm := storage.LoadNeedleMap(indexFile)
|
||||||
|
|
||||||
|
var version storage.Version
|
||||||
|
|
||||||
|
err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
||||||
|
version = superBlock.Version
|
||||||
|
return nil
|
||||||
|
}, func(n *storage.Needle, offset uint32) error {
|
||||||
|
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
||||||
|
nv, ok := nm.Get(n.Id)
|
||||||
|
if ok && nv.Size > 0 {
|
||||||
|
return walker(vid, n, version)
|
||||||
|
} else {
|
||||||
|
if !ok {
|
||||||
|
debug("This seems deleted", n.Id)
|
||||||
|
} else {
|
||||||
|
debug("Id", n.Id, "size", n.Size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Export Volume File [ERROR] %s\n", err)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
type nameParams struct {
|
||||||
|
Name string
|
||||||
|
Id uint64
|
||||||
|
Mime string
|
||||||
|
Key string
|
||||||
|
}
|
||||||
|
|
||||||
|
func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) {
|
||||||
|
key := directory.NewFileId(vid, n.Id, n.Cookie).String()
|
||||||
|
if tarFh != nil {
|
||||||
|
fnTmplBuf.Reset()
|
||||||
|
if err = fnTmpl.Execute(fnTmplBuf,
|
||||||
|
nameParams{Name: string(n.Name),
|
||||||
|
Id: n.Id,
|
||||||
|
Mime: string(n.Mime),
|
||||||
|
Key: key,
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nm := fnTmplBuf.String()
|
||||||
|
|
||||||
|
if n.IsGzipped() && path.Ext(nm) != ".gz" {
|
||||||
|
nm = nm + ".gz"
|
||||||
|
}
|
||||||
|
|
||||||
|
tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
|
||||||
|
if err = tarFh.WriteHeader(&tarHeader); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = tarFh.Write(n.Data)
|
||||||
|
} else {
|
||||||
|
size := n.DataSize
|
||||||
|
if version == storage.Version1 {
|
||||||
|
size = n.Size
|
||||||
|
}
|
||||||
|
fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n",
|
||||||
|
key,
|
||||||
|
n.Name,
|
||||||
|
size,
|
||||||
|
n.IsGzipped(),
|
||||||
|
n.Mime,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
64
src/fix.go
Normal file
64
src/fix.go
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"code.google.com/p/weed-fs/weed/storage"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdFix.Run = runFix // break init cycle
|
||||||
|
cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdFix = &Command{
|
||||||
|
UsageLine: "fix -dir=/tmp -volumeId=234",
|
||||||
|
Short: "run weed tool fix on index file if corrupted",
|
||||||
|
Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
|
||||||
|
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
|
||||||
|
)
|
||||||
|
|
||||||
|
func runFix(cmd *Command, args []string) bool {
|
||||||
|
|
||||||
|
if *fixVolumeId == -1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
fileName := strconv.Itoa(*fixVolumeId)
|
||||||
|
indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Create Volume Index [ERROR] %s\n", err)
|
||||||
|
}
|
||||||
|
defer indexFile.Close()
|
||||||
|
|
||||||
|
nm := storage.NewNeedleMap(indexFile)
|
||||||
|
defer nm.Close()
|
||||||
|
|
||||||
|
vid := storage.VolumeId(*fixVolumeId)
|
||||||
|
err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
|
||||||
|
return nil
|
||||||
|
}, func(n *storage.Needle, offset uint32) error {
|
||||||
|
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
|
||||||
|
if n.Size > 0 {
|
||||||
|
count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size)
|
||||||
|
debug("saved", count, "with error", pe)
|
||||||
|
} else {
|
||||||
|
debug("skipping deleted file ...")
|
||||||
|
nm.Delete(n.Id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Export Volume File [ERROR] %s\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
217
src/master.go
Normal file
217
src/master.go
Normal file
|
@ -0,0 +1,217 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"code.google.com/p/weed-fs/weed/replication"
|
||||||
|
"code.google.com/p/weed-fs/weed/storage"
|
||||||
|
"code.google.com/p/weed-fs/weed/topology"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdMaster.Run = runMaster // break init cycle
|
||||||
|
cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdMaster = &Command{
|
||||||
|
UsageLine: "master -port=9333",
|
||||||
|
Short: "start a master server",
|
||||||
|
Long: `start a master server to provide volume=>location mapping service
|
||||||
|
and sequence number of file ids
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
|
||||||
|
metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
|
||||||
|
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
|
||||||
|
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
|
||||||
|
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
|
||||||
|
defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
|
||||||
|
mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
|
||||||
|
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||||
|
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
|
||||||
|
)
|
||||||
|
|
||||||
|
var topo *topology.Topology
|
||||||
|
var vg *replication.VolumeGrowth
|
||||||
|
|
||||||
|
func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
vid := r.FormValue("volumeId")
|
||||||
|
commaSep := strings.Index(vid, ",")
|
||||||
|
if commaSep > 0 {
|
||||||
|
vid = vid[0:commaSep]
|
||||||
|
}
|
||||||
|
volumeId, err := storage.NewVolumeId(vid)
|
||||||
|
if err == nil {
|
||||||
|
machines := topo.Lookup(volumeId)
|
||||||
|
if machines != nil {
|
||||||
|
ret := []map[string]string{}
|
||||||
|
for _, dn := range machines {
|
||||||
|
ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
|
||||||
|
}
|
||||||
|
writeJson(w, r, map[string]interface{}{"locations": ret})
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusNotAcceptable)
|
||||||
|
writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
c, e := strconv.Atoi(r.FormValue("count"))
|
||||||
|
if e != nil {
|
||||||
|
c = 1
|
||||||
|
}
|
||||||
|
repType := r.FormValue("replication")
|
||||||
|
if repType == "" {
|
||||||
|
repType = *defaultRepType
|
||||||
|
}
|
||||||
|
rt, err := storage.NewReplicationTypeFromString(repType)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusNotAcceptable)
|
||||||
|
writeJson(w, r, map[string]string{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
|
||||||
|
if topo.FreeSpace() <= 0 {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
writeJson(w, r, map[string]string{"error": "No free volumes left!"})
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
vg.GrowByType(rt, topo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fid, count, dn, err := topo.PickForWrite(rt, c)
|
||||||
|
if err == nil {
|
||||||
|
writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusNotAcceptable)
|
||||||
|
writeJson(w, r, map[string]string{"error": err.Error()})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
init := r.FormValue("init") == "true"
|
||||||
|
ip := r.FormValue("ip")
|
||||||
|
if ip == "" {
|
||||||
|
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
|
||||||
|
}
|
||||||
|
port, _ := strconv.Atoi(r.FormValue("port"))
|
||||||
|
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
|
||||||
|
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
|
||||||
|
publicUrl := r.FormValue("publicUrl")
|
||||||
|
volumes := new([]storage.VolumeInfo)
|
||||||
|
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
|
||||||
|
debug(s, "volumes", r.FormValue("volumes"))
|
||||||
|
topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["Version"] = VERSION
|
||||||
|
m["Topology"] = topo.ToMap()
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
gcThreshold := r.FormValue("garbageThreshold")
|
||||||
|
if gcThreshold == "" {
|
||||||
|
gcThreshold = *garbageThreshold
|
||||||
|
}
|
||||||
|
debug("garbageThreshold =", gcThreshold)
|
||||||
|
topo.Vacuum(gcThreshold)
|
||||||
|
dirStatusHandler(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
count := 0
|
||||||
|
rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
|
||||||
|
if err == nil {
|
||||||
|
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
|
||||||
|
if topo.FreeSpace() < count*rt.GetCopyCount() {
|
||||||
|
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
|
||||||
|
} else {
|
||||||
|
count, err = vg.GrowByCountAndType(count, rt, topo)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = errors.New("parameter count is not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusNotAcceptable)
|
||||||
|
writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusNotAcceptable)
|
||||||
|
writeJson(w, r, map[string]interface{}{"count": count})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["Version"] = VERSION
|
||||||
|
m["Volumes"] = topo.ToVolumeMap()
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func redirectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
vid, _, _ := parseURLPath(r.URL.Path)
|
||||||
|
volumeId, err := storage.NewVolumeId(vid)
|
||||||
|
if err != nil {
|
||||||
|
debug("parsing error:", err, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
machines := topo.Lookup(volumeId)
|
||||||
|
if machines != nil && len(machines) > 0 {
|
||||||
|
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runMaster(cmd *Command, args []string) bool {
|
||||||
|
if *mMaxCpu < 1 {
|
||||||
|
*mMaxCpu = runtime.NumCPU()
|
||||||
|
}
|
||||||
|
runtime.GOMAXPROCS(*mMaxCpu)
|
||||||
|
topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
|
||||||
|
vg = replication.NewDefaultVolumeGrowth()
|
||||||
|
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
|
||||||
|
http.HandleFunc("/dir/assign", dirAssignHandler)
|
||||||
|
http.HandleFunc("/dir/lookup", dirLookupHandler)
|
||||||
|
http.HandleFunc("/dir/join", dirJoinHandler)
|
||||||
|
http.HandleFunc("/dir/status", dirStatusHandler)
|
||||||
|
http.HandleFunc("/vol/grow", volumeGrowHandler)
|
||||||
|
http.HandleFunc("/vol/status", volumeStatusHandler)
|
||||||
|
http.HandleFunc("/vol/vacuum", volumeVacuumHandler)
|
||||||
|
|
||||||
|
http.HandleFunc("/", redirectHandler)
|
||||||
|
|
||||||
|
topo.StartRefreshWritableVolumes(*garbageThreshold)
|
||||||
|
|
||||||
|
log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
|
||||||
|
srv := &http.Server{
|
||||||
|
Addr: ":" + strconv.Itoa(*mport),
|
||||||
|
Handler: http.DefaultServeMux,
|
||||||
|
ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
|
||||||
|
}
|
||||||
|
e := srv.ListenAndServe()
|
||||||
|
if e != nil {
|
||||||
|
log.Fatalf("Fail to start:%s", e.Error())
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
53
src/shell.go
Normal file
53
src/shell.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdShell.Run = runShell // break init cycle
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdShell = &Command{
|
||||||
|
UsageLine: "shell",
|
||||||
|
Short: "run interactive commands, now just echo",
|
||||||
|
Long: `run interactive commands.
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
var ()
|
||||||
|
|
||||||
|
func runShell(command *Command, args []string) bool {
|
||||||
|
r := bufio.NewReader(os.Stdin)
|
||||||
|
o := bufio.NewWriter(os.Stdout)
|
||||||
|
e := bufio.NewWriter(os.Stderr)
|
||||||
|
prompt := func() {
|
||||||
|
o.WriteString("> ")
|
||||||
|
o.Flush()
|
||||||
|
}
|
||||||
|
readLine := func() string {
|
||||||
|
ret, err := r.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprint(e, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
execCmd := func(cmd string) int {
|
||||||
|
if cmd != "" {
|
||||||
|
o.WriteString(cmd)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := ""
|
||||||
|
for {
|
||||||
|
prompt()
|
||||||
|
cmd = readLine()
|
||||||
|
execCmd(cmd)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
113
src/upload.go
Normal file
113
src/upload.go
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"code.google.com/p/weed-fs/weed/operation"
|
||||||
|
"code.google.com/p/weed-fs/weed/util"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
var uploadReplication *string
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdUpload.Run = runUpload // break init cycle
|
||||||
|
cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
|
||||||
|
server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
|
||||||
|
uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdUpload = &Command{
|
||||||
|
UsageLine: "upload -server=localhost:9333 file1 [file2 file3]",
|
||||||
|
Short: "upload one or a list of files",
|
||||||
|
Long: `upload one or a list of files.
|
||||||
|
It uses consecutive file keys for the list of files.
|
||||||
|
e.g. If the file1 uses key k, file2 can be read via k_1
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
type AssignResult struct {
|
||||||
|
Fid string "fid"
|
||||||
|
Url string "url"
|
||||||
|
PublicUrl string "publicUrl"
|
||||||
|
Count int
|
||||||
|
Error string "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
func assign(count int) (*AssignResult, error) {
|
||||||
|
values := make(url.Values)
|
||||||
|
values.Add("count", strconv.Itoa(count))
|
||||||
|
values.Add("replication", *uploadReplication)
|
||||||
|
jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
|
||||||
|
debug("assign result :", string(jsonBlob))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var ret AssignResult
|
||||||
|
err = json.Unmarshal(jsonBlob, &ret)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if ret.Count <= 0 {
|
||||||
|
return nil, errors.New(ret.Error)
|
||||||
|
}
|
||||||
|
return &ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func upload(filename string, server string, fid string) (int, error) {
|
||||||
|
debug("Start uploading file:", filename)
|
||||||
|
fh, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
debug("Failed to open file:", filename)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh)
|
||||||
|
if e != nil {
|
||||||
|
return 0, e
|
||||||
|
}
|
||||||
|
return ret.Size, e
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubmitResult struct {
|
||||||
|
Fid string "fid"
|
||||||
|
Size int "size"
|
||||||
|
Error string "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
func submit(files []string) []SubmitResult {
|
||||||
|
ret, err := assign(len(files))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
results := make([]SubmitResult, len(files))
|
||||||
|
for index, file := range files {
|
||||||
|
fid := ret.Fid
|
||||||
|
if index > 0 {
|
||||||
|
fid = fid + "_" + strconv.Itoa(index)
|
||||||
|
}
|
||||||
|
results[index].Size, err = upload(file, ret.PublicUrl, fid)
|
||||||
|
if err != nil {
|
||||||
|
fid = ""
|
||||||
|
results[index].Error = err.Error()
|
||||||
|
}
|
||||||
|
results[index].Fid = fid
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
func runUpload(cmd *Command, args []string) bool {
|
||||||
|
*IsDebug = true
|
||||||
|
if len(cmdUpload.Flag.Args()) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
results := submit(args)
|
||||||
|
bytes, _ := json.Marshal(results)
|
||||||
|
fmt.Print(string(bytes))
|
||||||
|
return true
|
||||||
|
}
|
26
src/version.go
Normal file
26
src/version.go
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
VERSION = "0.28 beta"
|
||||||
|
)
|
||||||
|
|
||||||
|
var cmdVersion = &Command{
|
||||||
|
Run: runVersion,
|
||||||
|
UsageLine: "version",
|
||||||
|
Short: "print Weed File System version",
|
||||||
|
Long: `Version prints the Weed File System version`,
|
||||||
|
}
|
||||||
|
|
||||||
|
func runVersion(cmd *Command, args []string) bool {
|
||||||
|
if len(args) != 0 {
|
||||||
|
cmd.Usage()
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
|
||||||
|
return true
|
||||||
|
}
|
378
src/volume.go
Normal file
378
src/volume.go
Normal file
|
@ -0,0 +1,378 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"code.google.com/p/weed-fs/weed/operation"
|
||||||
|
"code.google.com/p/weed-fs/weed/storage"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdVolume.Run = runVolume // break init cycle
|
||||||
|
cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdVolume = &Command{
|
||||||
|
UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
|
||||||
|
Short: "start a volume server",
|
||||||
|
Long: `start a volume server to provide storage spaces
|
||||||
|
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
|
||||||
|
volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
|
||||||
|
ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
|
||||||
|
publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
|
||||||
|
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
|
||||||
|
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
|
||||||
|
maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
|
||||||
|
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
|
||||||
|
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||||
|
|
||||||
|
store *storage.Store
|
||||||
|
)
|
||||||
|
|
||||||
|
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
||||||
|
|
||||||
|
func statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["Version"] = VERSION
|
||||||
|
m["Volumes"] = store.Status()
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
|
||||||
|
if err == nil {
|
||||||
|
writeJson(w, r, map[string]string{"error": ""})
|
||||||
|
} else {
|
||||||
|
writeJson(w, r, map[string]string{"error": err.Error()})
|
||||||
|
}
|
||||||
|
debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
|
||||||
|
}
|
||||||
|
func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
|
||||||
|
if err == nil {
|
||||||
|
writeJson(w, r, map[string]interface{}{"error": "", "result": ret})
|
||||||
|
} else {
|
||||||
|
writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false})
|
||||||
|
}
|
||||||
|
debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
|
||||||
|
}
|
||||||
|
func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
err := store.CompactVolume(r.FormValue("volume"))
|
||||||
|
if err == nil {
|
||||||
|
writeJson(w, r, map[string]string{"error": ""})
|
||||||
|
} else {
|
||||||
|
writeJson(w, r, map[string]string{"error": err.Error()})
|
||||||
|
}
|
||||||
|
debug("compacted volume =", r.FormValue("volume"), ", error =", err)
|
||||||
|
}
|
||||||
|
func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
err := store.CommitCompactVolume(r.FormValue("volume"))
|
||||||
|
if err == nil {
|
||||||
|
writeJson(w, r, map[string]interface{}{"error": ""})
|
||||||
|
} else {
|
||||||
|
writeJson(w, r, map[string]string{"error": err.Error()})
|
||||||
|
}
|
||||||
|
debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
|
||||||
|
}
|
||||||
|
func storeHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.Method {
|
||||||
|
case "GET":
|
||||||
|
GetHandler(w, r)
|
||||||
|
case "DELETE":
|
||||||
|
DeleteHandler(w, r)
|
||||||
|
case "POST":
|
||||||
|
PostHandler(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func GetHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
n := new(storage.Needle)
|
||||||
|
vid, fid, ext := parseURLPath(r.URL.Path)
|
||||||
|
volumeId, err := storage.NewVolumeId(vid)
|
||||||
|
if err != nil {
|
||||||
|
debug("parsing error:", err, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.ParsePath(fid)
|
||||||
|
|
||||||
|
debug("volume", volumeId, "reading", n)
|
||||||
|
if !store.HasVolume(volumeId) {
|
||||||
|
lookupResult, err := operation.Lookup(*masterNode, volumeId)
|
||||||
|
debug("volume", volumeId, "found on", lookupResult, "error", err)
|
||||||
|
if err == nil {
|
||||||
|
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
|
||||||
|
} else {
|
||||||
|
debug("lookup error:", err, r.URL.Path)
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cookie := n.Cookie
|
||||||
|
count, e := store.Read(volumeId, n)
|
||||||
|
debug("read bytes", count, "error", e)
|
||||||
|
if e != nil || count <= 0 {
|
||||||
|
debug("read error:", e, r.URL.Path)
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if n.Cookie != cookie {
|
||||||
|
log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if n.NameSize > 0 {
|
||||||
|
fname := string(n.Name)
|
||||||
|
dotIndex := strings.LastIndex(fname, ".")
|
||||||
|
if dotIndex > 0 {
|
||||||
|
ext = fname[dotIndex:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mtype := ""
|
||||||
|
if ext != "" {
|
||||||
|
mtype = mime.TypeByExtension(ext)
|
||||||
|
}
|
||||||
|
if n.MimeSize > 0 {
|
||||||
|
mtype = string(n.Mime)
|
||||||
|
}
|
||||||
|
if mtype != "" {
|
||||||
|
w.Header().Set("Content-Type", mtype)
|
||||||
|
}
|
||||||
|
if n.NameSize > 0 {
|
||||||
|
w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name)))
|
||||||
|
}
|
||||||
|
if ext != ".gz" {
|
||||||
|
if n.IsGzipped() {
|
||||||
|
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
|
||||||
|
w.Header().Set("Content-Encoding", "gzip")
|
||||||
|
} else {
|
||||||
|
if n.Data, err = storage.UnGzipData(n.Data); err != nil {
|
||||||
|
debug("lookup error:", err, r.URL.Path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
|
||||||
|
w.Write(n.Data)
|
||||||
|
}
|
||||||
|
func PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
|
vid, _, _ := parseURLPath(r.URL.Path)
|
||||||
|
volumeId, e := storage.NewVolumeId(vid)
|
||||||
|
if e != nil {
|
||||||
|
writeJson(w, r, e)
|
||||||
|
} else {
|
||||||
|
needle, filename, ne := storage.NewNeedle(r)
|
||||||
|
if ne != nil {
|
||||||
|
writeJson(w, r, ne)
|
||||||
|
} else {
|
||||||
|
ret, err := store.Write(volumeId, needle)
|
||||||
|
errorStatus := ""
|
||||||
|
needToReplicate := !store.HasVolume(volumeId)
|
||||||
|
if err != nil {
|
||||||
|
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
|
||||||
|
} else if ret > 0 {
|
||||||
|
needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate()
|
||||||
|
} else {
|
||||||
|
errorStatus = "Failed to write to local disk"
|
||||||
|
}
|
||||||
|
if !needToReplicate && ret > 0 {
|
||||||
|
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
|
||||||
|
}
|
||||||
|
if needToReplicate { //send to other replica locations
|
||||||
|
if r.FormValue("type") != "standard" {
|
||||||
|
if !distributedOperation(volumeId, func(location operation.Location) bool {
|
||||||
|
_, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
|
||||||
|
return err == nil
|
||||||
|
}) {
|
||||||
|
ret = 0
|
||||||
|
errorStatus = "Failed to write to replicas for volume " + volumeId.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
if errorStatus == "" {
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
} else {
|
||||||
|
store.Delete(volumeId, needle)
|
||||||
|
distributedOperation(volumeId, func(location operation.Location) bool {
|
||||||
|
return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
|
||||||
|
})
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
m["error"] = errorStatus
|
||||||
|
}
|
||||||
|
m["size"] = ret
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
n := new(storage.Needle)
|
||||||
|
vid, fid, _ := parseURLPath(r.URL.Path)
|
||||||
|
volumeId, _ := storage.NewVolumeId(vid)
|
||||||
|
n.ParsePath(fid)
|
||||||
|
|
||||||
|
debug("deleting", n)
|
||||||
|
|
||||||
|
cookie := n.Cookie
|
||||||
|
count, ok := store.Read(volumeId, n)
|
||||||
|
|
||||||
|
if ok != nil {
|
||||||
|
m := make(map[string]uint32)
|
||||||
|
m["size"] = 0
|
||||||
|
writeJson(w, r, m)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.Cookie != cookie {
|
||||||
|
log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n.Size = 0
|
||||||
|
ret, err := store.Delete(volumeId, n)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("delete error: %s\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
needToReplicate := !store.HasVolume(volumeId)
|
||||||
|
if !needToReplicate && ret > 0 {
|
||||||
|
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
|
||||||
|
}
|
||||||
|
if needToReplicate { //send to other replica locations
|
||||||
|
if r.FormValue("type") != "standard" {
|
||||||
|
if !distributedOperation(volumeId, func(location operation.Location) bool {
|
||||||
|
return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
|
||||||
|
}) {
|
||||||
|
ret = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ret != 0 {
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
} else {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
|
||||||
|
m := make(map[string]uint32)
|
||||||
|
m["size"] = uint32(count)
|
||||||
|
writeJson(w, r, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseURLPath(path string) (vid, fid, ext string) {
|
||||||
|
|
||||||
|
sepIndex := strings.LastIndex(path, "/")
|
||||||
|
commaIndex := strings.LastIndex(path[sepIndex:], ",")
|
||||||
|
if commaIndex <= 0 {
|
||||||
|
if "favicon.ico" != path[sepIndex+1:] {
|
||||||
|
log.Println("unknown file id", path[sepIndex+1:])
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dotIndex := strings.LastIndex(path[sepIndex:], ".")
|
||||||
|
vid = path[sepIndex+1 : commaIndex]
|
||||||
|
fid = path[commaIndex+1:]
|
||||||
|
ext = ""
|
||||||
|
if dotIndex > 0 {
|
||||||
|
fid = path[commaIndex+1 : dotIndex]
|
||||||
|
ext = path[dotIndex:]
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
|
||||||
|
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
|
||||||
|
length := 0
|
||||||
|
selfUrl := (*ip + ":" + strconv.Itoa(*vport))
|
||||||
|
results := make(chan bool)
|
||||||
|
for _, location := range lookupResult.Locations {
|
||||||
|
if location.Url != selfUrl {
|
||||||
|
length++
|
||||||
|
go func(location operation.Location, results chan bool) {
|
||||||
|
results <- op(location)
|
||||||
|
}(location, results)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret := true
|
||||||
|
for i := 0; i < length; i++ {
|
||||||
|
ret = ret && <-results
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
} else {
|
||||||
|
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func runVolume(cmd *Command, args []string) bool {
|
||||||
|
if *vMaxCpu < 1 {
|
||||||
|
*vMaxCpu = runtime.NumCPU()
|
||||||
|
}
|
||||||
|
runtime.GOMAXPROCS(*vMaxCpu)
|
||||||
|
fileInfo, err := os.Stat(*volumeFolder)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("No Existing Folder:%s", *volumeFolder)
|
||||||
|
}
|
||||||
|
if !fileInfo.IsDir() {
|
||||||
|
log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
|
||||||
|
}
|
||||||
|
perm := fileInfo.Mode().Perm()
|
||||||
|
log.Println("Volume Folder permission:", perm)
|
||||||
|
|
||||||
|
if *publicUrl == "" {
|
||||||
|
*publicUrl = *ip + ":" + strconv.Itoa(*vport)
|
||||||
|
}
|
||||||
|
|
||||||
|
store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
|
||||||
|
defer store.Close()
|
||||||
|
http.HandleFunc("/", storeHandler)
|
||||||
|
http.HandleFunc("/status", statusHandler)
|
||||||
|
http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
|
||||||
|
http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
|
||||||
|
http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
|
||||||
|
http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
connected := true
|
||||||
|
store.SetMaster(*masterNode)
|
||||||
|
for {
|
||||||
|
err := store.Join()
|
||||||
|
if err == nil {
|
||||||
|
if !connected {
|
||||||
|
connected = true
|
||||||
|
log.Println("Reconnected with master")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if connected {
|
||||||
|
connected = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
log.Println("store joined at", *masterNode)
|
||||||
|
|
||||||
|
log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
|
||||||
|
srv := &http.Server{
|
||||||
|
Addr: ":" + strconv.Itoa(*vport),
|
||||||
|
Handler: http.DefaultServeMux,
|
||||||
|
ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
|
||||||
|
}
|
||||||
|
e := srv.ListenAndServe()
|
||||||
|
if e != nil {
|
||||||
|
log.Fatalf("Fail to start:%s", e.Error())
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
199
src/weed.go
Normal file
199
src/weed.go
Normal file
|
@ -0,0 +1,199 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"text/template"
|
||||||
|
"time"
|
||||||
|
"unicode"
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
var IsDebug *bool
|
||||||
|
var server *string
|
||||||
|
|
||||||
|
var commands = []*Command{
|
||||||
|
cmdFix,
|
||||||
|
cmdMaster,
|
||||||
|
cmdUpload,
|
||||||
|
cmdShell,
|
||||||
|
cmdVersion,
|
||||||
|
cmdVolume,
|
||||||
|
cmdExport,
|
||||||
|
}
|
||||||
|
|
||||||
|
var exitStatus = 0
|
||||||
|
var exitMu sync.Mutex
|
||||||
|
|
||||||
|
func setExitStatus(n int) {
|
||||||
|
exitMu.Lock()
|
||||||
|
if exitStatus < n {
|
||||||
|
exitStatus = n
|
||||||
|
}
|
||||||
|
exitMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
flag.Usage = usage
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
args := flag.Args()
|
||||||
|
if len(args) < 1 {
|
||||||
|
usage()
|
||||||
|
}
|
||||||
|
|
||||||
|
if args[0] == "help" {
|
||||||
|
help(args[1:])
|
||||||
|
for _, cmd := range commands {
|
||||||
|
if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Default Parameters:\n")
|
||||||
|
cmd.Flag.PrintDefaults()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cmd := range commands {
|
||||||
|
if cmd.Name() == args[0] && cmd.Run != nil {
|
||||||
|
cmd.Flag.Usage = func() { cmd.Usage() }
|
||||||
|
cmd.Flag.Parse(args[1:])
|
||||||
|
args = cmd.Flag.Args()
|
||||||
|
IsDebug = cmd.IsDebug
|
||||||
|
if !cmd.Run(cmd, args) {
|
||||||
|
fmt.Fprintf(os.Stderr, "\n")
|
||||||
|
cmd.Flag.Usage()
|
||||||
|
fmt.Fprintf(os.Stderr, "Default Parameters:\n")
|
||||||
|
cmd.Flag.PrintDefaults()
|
||||||
|
}
|
||||||
|
exit()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0])
|
||||||
|
setExitStatus(2)
|
||||||
|
exit()
|
||||||
|
}
|
||||||
|
|
||||||
|
var usageTemplate = `WeedFS is a software to store billions of files and serve them fast!
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
|
||||||
|
weed command [arguments]
|
||||||
|
|
||||||
|
The commands are:
|
||||||
|
{{range .}}{{if .Runnable}}
|
||||||
|
{{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}}
|
||||||
|
|
||||||
|
Use "weed help [command]" for more information about a command.
|
||||||
|
|
||||||
|
`
|
||||||
|
|
||||||
|
var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}}
|
||||||
|
{{end}}
|
||||||
|
{{.Long}}
|
||||||
|
`
|
||||||
|
|
||||||
|
// tmpl executes the given template text on data, writing the result to w.
|
||||||
|
func tmpl(w io.Writer, text string, data interface{}) {
|
||||||
|
t := template.New("top")
|
||||||
|
t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize})
|
||||||
|
template.Must(t.Parse(text))
|
||||||
|
if err := t.Execute(w, data); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func capitalize(s string) string {
|
||||||
|
if s == "" {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
r, n := utf8.DecodeRuneInString(s)
|
||||||
|
return string(unicode.ToTitle(r)) + s[n:]
|
||||||
|
}
|
||||||
|
|
||||||
|
func printUsage(w io.Writer) {
|
||||||
|
tmpl(w, usageTemplate, commands)
|
||||||
|
}
|
||||||
|
|
||||||
|
func usage() {
|
||||||
|
printUsage(os.Stderr)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// help implements the 'help' command.
|
||||||
|
func help(args []string) {
|
||||||
|
if len(args) == 0 {
|
||||||
|
printUsage(os.Stdout)
|
||||||
|
// not exit 2: succeeded at 'weed help'.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(args) != 1 {
|
||||||
|
fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n")
|
||||||
|
os.Exit(2) // failed at 'weed help'
|
||||||
|
}
|
||||||
|
|
||||||
|
arg := args[0]
|
||||||
|
|
||||||
|
for _, cmd := range commands {
|
||||||
|
if cmd.Name() == arg {
|
||||||
|
tmpl(os.Stdout, helpTemplate, cmd)
|
||||||
|
// not exit 2: succeeded at 'weed help cmd'.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg)
|
||||||
|
os.Exit(2) // failed at 'weed help cmd'
|
||||||
|
}
|
||||||
|
|
||||||
|
var atexitFuncs []func()
|
||||||
|
|
||||||
|
func atexit(f func()) {
|
||||||
|
atexitFuncs = append(atexitFuncs, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func exit() {
|
||||||
|
for _, f := range atexitFuncs {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
os.Exit(exitStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
func exitIfErrors() {
|
||||||
|
if exitStatus != 0 {
|
||||||
|
exit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
|
||||||
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
|
var bytes []byte
|
||||||
|
if r.FormValue("pretty") != "" {
|
||||||
|
bytes, _ = json.MarshalIndent(obj, "", " ")
|
||||||
|
} else {
|
||||||
|
bytes, _ = json.Marshal(obj)
|
||||||
|
}
|
||||||
|
callback := r.FormValue("callback")
|
||||||
|
if callback == "" {
|
||||||
|
w.Write(bytes)
|
||||||
|
} else {
|
||||||
|
w.Write([]uint8(callback))
|
||||||
|
w.Write([]uint8("("))
|
||||||
|
fmt.Fprint(w, string(bytes))
|
||||||
|
w.Write([]uint8(")"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func debug(params ...interface{}) {
|
||||||
|
if *IsDebug {
|
||||||
|
fmt.Println(params)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue