refactoring

This commit is contained in:
Chris Lu 2020-03-28 13:41:58 -07:00
parent 005b4ab3fe
commit eedd33dda3
6 changed files with 15 additions and 13 deletions

View file

@ -425,7 +425,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
replication = assignResult.Replication replication = assignResult.Replication
} }
uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil { if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return return

View file

@ -174,7 +174,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
} }
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil { if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err) return nil, fmt.Errorf("upload data: %v", err)

View file

@ -191,7 +191,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
cm.DeleteChunks(master, usePublicUrl, grpcDialOption) cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
} }
} else { } else {
ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil { if e != nil {
return 0, e return 0, e
} }
@ -204,7 +204,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt, fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) { ) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil { if uploadError != nil {
return 0, uploadError return 0, uploadError
} }

View file

@ -55,23 +55,24 @@ func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isI
} }
// Upload sends a POST request to a volume server to upload the content with fast compression // Upload sends a POST request to a volume server to upload the content with fast compression
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
hash := md5.New() hash := md5.New()
reader = io.TeeReader(reader, hash) reader = io.TeeReader(reader, hash)
uploadResult, err = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt) uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt)
if uploadResult != nil { if uploadResult != nil {
uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
} }
return return
} }
func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
data, readErr := ioutil.ReadAll(reader) data, err = ioutil.ReadAll(reader)
if readErr != nil { if err != nil {
err = fmt.Errorf("read input: %v", readErr) err = fmt.Errorf("read input: %v", err)
return return
} }
return doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt) uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
return uploadResult, uploadErr, data
} }
func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {

View file

@ -100,7 +100,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
// fetch data as is, regardless whether it is encrypted or not // fetch data as is, regardless whether it is encrypted or not
uploadResult, err := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
if err != nil { if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload data: %v", err) return "", fmt.Errorf("upload data: %v", err)

View file

@ -185,5 +185,6 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
}() }()
return operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
return uploadResult, err
} }