diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 19aceb211..1dd831cb6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -29,16 +29,17 @@ var ( ) type CopyOptions struct { - include *string - replication *string - collection *string - ttl *string - maxMB *int - masterClient *wdclient.MasterClient - concurrency *int - compressionLevel *int - grpcDialOption grpc.DialOption - masters []string + include *string + replication *string + collection *string + ttl *string + maxMB *int + masterClient *wdclient.MasterClient + concurrenctFiles *int + concurrenctChunks *int + compressionLevel *int + grpcDialOption grpc.DialOption + masters []string } func init() { @@ -49,7 +50,8 @@ func init() { copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") - copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } @@ -131,7 +133,7 @@ func runCopy(cmd *Command, args []string) bool { util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") } - fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency) + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles) go func() { defer close(fileCopyTaskChan) @@ -142,7 +144,7 @@ func runCopy(cmd *Command, args []string) bool { } } }() - for i := 0; i < *copy.concurrency; i++ { + for i := 0; i < *copy.concurrenctFiles; i++ { waitGroup.Add(1) go func() { defer waitGroup.Done() @@ -345,41 +347,66 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) + chunksChan := make(chan *filer_pb.FileChunk, chunkCount) + + concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks) + var wg sync.WaitGroup + var uploadError error + + fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount) + for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ { + wg.Add(1) + concurrentChunks <- struct{}{} + go func(i int64) { + defer func() { + wg.Done() + <-concurrentChunks + }() + // assign a volume + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, + }) + if err != nil { + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + } + + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + + uploadResult, err := operation.Upload(targetUrl, + fileName+"-"+strconv.FormatInt(i+1, 10), + io.NewSectionReader(f, i*chunkSize, chunkSize), + false, "application/octet-stream", nil, assignResult.Auth) + if err != nil { + uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + return + } + if uploadResult.Error != "" { + uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return + } + chunksChan <- &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: i * chunkSize, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + } + fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + }(i) + } + wg.Wait() + close(chunksChan) + + if uploadError != nil { + return uploadError + } + var chunks []*filer_pb.FileChunk - - for i := int64(0); i < int64(chunkCount); i++ { - - // assign a volume - assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - Ttl: *worker.options.ttl, - }) - if err != nil { - fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) - } - - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - - uploadResult, err := operation.Upload(targetUrl, - fileName+"-"+strconv.FormatInt(i+1, 10), - io.LimitReader(f, chunkSize), - false, "application/octet-stream", nil, assignResult.Auth) - if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - } - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: i * chunkSize, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }) - fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + for chunk := range chunksChan { + chunks = append(chunks, chunk) } if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {