From eedd33dda32e2725c333f25d414231a8c7688485 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2020 13:41:58 -0700 Subject: [PATCH] refactoring --- weed/command/filer_copy.go | 2 +- weed/filesys/dirty_page.go | 2 +- weed/operation/submit.go | 4 ++-- weed/operation/upload_content.go | 15 ++++++++------- weed/replication/sink/filersink/fetch_write.go | 2 +- .../filer_server_handlers_write_autochunk.go | 3 ++- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index a8aea36bc..f596ab23d 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -425,7 +425,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, replication = assignResult.Replication } - uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) + uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) if err != nil { uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) return diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index aa37b0841..d1f28520a 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -174,7 +174,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) + uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 0552ab9de..d1c4706ce 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -191,7 +191,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur cm.DeleteChunks(master, usePublicUrl, grpcDialOption) } } else { - ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) + ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -204,7 +204,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) + uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) if uploadError != nil { return 0, uploadError } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 56c820f00..14e0f7cd4 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -55,23 +55,24 @@ func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isI } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { +func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { hash := md5.New() reader = io.TeeReader(reader, hash) - uploadResult, err = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt) + uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt) if uploadResult != nil { uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) } return } -func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - data, readErr := ioutil.ReadAll(reader) - if readErr != nil { - err = fmt.Errorf("read input: %v", readErr) +func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + data, err = ioutil.ReadAll(reader) + if err != nil { + err = fmt.Errorf("read input: %v", err) return } - return doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt) + uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt) + return uploadResult, uploadErr, data } func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 07b091073..3c7a36fa0 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -100,7 +100,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) // fetch data as is, regardless whether it is encrypted or not - uploadResult, err := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) + uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) return "", fmt.Errorf("upload data: %v", err) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index c78efcc52..e70826b5d 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -185,5 +185,6 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) }() - return operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + return uploadResult, err }