diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 2cbe83e21..ea909a527 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -78,15 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. cipherKey = util.GenCipherKey() } - err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) { - sizeBuf := make([]byte, 4) - for item := range outputChan { - b := item.([]byte) - util.Uint32toBytes(sizeBuf, uint32(len(b))) - dst.Write(sizeBuf) - dst.Write(b) - } - }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { if !entry.Entry.IsDirectory { ext := filepath.Ext(entry.Entry.Name) if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil { @@ -102,6 +94,14 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. outputChan <- bytes return nil + }, func(outputChan chan interface{}) { + sizeBuf := make([]byte, 4) + for item := range outputChan { + b := item.([]byte) + util.Uint32toBytes(sizeBuf, uint32(len(b))) + dst.Write(sizeBuf) + dst.Write(b) + } }) if err == nil { @@ -112,7 +112,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } -func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error { +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error { var wg sync.WaitGroup wg.Add(1) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 3302542aa..2d570fef3 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -145,22 +145,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint cookie uint32 path util.FullPath } - return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) { - buffer := make([]byte, 16) - for item := range outputChan { - i := item.(*Item) - if f, ok := files[i.vid]; ok { - util.Uint64toBytes(buffer, i.fileKey) - util.Uint32toBytes(buffer[8:], i.cookie) - util.Uint32toBytes(buffer[12:], uint32(len(i.path))) - f.Write(buffer) - f.Write([]byte(i.path)) - // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path) - } else { - fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) - } - } - }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { if verbose && entry.Entry.IsDirectory { fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) } @@ -178,6 +163,21 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint } } return nil + }, func(outputChan chan interface{}) { + buffer := make([]byte, 16) + for item := range outputChan { + i := item.(*Item) + if f, ok := files[i.vid]; ok { + util.Uint64toBytes(buffer, i.fileKey) + util.Uint32toBytes(buffer[8:], i.cookie) + util.Uint32toBytes(buffer[12:], uint32(len(i.path))) + f.Write(buffer) + f.Write([]byte(i.path)) + // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path) + } else { + fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) + } + } }) } @@ -307,14 +307,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer vid uint32 fileKey uint64 } - return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) { - buffer := make([]byte, 8) - for item := range outputChan { - i := item.(*Item) - util.Uint64toBytes(buffer, i.fileKey) - files[i.vid].Write(buffer) - } - }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { if verbose { @@ -330,6 +323,13 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer } } return nil + }, func(outputChan chan interface{}) { + buffer := make([]byte, 8) + for item := range outputChan { + i := item.(*Item) + util.Uint64toBytes(buffer, i.fileKey) + files[i.vid].Write(buffer) + } }) }