From 13e45e16054d16e8d8161a8ddb02fde3cd4cde8f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Aug 2021 01:21:42 -0700 Subject: [PATCH] filer.remote.sync can work now --- weed/Makefile | 4 + weed/command/command.go | 1 + weed/command/filer.go | 1 - weed/command/filer_remote_sync.go | 219 ++++++++++++++++++++ weed/command/filer_replication.go | 6 - weed/command/imports.go | 31 +++ weed/filer/filer_remote_storage.go | 1 - weed/filer/stream.go | 32 +-- weed/remote_storage/remote_storage.go | 6 +- weed/remote_storage/s3/s3_storage_client.go | 94 ++++++++- weed/shell/command_remote_configure.go | 11 +- weed/shell/command_remote_mount.go | 5 +- 12 files changed, 381 insertions(+), 30 deletions(-) create mode 100644 weed/command/filer_remote_sync.go create mode 100644 weed/command/imports.go diff --git a/weed/Makefile b/weed/Makefile index c82735a0e..ad1f0b6f9 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -37,3 +37,7 @@ debug_s3: debug_filer_copy: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h + +debug_filer_remote_sync: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.remote.sync -filer="localhost:8888" -dir=/buckets/b2 -timeAgo=10000h diff --git a/weed/command/command.go b/weed/command/command.go index 9ae93fe61..02de2bd35 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -21,6 +21,7 @@ var Commands = []*Command{ cmdFilerCopy, cmdFilerMetaBackup, cmdFilerMetaTail, + cmdFilerRemoteSynchronize, cmdFilerReplicate, cmdFilerSynchronize, cmdFix, diff --git a/weed/command/filer.go b/weed/command/filer.go index 4fd2f9c72..ddee0852c 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -3,7 +3,6 @@ package command import ( "fmt" "net/http" - _ "net/http/pprof" "os" "strconv" "strings" diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go new file mode 100644 index 000000000..8d176ce2a --- /dev/null +++ b/weed/command/filer_remote_sync.go @@ -0,0 +1,219 @@ +package command + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "strings" + "time" +) + +type RemoteSyncOptions struct { + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + debug *bool + timeAgo *time.Duration + dir *string +} + +const ( + RemoteSyncKeyPrefix = "remote.sync." +) + +var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) + +func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) +} +func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +var ( + remoteSyncOptions RemoteSyncOptions +) + +func init() { + cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle + remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") + remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") + remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") + remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") + remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerRemoteSynchronize = &Command{ + UsageLine: "filer.remote.sync -filer=: -dir=/mount/s3_on_cloud", + Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage", + Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage + + filer.remote.sync listens on filer update events. + If any mounted remote file is updated, it will fetch the updated content, + and write to the remote storage. +`, +} + +func runFilerRemoteSynchronize(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + remoteSyncOptions.grpcDialOption = grpcDialOption + + // read filer remote storage mount mappings + mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress) + if readErr != nil { + fmt.Printf("read mount mapping: %v", readErr) + return false + } + + filerSource := &source.FilerSource{} + filerSource.DoInitialize( + *remoteSyncOptions.filerAddress, + pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress), + "/", // does not matter + *remoteSyncOptions.readChunkFromFiler, + ) + + var found bool + for dir, remoteStorageMountLocation := range mappings.Mappings { + if *remoteSyncOptions.dir == dir { + found = true + storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name) + if readErr != nil { + fmt.Printf("read remote storage configuration for %s: %v", dir, readErr) + continue + } + fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir) + if err := util.Retry("filer.remote.sync "+dir, func() error { + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) + }); err != nil { + fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err) + } + break + } + } + if !found { + fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir) + return false + } + + return true +} + +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error { + + dirHash := util.HashStringToLong(mountedDir) + + // 1. specified by timeAgo + // 2. last offset timestamp for this directory + // 3. directory creation time + var lastOffsetTs time.Time + if *option.timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) + if err != nil { + return fmt.Errorf("lookup %s: %v", mountedDir, err) + } + + lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } + } else { + lastOffsetTs = time.Now().Add(-*option.timeAgo) + } + + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if len(message.NewEntry.Chunks) == 0 { + return nil + } + fmt.Printf("create: %+v\n", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + return client.WriteFile(dest, message.NewEntry, reader) + } + if message.OldEntry != nil && message.NewEntry == nil { + fmt.Printf("delete: %+v\n", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) { + fmt.Printf("update meta: %+v\n", resp) + return client.UpdateFileMetadata(dest, message.NewEntry) + } + } + fmt.Printf("update: %+v\n", resp) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + return client.WriteFile(dest, message.NewEntry, reader) + } + + return nil + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) + }) + + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, + "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation { + var dest string + source := string(sourcePath[len(mountDir):]) + if strings.HasSuffix(remoteMountLocation.Path, "/") { + dest = remoteMountLocation.Path + source[1:] + } else { + dest = remoteMountLocation.Path + source + } + return &filer_pb.RemoteStorageLocation{ + Name: remoteMountLocation.Name, + Bucket: remoteMountLocation.Bucket, + Path: dest, + } +} + +func isSameChunks(a, b []*filer_pb.FileChunk) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + x, y := a[i], b[i] + if !proto.Equal(x, y) { + return false + } + } + return true +} diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 885c95540..bf0a3e140 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -7,12 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/replication" "github.com/chrislusf/seaweedfs/weed/replication/sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" ) diff --git a/weed/command/imports.go b/weed/command/imports.go new file mode 100644 index 000000000..d7ade1379 --- /dev/null +++ b/weed/command/imports.go @@ -0,0 +1,31 @@ +package command + +import ( + _ "net/http/pprof" + + _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" + + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" + + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + +) \ No newline at end of file diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index b1ee96a42..573dcf3e7 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "google.golang.org/grpc" diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 3859f9a67..503e6b23f 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) type ChunkStreamReader struct { chunkViews []*ChunkView totalSize int64 + logicOffset int64 buffer []byte bufferOffset int64 bufferPos int @@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F } func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { - _, err = c.Seek(off, io.SeekStart) - if err != nil { + if err = c.prepareBufferFor(c.logicOffset); err != nil { return } return c.Read(p) @@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { return n, io.EOF } chunkView := c.chunkViews[c.nextChunkViewIndex] - c.fetchChunkToBuffer(chunkView) + if err = c.fetchChunkToBuffer(chunkView); err != nil { + return + } c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t n += t + c.logicOffset += int64(t) } return } @@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { switch whence { case io.SeekStart: case io.SeekCurrent: - offset += c.bufferOffset + int64(c.bufferPos) + offset += c.logicOffset case io.SeekEnd: offset = c.totalSize + offset } if offset > c.totalSize { err = io.ErrUnexpectedEOF + } else { + c.logicOffset = offset } + return offset, err + +} + +func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { // stay in the same chunk if !c.isBufferEmpty() { if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { c.bufferPos = int(offset - c.bufferOffset) - return offset, nil + return nil } } @@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { return c.chunkViews[i].LogicOffset <= offset }) if currentChunkIndex == len(c.chunkViews) { - return 0, io.EOF + return io.EOF } // positioning within the new chunk chunk := c.chunkViews[currentChunkIndex] if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) + if err = c.fetchChunkToBuffer(chunk); err != nil { + return + } c.nextChunkViewIndex = currentChunkIndex + 1 } c.bufferPos = int(offset - c.bufferOffset) - } else { - return 0, io.ErrUnexpectedEOF } - - return offset, err - + return } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go index 06c089d7a..608d158ad 100644 --- a/weed/remote_storage/remote_storage.go +++ b/weed/remote_storage/remote_storage.go @@ -3,6 +3,7 @@ package remote_storage import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" "strings" "sync" ) @@ -30,7 +31,10 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file type RemoteStorageClient interface { Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error - ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) + ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) + WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) + UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) + DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) } type RemoteStorageClientMaker interface { diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go index 2263054f3..316751227 100644 --- a/weed/remote_storage/s3/s3_storage_client.go +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -8,9 +8,11 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" + "io" ) func init() { @@ -45,7 +47,9 @@ type s3RemoteStorageClient struct { conn s3iface.S3API } -func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { +var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{}) + +func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { pathKey := remote.Path[1:] @@ -91,19 +95,19 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, } return } -func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) { +func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) { u.PartSize = int64(4 * 1024 * 1024) u.Concurrency = 1 }) - + dataSlice := make([]byte, int(size)) writerAt := aws.NewWriteAtBuffer(dataSlice) _, err = downloader.Download(writerAt, &s3.GetObjectInput{ - Bucket: aws.String(loc.Bucket), - Key: aws.String(loc.Path[1:]), - Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)), + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)), }) if err != nil { return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err) @@ -111,3 +115,81 @@ func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, off return writerAt.Bytes(), nil } + +func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) { + + fileSize := int64(filer.FileSize(entry)) + + partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB + for partSize*1000 < fileSize { + partSize *= 4 + } + + // Create an uploader with the session and custom options + uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) { + u.PartSize = partSize + u.Concurrency = 5 + }) + + // process tagging + tags := "" + for k, v := range entry.Extended { + if len(tags) > 0 { + tags = tags + "&" + } + tags = tags + k + "=" + string(v) + } + + // Upload the file to S3. + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + Body: reader, + ACL: aws.String("private"), + ServerSideEncryption: aws.String("AES256"), + StorageClass: aws.String("STANDARD_IA"), + Tagging: aws.String(tags), + }) + + //in case it fails to upload + if err != nil { + return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err) + } + + return nil +} + +func toTagging(attributes map[string][]byte) *s3.Tagging { + tagging := &s3.Tagging{} + for k, v := range attributes { + tagging.TagSet = append(tagging.TagSet, &s3.Tag{ + Key: aws.String(k), + Value: aws.String(string(v)), + }) + } + return tagging +} + +func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { + tagging := toTagging(entry.Extended) + if len(tagging.TagSet) > 0 { + _, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{ + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + Tagging: toTagging(entry.Extended), + }) + } else { + _, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{ + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + }) + } + return +} +func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) { + _, err = s.conn.DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + }) + return +} diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 20ded5f5b..7a9ad1f65 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "io" "regexp" @@ -96,9 +97,15 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE conf.S3SecretKey = "" - fmt.Fprintf(writer, "%+v\n", conf) + m := jsonpb.Marshaler{ + EmitDefaults: false, + Indent: " ", + } - return nil + err := m.Marshal(writer, conf) + fmt.Fprintln(writer) + + return err }) } diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 73a5119d5..37b235a55 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -88,7 +88,10 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command Indent: " ", } - return m.Marshal(writer, mappings) + err = m.Marshal(writer, mappings) + fmt.Fprintln(writer) + + return }