// seaweedfs/weed/filer2/filer_client_util.go
//
// 170 lines
// 4.2 KiB
// Go

package filer2
import (
"context"
"fmt"
"io"
"math"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
// VolumeId returns the part of fileId that precedes its last comma.
//
// When fileId contains no comma, or its only comma is the very first
// character, fileId is returned unchanged.
func VolumeId(fileId string) string {
	if cut := strings.LastIndexByte(fileId, ','); cut >= 1 {
		return fileId[:cut]
	}
	return fileId
}
// FilerClient is the dependency the helpers in this file take in order to
// reach a filer and to resolve volume server addresses.
type FilerClient interface {
	// WithFilerClient accepts a callback that receives a
	// filer_pb.SeaweedFilerClient. The helpers in this file issue their gRPC
	// calls from inside fn and treat the returned error as the outcome of the
	// whole operation.
	// NOTE(review): presumably the implementation invokes fn exactly once and
	// returns fn's error; confirm against the implementations.
	WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
	// AdjustedUrl maps a host:port string to the address that should be
	// dialed. ReadIntoBuffer applies it to a volume location URL before
	// building the chunk read URL.
	AdjustedUrl(hostAndPort string) string
}
// ReadIntoBuffer reads the data described by chunkViews into buff.
//
// It first resolves the volume locations of all chunks with a single
// LookupVolume call, then reads every chunk view concurrently (one goroutine
// per view) from the first location listed for its volume. Each chunk is
// written to buff[LogicOffset-baseOffset : LogicOffset-baseOffset+Size], so
// buff must be large enough to hold every view relative to baseOffset;
// otherwise the slice expression panics.
//
// totalRead is the sum of the byte counts reported for the chunks that were
// read successfully. If the lookup fails, 0 and the lookup error are
// returned. If one or more chunk reads fail, err is the first failure that
// was recorded and totalRead still reflects the chunks that succeeded.
func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
	vids := make([]string, 0, len(chunkViews))
	for _, chunkView := range chunkViews {
		vids = append(vids, VolumeId(chunkView.FileId))
	}

	var vid2Locations map[string]*filer_pb.Locations
	err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
		resp, lookupErr := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
			VolumeIds: vids,
		})
		if lookupErr != nil {
			return lookupErr
		}
		vid2Locations = resp.LocationsMap
		return nil
	})
	if err != nil {
		return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
	}

	var (
		wg sync.WaitGroup
		// mu guards the named results totalRead and err, which are shared by
		// all chunk goroutines below.
		mu sync.Mutex
	)

	// recordErr keeps only the first failure, so a later goroutine (failing
	// or succeeding) can never overwrite or clear it.
	recordErr := func(chunkErr error) {
		mu.Lock()
		defer mu.Unlock()
		if err == nil {
			err = chunkErr
		}
	}

	for _, chunkView := range chunkViews {
		wg.Add(1)
		go func(chunkView *ChunkView) {
			defer wg.Done()

			glog.V(4).Infof("read fh reading chunk: %+v", chunkView)

			locations := vid2Locations[VolumeId(chunkView.FileId)]
			if locations == nil || len(locations.Locations) == 0 {
				glog.V(0).Infof("failed to locate %s", chunkView.FileId)
				recordErr(fmt.Errorf("failed to locate %s", chunkView.FileId))
				return
			}

			volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
			start := chunkView.LogicOffset - baseOffset
			stop := start + int64(chunkView.Size)

			// n and readErr are local to this goroutine; only the aggregated
			// results are touched under mu.
			n, readErr := util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[start:stop])
			if readErr != nil {
				glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, readErr)
				recordErr(fmt.Errorf("failed to read http://%s/%s: %v",
					volumeServerAddress, chunkView.FileId, readErr))
				return
			}

			glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)

			mu.Lock()
			totalRead += n
			mu.Unlock()
		}(chunkView)
	}
	wg.Wait()

	return totalRead, err
}
// GetEntry looks up the directory entry for fullFilePath through the filer.
//
// The path is split into directory and name, and a LookupDirectoryEntry
// request is issued. A lookup error that is ErrNotFound, or whose message
// contains ErrNotFound's message, is not treated as a failure: the function
// then returns a nil entry and a nil error. The same (nil, nil) result is
// returned when the response carries no entry. Any other lookup error is
// logged at verbosity 3 and returned.
func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
	parentDir, baseName := fullFilePath.DirAndName()

	lookup := func(client filer_pb.SeaweedFilerClient) error {
		resp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: parentDir,
			Name:      baseName,
		})

		if lookupErr == nil {
			// Leave entry untouched when the response has nothing in it.
			if resp.Entry != nil {
				entry = resp.Entry
			}
			return nil
		}

		// "Not found" may arrive as the sentinel itself or only as matching
		// message text; both are reported to the caller as a nil entry.
		if lookupErr == ErrNotFound || strings.Contains(lookupErr.Error(), ErrNotFound.Error()) {
			return nil
		}

		glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, lookupErr)
		return lookupErr
	}

	err = filerClient.WithFilerClient(lookup)
	return entry, err
}
// ReadDirAllEntries streams the entries of fullDirPath whose names match
// prefix and passes each one to fn.
//
// The listing is requested from the beginning (empty StartFromFileName) with
// a limit of math.MaxUint32. fn is called once per entry in stream order;
// isLast is true only for the final entry. To know which entry is final, each
// entry is held back until the next stream message (or io.EOF) arrives.
//
// If opening the stream fails, the error is wrapped as "list <dir>: <err>".
// If the stream fails part-way, that error is returned as is; entries already
// handed to fn were passed with isLast=false and the held-back entry is not
// delivered. An empty listing results in no calls to fn.
func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
	listAll := func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.ListEntriesRequest{
			Directory:         string(fullDirPath),
			Prefix:            prefix,
			StartFromFileName: "",
			Limit:             math.MaxUint32,
		}

		glog.V(3).Infof("read directory: %v", request)
		stream, listErr := client.ListEntries(context.Background(), request)
		if listErr != nil {
			return fmt.Errorf("list %s: %v", fullDirPath, listErr)
		}

		// pending is the most recently received entry that has not yet been
		// handed to fn.
		var pending *filer_pb.Entry
		for {
			resp, recvErr := stream.Recv()
			if recvErr == io.EOF {
				if pending != nil {
					fn(pending, true)
				}
				return nil
			}
			if recvErr != nil {
				return recvErr
			}

			if pending != nil {
				fn(pending, false)
			}
			pending = resp.Entry
		}
	}

	err = filerClient.WithFilerClient(listAll)
	return err
}