add "weed filer.cat" to read files directly from volume servers
commit 2b76854641
parent 95ecf0c72f
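With this change, the filer is consulted only to look up the directory entry and the volume locations; the chunk data itself is then fetched straight from the volume servers. Based on the UsageLine added below, an invocation would look like this (the path and output name are illustrative):

    weed filer.cat -o ./local.copy http://localhost:8888/path/to/file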
@@ -15,6 +15,7 @@ var Commands = []*Command{
 	cmdDownload,
 	cmdExport,
 	cmdFiler,
+	cmdFilerCat,
 	cmdFilerReplicate,
 	cmdFilerSynchronize,
 	cmdFix,
weed/command/filer_cat.go (new file, 118 lines)
@@ -0,0 +1,118 @@
+package command
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
+	"google.golang.org/grpc"
+	"math"
+	"net/url"
+	"os"
+	"strings"
+
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+	filerCat FilerCatOptions
+)
+
+type FilerCatOptions struct {
+	grpcDialOption grpc.DialOption
+	filerAddress   string
+	filerClient    filer_pb.SeaweedFilerClient
+	output         *string
+}
+
+func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+	return func(fileId string) (targetUrls []string, err error) {
+		vid := filer.VolumeId(fileId)
+		resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+			VolumeIds: []string{vid},
+		})
+		if err != nil {
+			return nil, err
+		}
+		locations := resp.LocationsMap[vid]
+		for _, loc := range locations.Locations {
+			targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+		}
+		return
+	}
+}
+
+func init() {
+	cmdFilerCat.Run = runFilerCat // break init cycle
+	filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout")
+}
+
+var cmdFilerCat = &Command{
+	UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file",
+	Short:     "copy one file to local",
+	Long: `read one file to stdout or write to a file
+
+`,
+}
+
+func runFilerCat(cmd *Command, args []string) bool {
+
+	util.LoadConfiguration("security", false)
+
+	if len(args) == 0 {
+		return false
+	}
+	filerSource := args[len(args)-1]
+
+	filerUrl, err := url.Parse(filerSource)
+	if err != nil {
+		fmt.Printf("The last argument should be a URL on filer: %v\n", err)
+		return false
+	}
+	urlPath := filerUrl.Path
+	if strings.HasSuffix(urlPath, "/") {
+		fmt.Printf("The last argument should be a file: %v\n", err)
+		return false
+	}
+
+	filerCat.filerAddress = filerUrl.Host
+	filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+	dir, name := util.FullPath(urlPath).DirAndName()
+
+	writer := os.Stdout
+	if *filerCat.output != "" {
+
+		fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output)
+
+		f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+		if err != nil {
+			fmt.Printf("open file %s: %v\n", *filerCat.output, err)
+			return false
+		}
+		defer f.Close()
+		writer = f
+	}
+
+	pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.LookupDirectoryEntryRequest{
+			Name:      name,
+			Directory: dir,
+		}
+		respLookupEntry, err := filer_pb.LookupEntry(client, request)
+		if err != nil {
+			return err
+		}
+
+		filerCat.filerClient = client
+
+		return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+	})
+
+	return true
+}
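A note on the wiring above: FilerCatOptions satisfies the new wdclient.HasLookupFileIdFunction interface (introduced further down in this diff), so runFilerCat can pass &filerCat directly to filer.StreamContent. Volume locations are resolved through the filer's LookupVolume RPC, and the chunks are then fetched over HTTP from the returned volume server URLs rather than through a master-client lookup.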
@@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool {
 	}
 	urlPath := filerUrl.Path
 	if !strings.HasSuffix(urlPath, "/") {
-		fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
+		fmt.Printf("The last argument should be a folder and end with \"/\"\n")
 		return false
 	}
 
@@ -3,6 +3,7 @@ package filer
 import (
 	"bytes"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
 	"io"
 	"math"
 	"time"
@@ -38,7 +39,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
 	return
 }
 
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
 	// TODO maybe parallel this
 	for _, chunk := range chunks {
 		if !chunk.IsChunkManifest {
@@ -63,7 +64,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
 	return
 }
 
-func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
 	if !chunk.IsChunkManifest {
 		return
 	}
@@ -84,7 +85,7 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
 }
 
 // TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
 	urlStrings, err := lookupFileIdFn(fileId)
 	if err != nil {
 		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
 	"math"
 	"sort"
 	"sync"
@@ -52,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
 	return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
 }
 
-func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
 
 	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
@@ -71,7 +72,7 @@ func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_
 	return
 }
 
-func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
 
 	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
 	if aErr != nil {
@@ -116,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
 	return cv.Size == cv.ChunkSize
 }
 
-func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
 
 	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
@@ -222,7 +223,7 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
 
 // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
 // If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
 
 	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
 
@@ -18,7 +18,7 @@ import (
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   []*ChunkView
-	lookupFileId LookupFileIdFunctionType
+	lookupFileId wdclient.LookupFileIdFunctionType
 	readerLock   sync.Mutex
 	fileSize     int64
 
@@ -31,9 +31,7 @@ type ChunkReadAt struct {
 var _ = io.ReaderAt(&ChunkReadAt{})
 var _ = io.Closer(&ChunkReadAt{})
 
-type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
-
-func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
 
 	vidCache := make(map[string]*filer_pb.Locations)
 	var vicCacheLock sync.RWMutex
@@ -13,16 +13,16 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 )
 
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
 
 	// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
-	chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
+	chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
 
 	fileId2Url := make(map[string][]string)
 
 	for _, chunkView := range chunkViews {
 
-		urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
+		urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return err
@@ -86,7 +86,7 @@ type ChunkStreamReader struct {
 	bufferOffset int64
 	bufferPos    int
 	chunkIndex   int
-	lookupFileId LookupFileIdFunctionType
+	lookupFileId wdclient.LookupFileIdFunctionType
 }
 
 var _ = io.ReadSeeker(&ChunkStreamReader{})
@@ -3,6 +3,7 @@ package filersink
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
 
 	"google.golang.org/grpc"
 
@@ -206,7 +207,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 	})
 
 }
-func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
 	aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
 	if aErr != nil {
 		return nil, nil, aErr
@@ -15,6 +15,12 @@ const (
 	maxCursorIndex = 4096
 )
 
+type HasLookupFileIdFunction interface {
+	GetLookupFileIdFunction() LookupFileIdFunctionType
+}
+
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
+
 type Location struct {
 	Url       string `json:"url,omitempty"`
 	PublicUrl string `json:"publicUrl,omitempty"`
@@ -67,6 +73,10 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
 	return
 }
 
+func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
+	return vc.LookupFileId
+}
+
 func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
 	parts := strings.Split(fileId, ",")
 	if len(parts) != 2 {
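Taken together, the refactor replaces the concrete *wdclient.MasterClient dependency in the filer package with a one-method interface plus a function type, both now living in wdclient. Below is a minimal, self-contained sketch of that pattern; the type and interface names mirror the diff, but the stub lookup, the streamSketch function, the address, and the sample file id are hypothetical stand-ins, not SeaweedFS APIs:

package main

import (
	"fmt"
	"io"
	"os"
)

// Mirrors wdclient.LookupFileIdFunctionType from this commit.
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)

// Mirrors wdclient.HasLookupFileIdFunction from this commit.
type HasLookupFileIdFunction interface {
	GetLookupFileIdFunction() LookupFileIdFunctionType
}

// stubLookup plays the role of FilerCatOptions or vidMap: anything that can
// turn a file id into candidate volume-server URLs. The address is made up.
type stubLookup struct{}

func (s *stubLookup) GetLookupFileIdFunction() LookupFileIdFunctionType {
	return func(fileId string) ([]string, error) {
		return []string{fmt.Sprintf("http://127.0.0.1:8080/%s", fileId)}, nil
	}
}

// streamSketch stands in for filer.StreamContent: it depends only on the
// small interface, not on a concrete master client.
func streamSketch(lookup HasLookupFileIdFunction, w io.Writer, fileId string) error {
	urls, err := lookup.GetLookupFileIdFunction()(fileId)
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(w, "would fetch %s from %v\n", fileId, urls)
	return err
}

func main() {
	if err := streamSketch(&stubLookup{}, os.Stdout, "3,01637037d6"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Both vidMap (which gains GetLookupFileIdFunction in the hunk above) and FilerCatOptions in the new command file fit this one-method shape, which is what lets filer.StreamContent accept either.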