retry both assign volume and uploading data

fix https://github.com/chrislusf/seaweedfs/issues/2351
This commit is contained in:
Chris Lu 2021-10-01 23:23:39 -07:00
parent ec3351a4ec
commit af207bbaf0

View file

@ -344,9 +344,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err
}
// assign a volume
err = util.Retry("assignVolume", func() error {
return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = util.Retry("upload", func() error {
// assign a volume
assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@ -369,35 +369,41 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
return nil
})
if assignErr != nil {
return assignErr
}
// uplaod data
targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
Filename: fileName,
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: mimeType,
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
if *worker.options.verbose {
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
}
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
return nil
})
if err != nil {
return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
return fmt.Errorf("upload %v: %v\n", fileName, err)
}
targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
Filename: fileName,
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: mimeType,
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
if *worker.options.verbose {
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
}
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
}
if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {