diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 7bfd484f0..e20087ce4 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -585,59 +585,57 @@ func detectMimeType(f *os.File) string { func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) { - var fileId, host string - var auth security.EncodedJwt + var finalFileId string + uploadResult, flushErr, _ := operation.UploadWithRetry( + worker, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + DiskType: *worker.options.diskType, + Path: name, + }, + &operation.UploadOption{ + Filename: name, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + }, + func(host, fileId string) string { + finalFileId = fileId + return fmt.Sprintf("http://%s/%s", host, fileId) + }, + reader, + ) - if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx := context.Background() - - assignErr := util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, - DiskType: *worker.options.diskType, - Path: name, - } - - resp, err := client.AssignVolume(ctx, request) - if err != nil { - return fmt.Errorf("assign volume failure %v: %v", request, err) - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) - - return nil - }) - if assignErr != nil { - return assignErr - } - - return nil - }); flushErr != nil { - return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr) - } - - uploadOption := &operation.UploadOption{ - UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId), - Filename: name, - Cipher: worker.options.cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: auth, - } - uploadResult, flushErr, _ := operation.Upload(reader, uploadOption) if flushErr != nil { return nil, fmt.Errorf("upload data: %v", flushErr) } if uploadResult.Error != "" { return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } - return uploadResult.ToPbFileChunk(fileId, offset), nil + return uploadResult.ToPbFileChunk(finalFileId, offset), nil +} + +var _ = filer_pb.FilerClient(&FileCopyWorker{}) + +func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { + + filerGrpcAddress := worker.filerAddress.ToGrpcAddress() + err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, worker.options.grpcDialOption) + + return +} + +func (worker *FileCopyWorker) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +func (worker *FileCopyWorker) GetDataCenter() string { + return "" } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index cafba5ce9..76f55523a 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "context" "encoding/json" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -28,6 +29,7 @@ type UploadOption struct { MimeType string PairMap map[string]string Jwt security.EncodedJwt + RetryForever bool } type UploadResult struct { @@ -76,6 +78,53 @@ func init() { }} } +// UploadWithRetry will retry both assigning volume request and uploading content +// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. +func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (uploadResult *UploadResult, err error, data []byte) { + doUploadFunc := func() error { + + var fileId, host string + var auth security.EncodedJwt + + // grpc assign volume + if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, assignErr := client.AssignVolume(context.Background(), assignRequest) + if assignErr != nil { + glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr) + return assignErr + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error) + } + + fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) + loc := resp.Location + host = filerClient.AdjustedUrl(loc) + + return nil + }); grpcAssignErr != nil { + return fmt.Errorf("filerGrpcAddress assign volume: %v", grpcAssignErr) + } + + uploadOption.UploadUrl = genFileUrlFn(host, fileId) + uploadOption.Jwt = auth + + var uploadErr error + uploadResult, uploadErr, data = doUpload(reader, uploadOption) + return uploadErr + } + if uploadOption.RetryForever { + util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) { + glog.V(0).Infof("upload content: %v", err) + return true + }) + } else { + err = util.Retry("uploadWithRetry", doUploadFunc) + } + + return +} + var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") // Upload sends a POST request to a volume server to upload the content with adjustable compression level