add option to split large files into parts and then upload

Chris Lu 2013-11-18 21:47:31 -08:00
parent aed74b5568
commit 0e5e0a3754
2 changed files with 58 additions and 12 deletions
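In short: when the new maxMB option is positive, any file larger than maxMB megabytes is uploaded in maxMB-sized chunks, each chunk stored under its own assigned file id; a newline-separated list of those chunk fids is then stored under the originally assigned fid, and that fid is what the caller gets back.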

Changed file 1 of 2 (package operation):

@@ -1,6 +1,7 @@
 package operation
 
 import (
+    "bytes"
     "code.google.com/p/weed-fs/go/glog"
     "io"
     "mime"
@@ -11,8 +12,9 @@ import (
 )
 
 type FilePart struct {
-    Reader    io.Reader //required, all rest are optional
+    Reader    io.Reader
     FileName  string
+    FileSize  int64
     IsGzipped bool
     MimeType  string
     ModTime   int64 //in seconds
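The new FileSize field is what lets upload decide whether to split without buffering the stream: newFilePart fills it in from the os.Stat result (see the fi.Size() hunk below), and the split path only triggers when FileSize exceeds maxMB*1024*1024 bytes.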
@@ -26,7 +28,7 @@ type SubmitResult struct {
     Error    string `json:"error"`
 }
 
-func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitResult, error) {
+func SubmitFiles(master string, files []FilePart, replication string, maxMB int) ([]SubmitResult, error) {
     results := make([]SubmitResult, len(files))
     for index, file := range files {
         results[index].FileName = file.FileName
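Note how the extra parameters ripple through: SubmitFiles gains maxMB, and upload (below) now also takes master and replication, because each chunk must request its own fid via Assign. Passing maxMB = 0 (the flag's default) preserves the old single-request behavior, since the split branch is guarded by maxMB > 0.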
@@ -43,7 +45,7 @@ func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitR
         if index > 0 {
             fid = fid + "_" + strconv.Itoa(index)
         }
-        results[index].Size, err = file.Upload(ret.PublicUrl, fid)
+        results[index].Size, err = file.upload(ret.PublicUrl, fid, maxMB, master, replication)
         if err != nil {
             fid = ""
             results[index].Error = err.Error()
@@ -57,13 +59,13 @@ func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitR
 func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
     ret = make([]FilePart, len(fullPathFilenames))
     for index, file := range fullPathFilenames {
-        if ret[index], err = NewFilePart(file); err != nil {
+        if ret[index], err = newFilePart(file); err != nil {
             return
         }
     }
     return
 }
-func NewFilePart(fullPathFilename string) (ret FilePart, err error) {
+func newFilePart(fullPathFilename string) (ret FilePart, err error) {
     fh, openErr := os.Open(fullPathFilename)
     if openErr != nil {
         glog.V(0).Info("Failed to open file: ", fullPathFilename)
@@ -76,6 +78,7 @@ func NewFilePart(fullPathFilename string) (ret FilePart, err error) {
         return ret, fiErr
     } else {
         ret.ModTime = fi.ModTime().UTC().Unix()
+        ret.FileSize = fi.Size()
     }
     ext := strings.ToLower(path.Ext(fullPathFilename))
     ret.IsGzipped = ext == ".gz"
@@ -90,7 +93,7 @@ func NewFilePart(fullPathFilename string) (ret FilePart, err error) {
     return ret, nil
 }
 
-func (fi FilePart) Upload(server string, fid string) (int, error) {
+func (fi FilePart) upload(server string, fid string, maxMB int, master, replication string) (retSize int, err error) {
     fileUrl := "http://" + server + "/" + fid
     if fi.ModTime != 0 {
         fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -98,9 +101,47 @@ func (fi FilePart) Upload(server string, fid string) (int, error) {
     if closer, ok := fi.Reader.(io.Closer); ok {
         defer closer.Close()
     }
-    ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
-    if e != nil {
-        return 0, e
-    }
-    return ret.Size, e
+    if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
+        chunkSize := int64(maxMB * 1024 * 1024)
+        chunks := fi.FileSize/chunkSize + 1
+        fids := make([]string, 0)
+        for i := int64(0); i < chunks; i++ {
+            id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, replication)
+            if e != nil {
+                return 0, e
+            }
+            fids = append(fids, id)
+            retSize += count
+            if e = upload_file_id_list(fileUrl, fi.FileName+"-list", fids); e != nil {
+                return 0, e
+            }
+        }
+        return retSize, nil
+    } else {
+        ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
+        if e != nil {
+            return 0, e
+        }
+        return ret.Size, e
+    }
+    return 0, nil
+}
+
+func upload_one_chunk(filename string, reader io.Reader, master, replication string) (fid string, size int, e error) {
+    ret, err := Assign(master, 1, replication)
+    if err != nil {
+        return "", 0, err
+    }
+    fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid
+    glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
+    uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
+    return fid, uploadResult.Size, uploadError
+}
+
+func upload_file_id_list(fileUrl, filename string, fids []string) error {
+    var buf bytes.Buffer
+    buf.WriteString(strings.Join(fids, "\n"))
+    glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
+    _, e := Upload(fileUrl, filename, &buf, false, "text/plain")
+    return e
 }
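How the split branch works: io.LimitReader takes successive chunkSize-bounded windows over the one underlying reader, so each iteration resumes where the previous chunk stopped; every window is uploaded as application/octet-stream under a freshly Assigned fid, and the accumulated fid list is rewritten under the original fid as text/plain on each pass, so the final iteration leaves the complete list in place. Two quirks are worth noting: chunks := fi.FileSize/chunkSize + 1 yields one extra, empty chunk when the size is an exact multiple of chunkSize (the exact ceiling form would be (fi.FileSize+chunkSize-1)/chunkSize), and re-uploading the growing list on every iteration is redundant, though the last write wins.

The commit only covers the write path; reading a chunked file back is left to the client. Below is a hypothetical reassembly sketch, assuming all chunks remain readable at http://<server>/<fid> on one volume server (the per-fid lookup against the master is elided); fetchChunkedFile and the sample fid are made-up names for illustration:

package main

import (
    "bufio"
    "io"
    "net/http"
    "os"
)

// fetchChunkedFile (hypothetical helper, not part of this commit) reads the
// newline-separated fid list stored under listFid and streams the referenced
// chunks, in order, into w.
func fetchChunkedFile(server, listFid string, w io.Writer) error {
    resp, err := http.Get("http://" + server + "/" + listFid)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    // The list chunk is plain text: one chunk fid per line.
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        fid := scanner.Text()
        if fid == "" {
            continue
        }
        chunk, err := http.Get("http://" + server + "/" + fid)
        if err != nil {
            return err
        }
        _, err = io.Copy(w, chunk.Body) // append this chunk's bytes
        chunk.Body.Close()
        if err != nil {
            return err
        }
    }
    return scanner.Err()
}

func main() {
    out, err := os.Create("reassembled.dat")
    if err != nil {
        panic(err)
    }
    defer out.Close()
    // "3,01637037d6" is a placeholder fid for the uploaded list chunk.
    if err := fetchChunkedFile("127.0.0.1:8080", "3,01637037d6", out); err != nil {
        panic(err)
    }
}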

Changed file 2 of 2 (the upload command):

@@ -12,6 +12,7 @@ var (
     uploadReplication *string
     uploadDir         *string
     include           *string
+    maxMB             *int
 )
 
 func init() {
@@ -21,6 +22,7 @@ func init() {
     uploadDir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.")
     include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
     uploadReplication = cmdUpload.Flag.String("replication", "", "replication type(000,001,010,100,110,200)")
+    maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
 }
 
 var cmdUpload = &Command{
@@ -39,6 +41,9 @@ var cmdUpload = &Command{
   If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
   This can save volume server's gzipped processing and allow customizable gzip compression level.
   The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
+  If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
+  The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
+
   `,
 }
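With the flag wired in, an invocation along the lines of "weed upload -server=localhost:9333 -maxMB=16 big.iso" (assuming the command's usual -server master-address option, the *server variable referenced in runUpload below) would store big.iso as 16MB chunks plus one list chunk, and the JSON printed by runUpload would report the list chunk's fid as the file's id.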
@@ -60,7 +65,7 @@ func runUpload(cmd *Command, args []string) bool {
             if e != nil {
                 return e
             }
-            results, e := operation.SubmitFiles(*server, parts, *uploadReplication)
+            results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB)
             bytes, _ := json.Marshal(results)
             fmt.Println(string(bytes))
             if e != nil {
@@ -77,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool {
         if e != nil {
             fmt.Println(e.Error())
         }
-        results, _ := operation.SubmitFiles(*server, parts, *uploadReplication)
+        results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB)
         bytes, _ := json.Marshal(results)
         fmt.Println(string(bytes))
     }
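Callers that use the operation package directly reach the same behavior through the extra argument. A minimal sketch, with the master address, file path, and size threshold as placeholder values:

package main

import (
    "fmt"

    "code.google.com/p/weed-fs/go/operation"
)

func main() {
    // Split anything over 16MB into 16MB chunks; 0 would disable splitting.
    parts, err := operation.NewFileParts([]string{"/tmp/big.iso"})
    if err != nil {
        fmt.Println(err)
        return
    }
    results, err := operation.SubmitFiles("localhost:9333", parts, "", 16)
    if err != nil {
        fmt.Println("submit failed:", err)
    }
    for _, r := range results {
        // FileName, Size, and Error are the SubmitResult fields used above.
        fmt.Println(r.FileName, r.Size, r.Error)
    }
}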