diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 777e52ab6..c18b9f055 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -29,16 +29,17 @@ var ( ) type CopyOptions struct { - filerGrpcPort *int - master *string - include *string - replication *string - collection *string - ttl *string - maxMB *int - grpcDialOption grpc.DialOption - masterClient *wdclient.MasterClient - concurrency *int + filerGrpcPort *int + master *string + include *string + replication *string + collection *string + ttl *string + maxMB *int + grpcDialOption grpc.DialOption + masterClient *wdclient.MasterClient + concurrency *int + compressionLevel *int } func init() { @@ -52,6 +53,7 @@ func init() { copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } var cmdCopy = &Command{ @@ -265,7 +267,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth) + uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel) if err != nil { return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index dcab1a0ae..4417a0e70 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -39,8 +39,23 @@ func init() { var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") -// Upload sends a POST request to a volume server to upload the content +// Upload sends a POST request to a volume server to upload the content with adjustable compression level +func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) { + if compressionLevel < 1 { + compressionLevel = 1 + } + if compressionLevel > 9 { + compressionLevel = 9 + } + return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt) +} + +// Upload sends a POST request to a volume server to upload the content with fast compression func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { + return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt) +} + +func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) { contentIsGzipped := isGzipped shouldGzipNow := false if !isGzipped { @@ -51,7 +66,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, } return upload_content(uploadUrl, func(w io.Writer) (err error) { if shouldGzipNow { - gzWriter, _ := gzip.NewWriterLevel(w, flate.BestSpeed) + gzWriter, _ := gzip.NewWriterLevel(w, compression) _, err = io.Copy(gzWriter, reader) gzWriter.Close() } else { @@ -60,6 +75,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, return }, filename, contentIsGzipped, mtype, pairMap, jwt) } + func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf)