From cdae9fc680c8e7f99c0ef47dd98f674df76e5078 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 20 Jul 2016 23:45:55 -0700 Subject: [PATCH] add "weed copy" command to copy files to filer --- weed/command/command.go | 1 + weed/command/copy.go | 147 +++++++++++++++++++++ weed/command/server.go | 3 +- weed/command/upload.go | 10 +- weed/operation/filer/register.go | 31 +++++ weed/server/filer_server.go | 12 +- weed/server/filer_server_handlers_admin.go | 12 ++ weed/util/http_util.go | 11 +- 8 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 weed/command/copy.go create mode 100644 weed/operation/filer/register.go diff --git a/weed/command/command.go b/weed/command/command.go index d654f57cd..c451936e5 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -11,6 +11,7 @@ var Commands = []*Command{ cmdBenchmark, cmdBackup, cmdCompact, + cmdCopy, cmdFix, cmdServer, cmdMaster, diff --git a/weed/command/copy.go b/weed/command/copy.go new file mode 100644 index 000000000..0e7109daf --- /dev/null +++ b/weed/command/copy.go @@ -0,0 +1,147 @@ +package command + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "path/filepath" + "strings" + + "github.com/chrislusf/seaweedfs/weed/operation" + filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer" + "github.com/chrislusf/seaweedfs/weed/security" +) + +var ( + copy CopyOptions +) + +type CopyOptions struct { + master *string + include *string + replication *string + collection *string + ttl *string + maxMB *int + secretKey *string + + secret security.Secret +} + +func init() { + cmdCopy.Run = runCopy // break init cycle + cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") + copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location") + copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") + copy.replication = cmdCopy.Flag.String("replication", "", "replication type") + copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") + copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") + copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") +} + +var cmdCopy = &Command{ + UsageLine: "copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/", + Short: "copy one or a list of files to a filer folder", + Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder + + It can copy one or a list of files or folders. + + If copying a whole folder recursively: + All files under the folder and subfolders will be copyed. + Optional parameter "-include" allows you to specify the file name patterns. + + If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is. + This can save volume server's gzipped processing and allow customizable gzip compression level. + The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js". + + If "maxMB" is set to a positive number, files larger than it would be split into chunks and copyed separatedly. + The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. + + `, +} + +func runCopy(cmd *Command, args []string) bool { + copy.secret = security.Secret(*copy.secretKey) + if len(args) <= 1 { + return false + } + filerDestination := args[len(args)-1] + fileOrDirs := args[0 : len(args)-1] + + filerUrl, err := url.Parse(filerDestination) + if err != nil { + fmt.Printf("The last argument should be a URL on filer: %v\n", err) + return false + } + if len(fileOrDirs) > 1 && !strings.HasSuffix(filerUrl.Path, "/") { + fmt.Println("Can not copy multiple items to a file. The last argument must be a folder ended with \"/\"") + return false + } + + for _, fileOrDir := range fileOrDirs { + if !doEachCopy(fileOrDir, filerUrl.Host, filerUrl.Path) { + return false + } + } + return true +} + +func doEachCopy(fileOrDir string, host string, path string) bool { + f, err := os.Open(fileOrDir) + if err != nil { + fmt.Printf("Failed to open file %s: %v", fileOrDir, err) + return false + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err) + return false + } + + mode := fi.Mode() + if mode.IsDir() { + files, _ := ioutil.ReadDir(fileOrDir) + for _, subFileOrDir := range files { + if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") { + return false + } + } + return true + } + + // this is a regular file + if *copy.include != "" { + if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok { + return true + } + } + + parts, err := operation.NewFileParts([]string{fileOrDir}) + if err != nil { + fmt.Printf("Failed to read file %s: %v", fileOrDir, err) + } + + results, err := operation.SubmitFiles(*copy.master, parts, + *copy.replication, *copy.collection, + *copy.ttl, *copy.maxMB, copy.secret) + if err != nil { + fmt.Printf("Failed to submit file %s: %v", fileOrDir, err) + } + + if strings.HasSuffix(path, "/") { + path = path + fi.Name() + } + + if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil { + fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err) + return false + } + + fmt.Printf("Copy %s => http://%s/%s\n", fileOrDir, host, path) + + return true +} diff --git a/weed/command/server.go b/weed/command/server.go index 6ed1e5228..1211c7137 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -82,7 +82,7 @@ func init() { filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") - filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") + filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") @@ -164,6 +164,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingFiler { go func() { + time.Sleep(1 * time.Second) r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, diff --git a/weed/command/upload.go b/weed/command/upload.go index 0dfa115bb..1f0696f70 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -15,7 +15,7 @@ var ( ) type UploadOptions struct { - server *string + master *string dir *string include *string replication *string @@ -28,7 +28,7 @@ type UploadOptions struct { func init() { cmdUpload.Run = runUpload // break init cycle cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") - upload.server = cmdUpload.Flag.String("server", "localhost:9333", "SeaweedFS master location") + upload.master = cmdUpload.Flag.String("master", "localhost:9333", "SeaweedFS master location") upload.dir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.") upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") upload.replication = cmdUpload.Flag.String("replication", "", "replication type") @@ -39,7 +39,7 @@ func init() { } var cmdUpload = &Command{ - UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf", + UsageLine: "upload -master=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf", Short: "upload one or a list of files", Long: `upload one or a list of files, or batch upload one whole folder recursively. @@ -79,7 +79,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*upload.server, parts, + results, e := operation.SubmitFiles(*upload.master, parts, *upload.replication, *upload.collection, *upload.ttl, *upload.maxMB, secret) bytes, _ := json.Marshal(results) @@ -98,7 +98,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*upload.server, parts, + results, _ := operation.SubmitFiles(*upload.master, parts, *upload.replication, *upload.collection, *upload.ttl, *upload.maxMB, secret) bytes, _ := json.Marshal(results) diff --git a/weed/operation/filer/register.go b/weed/operation/filer/register.go new file mode 100644 index 000000000..2c4703680 --- /dev/null +++ b/weed/operation/filer/register.go @@ -0,0 +1,31 @@ +package operation + +import ( + "fmt" + "net/url" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type SubmitResult struct { + FileName string `json:"fileName,omitempty"` + FileUrl string `json:"fileUrl,omitempty"` + Fid string `json:"fid,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` +} + +func RegisterFile(filer string, path string, fileId string, secret security.Secret) error { + // TODO: jwt need to be used + _ = security.GenJwt(secret, fileId) + + values := make(url.Values) + values.Add("path", path) + values.Add("fileId", fileId) + _, err := util.Post("http://"+filer+"/admin/register", values) + if err != nil { + return fmt.Errorf("Failed to register path:%s on filer:%s to file id:%s", path, filer, fileId) + } + return nil +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 1b54f0840..b99bbd7c9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -62,6 +62,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st } r.HandleFunc("/admin/mv", fs.moveHandler) + r.HandleFunc("/admin/register", fs.registerHandler) } r.HandleFunc("/", fs.filerHandler) @@ -73,9 +74,14 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode()) //force initialize with all available master nodes - _, err := fs.masterNodes.FindMaster() - if err != nil { - glog.Fatalf("filer server failed to get master cluster info:%s", err.Error()) + for { + _, err := fs.masterNodes.FindMaster() + if err != nil { + glog.Infof("filer server failed to get master cluster info:%s", err.Error()) + time.Sleep(3 * time.Second) + } else { + break + } } for { diff --git a/weed/server/filer_server_handlers_admin.go b/weed/server/filer_server_handlers_admin.go index 979ad517b..aa7c09986 100644 --- a/weed/server/filer_server_handlers_admin.go +++ b/weed/server/filer_server_handlers_admin.go @@ -27,3 +27,15 @@ func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } } + +func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) { + path := r.FormValue("path") + fileId := r.FormValue("fileId") + err := fs.filer.CreateFile(path, fileId) + if err != nil { + glog.V(4).Infof("register %s to %s error: %v", fileId, path, err) + writeJsonError(w, r, http.StatusInternalServerError, err) + } else { + w.WriteHeader(http.StatusOK) + } +} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index a54fc8779..2379e3b2b 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -32,6 +32,9 @@ func PostBytes(url string, body []byte) ([]byte, error) { return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("Read response body: %v", err) @@ -45,6 +48,9 @@ func Post(url string, values url.Values) ([]byte, error) { return nil, err } defer r.Body.Close() + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, err @@ -59,7 +65,7 @@ func Get(url string) ([]byte, error) { } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) - if r.StatusCode != 200 { + if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } if err != nil { @@ -81,6 +87,9 @@ func Delete(url string, jwt security.EncodedJwt) error { return e } defer resp.Body.Close() + if resp.StatusCode >= 400 { + return fmt.Errorf("%s: %s", url, resp.Status) + } body, err := ioutil.ReadAll(resp.Body) if err != nil { return err