write back remote entry to local entry after uploading to remote

This commit is contained in:
Chris Lu 2021-08-08 17:55:03 -07:00
parent dcf614a8c3
commit 7412ccdf88
3 changed files with 46 additions and 6 deletions

View file

@ -1,6 +1,7 @@
package command package command
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -157,7 +158,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
} }
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
return client.WriteFile(dest, message.NewEntry, reader) remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
} }
if message.OldEntry != nil && message.NewEntry == nil { if message.OldEntry != nil && message.NewEntry == nil {
fmt.Printf("delete: %+v\n", resp) fmt.Printf("delete: %+v\n", resp)
@ -182,7 +187,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return err return err
} }
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
return client.WriteFile(dest, message.NewEntry, reader) remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
} }
return nil return nil
@ -238,3 +247,14 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool {
} }
return false return false
} }
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
entry.RemoteEntry = remoteEntry
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
})
return err
})
}

View file

@ -32,7 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface { type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error)
UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
} }

View file

@ -116,7 +116,7 @@ func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, of
return writerAt.Bytes(), nil return writerAt.Bytes(), nil
} }
func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) { func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
fileSize := int64(filer.FileSize(entry)) fileSize := int64(filer.FileSize(entry))
@ -153,10 +153,12 @@ func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, e
//in case it fails to upload //in case it fails to upload
if err != nil { if err != nil {
return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err) return nil, fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
} }
return nil // read back the remote entry
return s.readFileRemoteEntry(loc)
} }
func toTagging(attributes map[string][]byte) *s3.Tagging { func toTagging(attributes map[string][]byte) *s3.Tagging {
@ -170,6 +172,24 @@ func toTagging(attributes map[string][]byte) *s3.Tagging {
return tagging return tagging
} }
func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
})
if err != nil {
return nil, err
}
return &filer_pb.RemoteEntry{
LastModifiedAt: resp.LastModified.Unix(),
Size: *resp.ContentLength,
ETag: *resp.ETag,
StorageName: s.conf.Name,
}, nil
}
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
tagging := toTagging(entry.Extended) tagging := toTagging(entry.Extended)
if len(tagging.TagSet) > 0 { if len(tagging.TagSet) > 0 {