add "weed copy" command to copy files to filer

This commit is contained in:
Chris Lu 2016-07-20 23:45:55 -07:00
parent 40ba6d2a6f
commit cdae9fc680
8 changed files with 217 additions and 10 deletions

View file

@ -11,6 +11,7 @@ var Commands = []*Command{
cmdBenchmark, cmdBenchmark,
cmdBackup, cmdBackup,
cmdCompact, cmdCompact,
cmdCopy,
cmdFix, cmdFix,
cmdServer, cmdServer,
cmdMaster, cmdMaster,

147
weed/command/copy.go Normal file
View file

@ -0,0 +1,147 @@
package command
import (
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/operation"
filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer"
"github.com/chrislusf/seaweedfs/weed/security"
)
var (
copy CopyOptions
)
type CopyOptions struct {
master *string
include *string
replication *string
collection *string
ttl *string
maxMB *int
secretKey *string
secret security.Secret
}
func init() {
cmdCopy.Run = runCopy // break init cycle
cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
}
var cmdCopy = &Command{
UsageLine: "copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
Short: "copy one or a list of files to a filer folder",
Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
It can copy one or a list of files or folders.
If copying a whole folder recursively:
All files under the folder and subfolders will be copyed.
Optional parameter "-include" allows you to specify the file name patterns.
If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
This can save volume server's gzipped processing and allow customizable gzip compression level.
The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
If "maxMB" is set to a positive number, files larger than it would be split into chunks and copyed separatedly.
The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
`,
}
func runCopy(cmd *Command, args []string) bool {
copy.secret = security.Secret(*copy.secretKey)
if len(args) <= 1 {
return false
}
filerDestination := args[len(args)-1]
fileOrDirs := args[0 : len(args)-1]
filerUrl, err := url.Parse(filerDestination)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
if len(fileOrDirs) > 1 && !strings.HasSuffix(filerUrl.Path, "/") {
fmt.Println("Can not copy multiple items to a file. The last argument must be a folder ended with \"/\"")
return false
}
for _, fileOrDir := range fileOrDirs {
if !doEachCopy(fileOrDir, filerUrl.Host, filerUrl.Path) {
return false
}
}
return true
}
func doEachCopy(fileOrDir string, host string, path string) bool {
f, err := os.Open(fileOrDir)
if err != nil {
fmt.Printf("Failed to open file %s: %v", fileOrDir, err)
return false
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err)
return false
}
mode := fi.Mode()
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") {
return false
}
}
return true
}
// this is a regular file
if *copy.include != "" {
if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
return true
}
}
parts, err := operation.NewFileParts([]string{fileOrDir})
if err != nil {
fmt.Printf("Failed to read file %s: %v", fileOrDir, err)
}
results, err := operation.SubmitFiles(*copy.master, parts,
*copy.replication, *copy.collection,
*copy.ttl, *copy.maxMB, copy.secret)
if err != nil {
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
}
if strings.HasSuffix(path, "/") {
path = path + fi.Name()
}
if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil {
fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err)
return false
}
fmt.Printf("Copy %s => http://%s/%s\n", fileOrDir, host, path)
return true
}

View file

@ -82,7 +82,7 @@ func init() {
filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
@ -164,6 +164,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingFiler { if *isStartingFiler {
go func() { go func() {
time.Sleep(1 * time.Second)
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.defaultReplicaPlacement,

View file

@ -15,7 +15,7 @@ var (
) )
type UploadOptions struct { type UploadOptions struct {
server *string master *string
dir *string dir *string
include *string include *string
replication *string replication *string
@ -28,7 +28,7 @@ type UploadOptions struct {
func init() { func init() {
cmdUpload.Run = runUpload // break init cycle cmdUpload.Run = runUpload // break init cycle
cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
upload.server = cmdUpload.Flag.String("server", "localhost:9333", "SeaweedFS master location") upload.master = cmdUpload.Flag.String("master", "localhost:9333", "SeaweedFS master location")
upload.dir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.") upload.dir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.")
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
upload.replication = cmdUpload.Flag.String("replication", "", "replication type") upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
@ -39,7 +39,7 @@ func init() {
} }
var cmdUpload = &Command{ var cmdUpload = &Command{
UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf", UsageLine: "upload -master=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf",
Short: "upload one or a list of files", Short: "upload one or a list of files",
Long: `upload one or a list of files, or batch upload one whole folder recursively. Long: `upload one or a list of files, or batch upload one whole folder recursively.
@ -79,7 +79,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil { if e != nil {
return e return e
} }
results, e := operation.SubmitFiles(*upload.server, parts, results, e := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection, *upload.replication, *upload.collection,
*upload.ttl, *upload.maxMB, secret) *upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results) bytes, _ := json.Marshal(results)
@ -98,7 +98,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil { if e != nil {
fmt.Println(e.Error()) fmt.Println(e.Error())
} }
results, _ := operation.SubmitFiles(*upload.server, parts, results, _ := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection, *upload.replication, *upload.collection,
*upload.ttl, *upload.maxMB, secret) *upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results) bytes, _ := json.Marshal(results)

View file

@ -0,0 +1,31 @@
package operation
import (
"fmt"
"net/url"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type SubmitResult struct {
FileName string `json:"fileName,omitempty"`
FileUrl string `json:"fileUrl,omitempty"`
Fid string `json:"fid,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
}
func RegisterFile(filer string, path string, fileId string, secret security.Secret) error {
// TODO: jwt need to be used
_ = security.GenJwt(secret, fileId)
values := make(url.Values)
values.Add("path", path)
values.Add("fileId", fileId)
_, err := util.Post("http://"+filer+"/admin/register", values)
if err != nil {
return fmt.Errorf("Failed to register path:%s on filer:%s to file id:%s", path, filer, fileId)
}
return nil
}

View file

@ -62,6 +62,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
} }
r.HandleFunc("/admin/mv", fs.moveHandler) r.HandleFunc("/admin/mv", fs.moveHandler)
r.HandleFunc("/admin/register", fs.registerHandler)
} }
r.HandleFunc("/", fs.filerHandler) r.HandleFunc("/", fs.filerHandler)
@ -73,9 +74,14 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode()) glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
//force initialize with all available master nodes //force initialize with all available master nodes
for {
_, err := fs.masterNodes.FindMaster() _, err := fs.masterNodes.FindMaster()
if err != nil { if err != nil {
glog.Fatalf("filer server failed to get master cluster info:%s", err.Error()) glog.Infof("filer server failed to get master cluster info:%s", err.Error())
time.Sleep(3 * time.Second)
} else {
break
}
} }
for { for {

View file

@ -27,3 +27,15 @@ func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
} }
func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) {
path := r.FormValue("path")
fileId := r.FormValue("fileId")
err := fs.filer.CreateFile(path, fileId)
if err != nil {
glog.V(4).Infof("register %s to %s error: %v", fileId, path, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
w.WriteHeader(http.StatusOK)
}
}

View file

@ -32,6 +32,9 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return nil, fmt.Errorf("Post to %s: %v", url, err) return nil, fmt.Errorf("Post to %s: %v", url, err)
} }
defer r.Body.Close() defer r.Body.Close()
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
return nil, fmt.Errorf("Read response body: %v", err) return nil, fmt.Errorf("Read response body: %v", err)
@ -45,6 +48,9 @@ func Post(url string, values url.Values) ([]byte, error) {
return nil, err return nil, err
} }
defer r.Body.Close() defer r.Body.Close()
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
return nil, err return nil, err
@ -59,7 +65,7 @@ func Get(url string) ([]byte, error) {
} }
defer r.Body.Close() defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if r.StatusCode != 200 { if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status) return nil, fmt.Errorf("%s: %s", url, r.Status)
} }
if err != nil { if err != nil {
@ -81,6 +87,9 @@ func Delete(url string, jwt security.EncodedJwt) error {
return e return e
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("%s: %s", url, resp.Status)
}
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return err return err