diff --git a/go/operation/submit.go b/go/operation/submit.go index d24a89b2b..e922a8834 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -1,6 +1,7 @@ package operation import ( + "bytes" "code.google.com/p/weed-fs/go/glog" "io" "mime" @@ -11,8 +12,9 @@ import ( ) type FilePart struct { - Reader io.Reader //required, all rest are optional + Reader io.Reader FileName string + FileSize int64 IsGzipped bool MimeType string ModTime int64 //in seconds @@ -26,7 +28,7 @@ type SubmitResult struct { Error string `json:"error"` } -func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitResult, error) { +func SubmitFiles(master string, files []FilePart, replication string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -43,7 +45,7 @@ func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitR if index > 0 { fid = fid + "_" + strconv.Itoa(index) } - results[index].Size, err = file.Upload(ret.PublicUrl, fid) + results[index].Size, err = file.upload(ret.PublicUrl, fid, maxMB, master, replication) if err != nil { fid = "" results[index].Error = err.Error() @@ -57,13 +59,13 @@ func SubmitFiles(master string, files []FilePart, replication string) ([]SubmitR func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) { ret = make([]FilePart, len(fullPathFilenames)) for index, file := range fullPathFilenames { - if ret[index], err = NewFilePart(file); err != nil { + if ret[index], err = newFilePart(file); err != nil { return } } return } -func NewFilePart(fullPathFilename string) (ret FilePart, err error) { +func newFilePart(fullPathFilename string) (ret FilePart, err error) { fh, openErr := os.Open(fullPathFilename) if openErr != nil { glog.V(0).Info("Failed to open file: ", fullPathFilename) @@ -76,6 +78,7 @@ func NewFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, fiErr } else { ret.ModTime = fi.ModTime().UTC().Unix() + ret.FileSize = fi.Size() } ext := strings.ToLower(path.Ext(fullPathFilename)) ret.IsGzipped = ext == ".gz" @@ -90,7 +93,7 @@ func NewFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(server string, fid string) (int, error) { +func (fi FilePart) upload(server string, fid string, maxMB int, master, replication string) (retSize int, err error) { fileUrl := "http://" + server + "/" + fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -98,9 +101,47 @@ func (fi FilePart) Upload(server string, fid string) (int, error) { if closer, ok := fi.Reader.(io.Closer); ok { defer closer.Close() } - ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType) - if e != nil { - return 0, e + if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) { + chunkSize := int64(maxMB * 1024 * 1024) + chunks := fi.FileSize/chunkSize + 1 + fids := make([]string, 0) + for i := int64(0); i < chunks; i++ { + id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, replication) + if e != nil { + return 0, e + } + fids = append(fids, id) + retSize += count + if e = upload_file_id_list(fileUrl, fi.FileName+"-list", fids); e != nil { + return 0, e + } + } + return retSize, nil + } else { + ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType) + if e != nil { + return 0, e + } + return ret.Size, e } - return ret.Size, e + return 0, nil +} + +func upload_one_chunk(filename string, reader io.Reader, master, replication string) (fid string, size int, e error) { + ret, err := Assign(master, 1, replication) + if err != nil { + return "", 0, err + } + fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid + glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") + uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream") + return fid, uploadResult.Size, uploadError +} + +func upload_file_id_list(fileUrl, filename string, fids []string) error { + var buf bytes.Buffer + buf.WriteString(strings.Join(fids, "\n")) + glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...") + _, e := Upload(fileUrl, filename, &buf, false, "text/plain") + return e } diff --git a/go/weed/upload.go b/go/weed/upload.go index 215a320a5..c8ad41c64 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -12,6 +12,7 @@ var ( uploadReplication *string uploadDir *string include *string + maxMB *int ) func init() { @@ -21,6 +22,7 @@ func init() { uploadDir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.") include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") uploadReplication = cmdUpload.Flag.String("replication", "", "replication type(000,001,010,100,110,200)") + maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") } var cmdUpload = &Command{ @@ -39,6 +41,9 @@ var cmdUpload = &Command{ If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is. This can save volume server's gzipped processing and allow customizable gzip compression level. The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js". + + If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly. + The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. `, } @@ -60,7 +65,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication) + results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -77,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication) + results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) }