able to sync the changes

This commit is contained in:
Chris Lu 2018-09-21 01:54:29 -07:00
parent a8cd7df44a
commit a6cfaba018
6 changed files with 221 additions and 69 deletions

View file

@ -55,7 +55,7 @@ func downloadToFile(server, fileId, saveDir string) error {
if lookupError != nil {
return lookupError
}
filename, rc, err := util.DownloadUrl(fileUrl)
filename, _, rc, err := util.DownloadFile(fileUrl)
if err != nil {
return err
}
@ -108,7 +108,7 @@ func fetchContent(server string, fileId string) (filename string, content []byte
return "", nil, lookupError
}
var rc io.ReadCloser
if filename, rc, e = util.DownloadUrl(fileUrl); e != nil {
if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil {
return "", nil, e
}
content, e = ioutil.ReadAll(rc)

View file

@ -30,6 +30,8 @@ func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator {
}
}
sink.SetSourceFiler(source)
return &Replicator{
sink: sink,
source: source,

View file

@ -0,0 +1,122 @@
package sink
import (
"context"
"sync"
"strings"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 {
return
}
var wg sync.WaitGroup
for _, sourceChunk := range sourceChunks {
wg.Add(1)
go func(chunk *filer_pb.FileChunk) {
defer wg.Done()
replicatedChunk, e := fs.replicateOneChunk(chunk)
if e != nil {
err = e
}
replicatedChunks = append(replicatedChunks, replicatedChunk)
}(sourceChunk)
}
wg.Wait()
return
}
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
fileId, err := fs.fetchAndWrite(sourceChunk)
if err != nil {
return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
}
return &filer_pb.FileChunk{
FileId: fileId,
Offset: sourceChunk.Offset,
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
SourceFileId: sourceChunk.FileId,
}, nil
}
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId)
if err != nil {
return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
}
defer readCloser.Close()
var host string
if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: fs.replication,
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
}
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
fileId, host = resp.FileId, resp.Url
return nil
}); err != nil {
return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
glog.V(3).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
uploadResult, err := operation.Upload(fileUrl, filename, readCloser,
"gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "")
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
return
}
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
grpcConnection, err := util.GrpcDial(fs.grpcAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
}
defer grpcConnection.Close()
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}
func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 {
return fileId[:lastCommaIndex]
}
return fileId
}

View file

@ -1,14 +1,14 @@
package sink
import (
"fmt"
"context"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"fmt"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"context"
"sync"
"github.com/chrislusf/seaweedfs/weed/replication/source"
)
type ReplicationSink interface {
@ -16,11 +16,17 @@ type ReplicationSink interface {
CreateEntry(key string, entry *filer_pb.Entry) error
UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
GetDirectory() string
SetSourceFiler(s *source.FilerSource)
}
type FilerSink struct {
grpcAddress string
dir string
filerSource *source.FilerSource
replication string
collection string
ttlSec int32
dataCenter string
}
func (fs *FilerSink) GetDirectory() string {
@ -34,6 +40,10 @@ func (fs *FilerSink) Initialize(configuration util.Configuration) error {
)
}
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
fs.filerSource = s
}
func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.dir = dir
@ -65,13 +75,15 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
replicatedChunks, err := replicateChunks(entry.Chunks)
replicatedChunks, err := fs.replicateChunks(entry.Chunks)
if err != nil {
glog.V(0).Infof("replicate entry chunks %s: %v", key, err)
return fmt.Errorf("replicate entry chunks %s: %v", key, err)
}
glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := filer2.FullPath(key).DirAndName()
@ -96,70 +108,84 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
})
}
func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error {
func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) {
ctx := context.Background()
dir, name := filer2.FullPath(key).DirAndName()
// find out what changed
deletedChunks, newChunks := compareChunks(oldEntry, newEntry)
// read existing entry
var entry *filer_pb.Entry
err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
}
glog.V(4).Infof("lookup directory entry: %v", request)
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
glog.V(0).Infof("lookup %s: %v", key, err)
return err
}
entry = resp.Entry
return nil
}
})
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
grpcConnection, err := util.GrpcDial(fs.grpcAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
}
defer grpcConnection.Close()
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
return err
}
func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 {
return fileId[:lastCommaIndex]
}
return fileId
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
entry.Chunks = minusChunks(entry.Chunks, deletedChunks)
}
func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 {
return
}
var wg sync.WaitGroup
for _, s := range sourceChunks {
wg.Add(1)
go func(chunk *filer_pb.FileChunk) {
defer wg.Done()
replicatedChunk, e := replicateOneChunk(chunk)
if e != nil {
err = e
}
replicatedChunks = append(replicatedChunks, replicatedChunk)
}(s)
}
wg.Wait()
// replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(newChunks)
entry.Chunks = append(entry.Chunks, replicatedChunks...)
// save updated meta data
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
}
if _, err := client.UpdateEntry(ctx, request); err != nil {
return fmt.Errorf("update entry %s: %v", key, err)
}
return nil
})
}
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}
func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
fileId, err := fetchAndWrite(sourceChunk)
if err != nil {
return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
for _, a := range as {
found := false
for _, b := range bs {
if a.FileId == b.FileId {
found = true
break
}
}
if !found {
delta = append(delta, a)
}
return &filer_pb.FileChunk{
FileId: fileId,
Offset: sourceChunk.Offset,
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
SourceFileId: sourceChunk.FileId,
}, nil
}
func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
return
}

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"strings"
"context"
"net/http"
)
type ReplicationSource interface {
@ -32,7 +33,7 @@ func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
return nil
}
func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) {
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
@ -55,21 +56,21 @@ func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err erro
if err != nil {
glog.V(1).Infof("replication lookup volume id %s: %v", vid, err)
return nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err)
return "", nil, nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(1).Infof("replication locate volume id %s: %v", vid, err)
return nil, fmt.Errorf("replication locate volume id %s: %v", vid, err)
return "", nil, nil, fmt.Errorf("replication locate volume id %s: %v", vid, err)
}
fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
_, readCloser, err = util.DownloadUrl(fileUrl)
filename, header, readCloser, err = util.DownloadFile(fileUrl)
return readCloser, err
return filename, header, readCloser, err
}
func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {

View file

@ -155,11 +155,12 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
return readFn(r.Body)
}
func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) {
response, err := client.Get(fileUrl)
if err != nil {
return "", nil, err
return "", nil, nil, err
}
header = response.Header
contentDisposition := response.Header["Content-Disposition"]
if len(contentDisposition) > 0 {
idx := strings.Index(contentDisposition[0], "filename=")