filer.backup: backup small files if the file is saved in filer (saveToFilerLimit > 0)

fix https://github.com/seaweedfs/seaweedfs/issues/3468
This commit is contained in:
chrislu 2022-08-19 23:00:56 -07:00
parent fdd8c5d5e0
commit 11f99836c3
6 changed files with 34 additions and 1 deletions

View file

@ -119,6 +119,10 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return writeErr return writeErr
} }
if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err return err
} }

View file

@ -101,13 +101,16 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key) targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background()) writer := targetObject.NewWriter(context.Background())
defer writer.Close()
writeFunc := func(data []byte) error { writeFunc := func(data []byte) error {
_, writeErr := writer.Write(data) _, writeErr := writer.Write(data)
return writeErr return writeErr
} }
defer writer.Close() if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err return err

View file

@ -107,6 +107,10 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
return writeErr return writeErr
} }
if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err return err
} }

View file

@ -101,6 +101,10 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
return writeErr return writeErr
} }
if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
return err return err
} }

View file

@ -1,6 +1,7 @@
package S3Sink package S3Sink
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"strings" "strings"
@ -121,6 +122,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
} }
totalSize := filer.FileSize(entry) totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
parts := make([]*s3.CompletedPart, len(chunkViews)) parts := make([]*s3.CompletedPart, len(chunkViews))
@ -141,6 +143,17 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
} }
wg.Wait() wg.Wait()
// for small files
if len(entry.Content) > 0 {
parts = make([]*s3.CompletedPart, 1)
if part, uploadErr := s3sink.doUploadPart(key, uploadId, 1, bytes.NewReader(entry.Content)); uploadErr != nil {
err = uploadErr
glog.Errorf("uploadPart: %v", uploadErr)
} else {
parts[0] = part
}
}
if err != nil { if err != nil {
s3sink.abortMultipartUpload(key, uploadId) s3sink.abortMultipartUpload(key, uploadId)
return fmt.Errorf("uploadPart: %v", err) return fmt.Errorf("uploadPart: %v", err)

View file

@ -116,6 +116,11 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
} }
return s3sink.doUploadPart(key, uploadId, partId, readSeeker)
}
func (s3sink *S3Sink) doUploadPart(key, uploadId string, partId int, readSeeker io.ReadSeeker) (*s3.CompletedPart, error) {
input := &s3.UploadPartInput{ input := &s3.UploadPartInput{
Body: readSeeker, Body: readSeeker,
Bucket: aws.String(s3sink.bucket), Bucket: aws.String(s3sink.bucket),