diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index bfd266c5f..a9bf1d4e1 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -11,7 +11,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" "google.golang.org/grpc" "time" ) @@ -93,7 +92,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) }, func(err error) bool { if err != nil { - fmt.Printf("synchronize %s: %v\n", dir, err) + glog.Errorf("synchronize %s: %v", dir, err) } return true }) @@ -137,19 +136,19 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return nil } if message.OldEntry == nil && message.NewEntry != nil { - if len(message.NewEntry.Chunks) == 0 { + if !filer.HasData(message.NewEntry) { return nil } - fmt.Printf("create: %+v\n", resp) + glog.V(2).Infof("create: %+v", resp) if !shouldSendToRemote(message.NewEntry) { - fmt.Printf("skipping creating: %+v\n", resp) + glog.V(2).Infof("skipping creating: %+v", resp) return nil } dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) if message.NewEntry.IsDirectory { return client.WriteDirectory(dest, message.NewEntry) } - reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + reader := filer.NewFileReader(filerSource, message.NewEntry) remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) if writeErr != nil { return writeErr @@ -157,7 +156,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) } if message.OldEntry != nil && message.NewEntry == nil { - fmt.Printf("delete: %+v\n", resp) + glog.V(2).Infof("delete: %+v", resp) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) return client.DeleteFile(dest) } @@ -165,23 +164,23 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) if !shouldSendToRemote(message.NewEntry) { - fmt.Printf("skipping updating: %+v\n", resp) + glog.V(2).Infof("skipping updating: %+v", resp) return nil } if message.NewEntry.IsDirectory { return client.WriteDirectory(dest, message.NewEntry) } if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { - if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) { - fmt.Printf("update meta: %+v\n", resp) + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) } } - fmt.Printf("update: %+v\n", resp) + glog.V(2).Infof("update: %+v", resp) if err := client.DeleteFile(oldDest); err != nil { return err } - reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + reader := filer.NewFileReader(filerSource, message.NewEntry) remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) if writeErr != nil { return writeErr @@ -212,19 +211,6 @@ func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLoca } } -func isSameChunks(a, b []*filer_pb.FileChunk) bool { - if len(a) != len(b) { - return false - } - for i := 0; i < len(a); i++ { - x, y := a[i], b[i] - if !proto.Equal(x, y) { - return false - } - } - return true -} - func shouldSendToRemote(entry *filer_pb.Entry) bool { if entry.RemoteEntry == nil { return true diff --git a/weed/filer/stream.go b/weed/filer/stream.go index cad37a080..2a3870aac 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "fmt" + "github.com/golang/protobuf/proto" "io" "math" "sort" @@ -16,6 +17,44 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) +func HasData(entry *filer_pb.Entry) bool { + + if len(entry.Content) > 0 { + return true + } + + return len(entry.Chunks) > 0 +} + +func IsSameData(a, b *filer_pb.Entry) bool { + + if len(a.Content) > 0 || len(b.Content) > 0 { + return bytes.Equal(a.Content, b.Content) + } + + return isSameChunks(a.Chunks, b.Chunks) +} + +func isSameChunks(a, b []*filer_pb.FileChunk) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + x, y := a[i], b[i] + if !proto.Equal(x, y) { + return false + } + } + return true +} + +func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader { + if len(entry.Content) > 0 { + return bytes.NewReader(entry.Content) + } + return NewChunkStreamReader(filerClient, entry.Chunks) +} + func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)