diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 7b3e22b90..d1a5d7ebd 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -1,7 +1,6 @@ package filersink import ( - "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/util" "sync" @@ -12,7 +11,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/security" ) func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) { @@ -67,62 +65,41 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) } defer util.CloseResponse(resp) - var host string - var auth security.EncodedJwt - - if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: fs.replication, - Collection: fs.collection, - TtlSec: fs.ttlSec, - DataCenter: fs.dataCenter, - DiskType: fs.diskType, - Path: path, + fileId, uploadResult, err, _ := operation.UploadWithRetry( + fs, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: fs.replication, + Collection: fs.collection, + TtlSec: fs.ttlSec, + DataCenter: fs.dataCenter, + DiskType: fs.diskType, + Path: path, + }, + &operation.UploadOption{ + Filename: filename, + Cipher: false, + IsInputCompressed: "gzip" == header.Get("Content-Encoding"), + MimeType: header.Get("Content-Type"), + PairMap: nil, + }, + func(host, fileId string) string { + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if fs.writeChunkByFiler { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) } + glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) + return fileUrl + }, + resp.Body, + ) - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) - - return nil - }) - }); err != nil { - return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if fs.writeChunkByFiler { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) - } - - glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) - - // fetch data as is, regardless whether it is encrypted or not - uploadOption := &operation.UploadOption{ - UploadUrl: fileUrl, - Filename: filename, - Cipher: false, - IsInputCompressed: "gzip" == header.Get("Content-Encoding"), - MimeType: header.Get("Content-Type"), - PairMap: nil, - Jwt: auth, - } - uploadResult, err, _ := operation.Upload(resp.Body, uploadOption) if err != nil { - glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) + glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err) return "", fmt.Errorf("upload data: %v", err) } if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + glog.V(0).Infof("upload failure %v: %v", filename, err) return "", fmt.Errorf("upload result: %v", uploadResult.Error) }