This commit is contained in:
Chris Lu 2021-06-06 18:43:04 -07:00
parent 6c82326575
commit 44f1ba6894

View file

@ -56,54 +56,27 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
break break
} }
} }
dataReader := util.NewBytesReader(bytesBuffer.Bytes())
// retry to assign a different file id chunk, uploadErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), chunkOffset, so, md5Hash)
var fileId, urlLocation string
var auth security.EncodedJwt
var assignErr, uploadErr error
var uploadResult *operation.UploadResult
for i := 0; i < 3; i++ {
// assign one file id for one chunk
fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
if assignErr != nil {
return nil, nil, 0, assignErr, nil
}
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
time.Sleep(251 * time.Millisecond)
continue
}
break
}
if uploadErr != nil { if uploadErr != nil {
return nil, nil, 0, uploadErr, nil return nil, nil, 0, uploadErr, nil
} }
// if last chunk exhausted the reader exactly at the border // if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 { if chunk == nil {
break break
} }
if chunkOffset == 0 {
uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5)
readedMd5 := md5Hash.Sum(nil)
if !bytes.Equal(uploadedMd5, readedMd5) {
glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name)
}
}
// Save to chunk manifest structure // Save to chunk manifest structure
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, chunkOffset, chunkOffset+int64(chunk.Size))
// reset variables for the next chunk // reset variables for the next chunk
chunkOffset = chunkOffset + int64(uploadResult.Size) chunkOffset = chunkOffset + int64(chunk.Size)
// if last chunk was not at full chunk size, but already exhausted the reader // if last chunk was not at full chunk size, but already exhausted the reader
if int64(uploadResult.Size) < int64(chunkSize) { if int64(chunk.Size) < int64(chunkSize) {
break break
} }
} }
@ -111,7 +84,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return fileChunks, md5Hash, chunkOffset, nil, smallContent return fileChunks, md5Hash, chunkOffset, nil, smallContent
} }
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
start := time.Now() start := time.Now()
@ -125,3 +98,49 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
} }
return uploadResult, err, data return uploadResult, err, data
} }
func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption, md5Hash hash.Hash) (*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
var fileId, urlLocation string
var auth security.EncodedJwt
var uploadErr error
var uploadResult *operation.UploadResult
for i := 0; i < 3; i++ {
// assign one file id for one chunk
fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
continue
}
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
continue
}
break
}
if uploadErr != nil {
glog.Errorf("upload error: %v", uploadErr)
return nil, uploadErr
}
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
return nil, nil
}
if chunkOffset == 0 {
uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5)
readedMd5 := md5Hash.Sum(nil)
if !bytes.Equal(uploadedMd5, readedMd5) {
glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name)
}
}
return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil
}