diff --git a/weed/command/download.go b/weed/command/download.go index 39ed2b38e..b3e33defd 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -55,7 +55,7 @@ func downloadToFile(server, fileId, saveDir string) error { if lookupError != nil { return lookupError } - filename, rc, err := util.DownloadUrl(fileUrl) + filename, _, rc, err := util.DownloadFile(fileUrl) if err != nil { return err } @@ -108,7 +108,7 @@ func fetchContent(server string, fileId string) (filename string, content []byte return "", nil, lookupError } var rc io.ReadCloser - if filename, rc, e = util.DownloadUrl(fileUrl); e != nil { + if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil { return "", nil, e } content, e = ioutil.ReadAll(rc) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 4f5d5203e..add7f5e2b 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -30,6 +30,8 @@ func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { } } + sink.SetSourceFiler(source) + return &Replicator{ sink: sink, source: source, diff --git a/weed/replication/sink/fetch_write.go b/weed/replication/sink/fetch_write.go new file mode 100644 index 000000000..e432aa45c --- /dev/null +++ b/weed/replication/sink/fetch_write.go @@ -0,0 +1,122 @@ +package sink + +import ( + "context" + "sync" + "strings" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { + if len(sourceChunks) == 0 { + return + } + var wg sync.WaitGroup + for _, sourceChunk := range sourceChunks { + wg.Add(1) + go func(chunk *filer_pb.FileChunk) { + defer wg.Done() + replicatedChunk, e := fs.replicateOneChunk(chunk) + if e != nil { + err = e + } + replicatedChunks = append(replicatedChunks, replicatedChunk) + }(sourceChunk) + } + wg.Wait() + + return +} + +func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { + + fileId, err := fs.fetchAndWrite(sourceChunk) + if err != nil { + return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + } + + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: sourceChunk.Offset, + Size: sourceChunk.Size, + Mtime: sourceChunk.Mtime, + ETag: sourceChunk.ETag, + SourceFileId: sourceChunk.FileId, + }, nil +} + +func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { + + filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId) + if err != nil { + return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err) + } + defer readCloser.Close() + + var host string + + if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: fs.replication, + Collection: fs.collection, + TtlSec: fs.ttlSec, + DataCenter: fs.dataCenter, + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + + fileId, host = resp.FileId, resp.Url + + return nil + }); err != nil { + return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + + glog.V(3).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) + + uploadResult, err := operation.Upload(fileUrl, filename, readCloser, + "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "") + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + return +} + +func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := util.GrpcDial(fs.grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 387bffb58..a39402aeb 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -1,14 +1,14 @@ package sink import ( + "fmt" + "context" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "fmt" - "strings" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "context" - "sync" + "github.com/chrislusf/seaweedfs/weed/replication/source" ) type ReplicationSink interface { @@ -16,11 +16,17 @@ type ReplicationSink interface { CreateEntry(key string, entry *filer_pb.Entry) error UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error GetDirectory() string + SetSourceFiler(s *source.FilerSource) } type FilerSink struct { grpcAddress string dir string + filerSource *source.FilerSource + replication string + collection string + ttlSec int32 + dataCenter string } func (fs *FilerSink) GetDirectory() string { @@ -34,6 +40,10 @@ func (fs *FilerSink) Initialize(configuration util.Configuration) error { ) } +func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { + fs.filerSource = s +} + func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.dir = dir @@ -65,13 +75,15 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - replicatedChunks, err := replicateChunks(entry.Chunks) + replicatedChunks, err := fs.replicateChunks(entry.Chunks) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) return fmt.Errorf("replicate entry chunks %s: %v", key, err) } + glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -96,70 +108,84 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { - return nil -} +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) { -func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + ctx := context.Background() + + dir, name := filer2.FullPath(key).DirAndName() + + // find out what changed + deletedChunks, newChunks := compareChunks(oldEntry, newEntry) + + // read existing entry + var entry *filer_pb.Entry + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } + + glog.V(4).Infof("lookup directory entry: %v", request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + glog.V(0).Infof("lookup %s: %v", key, err) + return err + } + + entry = resp.Entry + + return nil + }) - grpcConnection, err := util.GrpcDial(fs.grpcAddress) if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + return err } - defer grpcConnection.Close() - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + // delete the chunks that are deleted from the source + if deleteIncludeChunks { + // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks + entry.Chunks = minusChunks(entry.Chunks, deletedChunks) + } - return fn(client) + // replicate the chunks that are new in the source + replicatedChunks, err := fs.replicateChunks(newChunks) + entry.Chunks = append(entry.Chunks, replicatedChunks...) + + // save updated meta data + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + } + + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update entry %s: %v", key, err) + } + + return nil + }) + +} +func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { + deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks) + newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks) + return } -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} - -func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { - if len(sourceChunks) == 0 { - return - } - var wg sync.WaitGroup - for _, s := range sourceChunks { - wg.Add(1) - go func(chunk *filer_pb.FileChunk) { - defer wg.Done() - replicatedChunk, e := replicateOneChunk(chunk) - if e != nil { - err = e +func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + for _, a := range as { + found := false + for _, b := range bs { + if a.FileId == b.FileId { + found = true + break } - replicatedChunks = append(replicatedChunks, replicatedChunk) - }(s) + } + if !found { + delta = append(delta, a) + } } - wg.Wait() - - return -} - -func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { - - fileId, err := fetchAndWrite(sourceChunk) - if err != nil { - return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) - } - - return &filer_pb.FileChunk{ - FileId: fileId, - Offset: sourceChunk.Offset, - Size: sourceChunk.Size, - Mtime: sourceChunk.Mtime, - ETag: sourceChunk.ETag, - SourceFileId: sourceChunk.FileId, - }, nil -} - -func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { - return } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index f6a4cc55f..8d10dc59a 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "strings" "context" + "net/http" ) type ReplicationSource interface { @@ -32,7 +33,7 @@ func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { return nil } -func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { vid2Locations := make(map[string]*filer_pb.Locations) @@ -55,21 +56,21 @@ func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err erro if err != nil { glog.V(1).Infof("replication lookup volume id %s: %v", vid, err) - return nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err) + return "", nil, nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err) } locations := vid2Locations[vid] if locations == nil || len(locations.Locations) == 0 { glog.V(1).Infof("replication locate volume id %s: %v", vid, err) - return nil, fmt.Errorf("replication locate volume id %s: %v", vid, err) + return "", nil, nil, fmt.Errorf("replication locate volume id %s: %v", vid, err) } fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) - _, readCloser, err = util.DownloadUrl(fileUrl) + filename, header, readCloser, err = util.DownloadFile(fileUrl) - return readCloser, err + return filename, header, readCloser, err } func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 56c294f0d..7ae5713bb 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -155,11 +155,12 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e return readFn(r.Body) } -func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { +func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) { response, err := client.Get(fileUrl) if err != nil { - return "", nil, err + return "", nil, nil, err } + header = response.Header contentDisposition := response.Header["Content-Disposition"] if len(contentDisposition) > 0 { idx := strings.Index(contentDisposition[0], "filename=")