mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
webdav: retryable data chunk upload
This commit is contained in:
parent
f8fa430257
commit
3bf8e772f8
|
@ -376,61 +376,34 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
|
||||||
|
|
||||||
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
|
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
|
||||||
|
|
||||||
var fileId, host string
|
fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
|
||||||
var auth security.EncodedJwt
|
f.fs,
|
||||||
|
&filer_pb.AssignVolumeRequest{
|
||||||
|
Count: 1,
|
||||||
|
Replication: f.fs.option.Replication,
|
||||||
|
Collection: f.fs.option.Collection,
|
||||||
|
DiskType: f.fs.option.DiskType,
|
||||||
|
Path: name,
|
||||||
|
},
|
||||||
|
&operation.UploadOption{
|
||||||
|
Filename: f.name,
|
||||||
|
Cipher: f.fs.option.Cipher,
|
||||||
|
IsInputCompressed: false,
|
||||||
|
MimeType: "",
|
||||||
|
PairMap: nil,
|
||||||
|
},
|
||||||
|
func(host, fileId string) string {
|
||||||
|
return fmt.Sprintf("http://%s/%s", host, fileId)
|
||||||
|
},
|
||||||
|
reader,
|
||||||
|
)
|
||||||
|
|
||||||
if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
assignErr := util.Retry("assignVolume", func() error {
|
|
||||||
request := &filer_pb.AssignVolumeRequest{
|
|
||||||
Count: 1,
|
|
||||||
Replication: f.fs.option.Replication,
|
|
||||||
Collection: f.fs.option.Collection,
|
|
||||||
DiskType: f.fs.option.DiskType,
|
|
||||||
Path: name,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := client.AssignVolume(ctx, request)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if resp.Error != "" {
|
|
||||||
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if assignErr != nil {
|
|
||||||
return assignErr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}); flushErr != nil {
|
|
||||||
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
|
||||||
uploadOption := &operation.UploadOption{
|
|
||||||
UploadUrl: fileUrl,
|
|
||||||
Filename: f.name,
|
|
||||||
Cipher: f.fs.option.Cipher,
|
|
||||||
IsInputCompressed: false,
|
|
||||||
MimeType: "",
|
|
||||||
PairMap: nil,
|
|
||||||
Jwt: auth,
|
|
||||||
}
|
|
||||||
uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
|
|
||||||
if flushErr != nil {
|
if flushErr != nil {
|
||||||
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
|
glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
|
||||||
return nil, fmt.Errorf("upload data: %v", flushErr)
|
return nil, fmt.Errorf("upload data: %v", flushErr)
|
||||||
}
|
}
|
||||||
if uploadResult.Error != "" {
|
if uploadResult.Error != "" {
|
||||||
glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
|
glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
|
||||||
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
|
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
|
||||||
}
|
}
|
||||||
return uploadResult.ToPbFileChunk(fileId, offset), nil
|
return uploadResult.ToPbFileChunk(fileId, offset), nil
|
||||||
|
|
Loading…
Reference in a new issue