filer.copy: "check.size" before copying files

fix https://github.com/chrislusf/seaweedfs/issues/2067
This commit is contained in:
Chris Lu 2021-05-12 21:45:39 -07:00
parent 534c4202a5
commit b430d1b6ee

View file

@ -3,6 +3,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
@ -46,6 +47,8 @@ type CopyOptions struct {
masters []string
cipher bool
ttlSec int32
checkSize *bool
verbose *bool
}
func init() {
@ -59,6 +62,8 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying")
}
var cmdCopy = &Command{
@ -220,9 +225,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
println("checking directory", fileOrDir)
for _, subFileOrDir := range files {
if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
cleanedDestDirectory := filepath.Clean(destPath + fi.Name())
if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil {
return err
}
}
@ -275,6 +280,15 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
}
}
if shouldCopy, err := worker.checkExistingFileFirst(task, f); err != nil {
return fmt.Errorf("check existing file: %v", err)
} else if !shouldCopy {
if *worker.options.verbose {
fmt.Printf("skipping copied file: %v\n", f.Name())
}
return nil
}
// find the chunk count
chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
@ -289,6 +303,42 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.File) (shouldCopy bool, err error) {
shouldCopy = true
if !*worker.options.checkSize {
return
}
fileStat, err := f.Stat()
if err != nil {
shouldCopy = false
return
}
err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
Name: filepath.Base(f.Name()),
}
resp, lookupErr := client.LookupDirectoryEntry(context.Background(), request)
if lookupErr != nil {
// mostly not found error
return nil
}
if fileStat.Size() == int64(filer.FileSize(resp.Entry)) {
shouldCopy = false
}
return nil
})
return
}
func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
@ -343,11 +393,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
if *worker.options.verbose {
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
}
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
}
if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@ -501,7 +553,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
return nil
}