seaweedfs/weed/filer/remote_storage.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

182 lines
5.8 KiB
Go

package filer
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"math"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/viant/ptrie"
)
// File naming conventions for the entries kept under DirectoryEtcRemote.
const (
	// REMOTE_STORAGE_CONF_SUFFIX is the file name suffix of the entries that
	// hold a serialized remote_pb.RemoteConf.
	REMOTE_STORAGE_CONF_SUFFIX = ".conf"
	// REMOTE_STORAGE_MOUNT_FILE is the name of the entry that holds the
	// serialized remote_pb.RemoteStorageMapping (directory -> remote location).
	REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
)
// FilerRemoteStorage holds the filer's view of remote storage: which local
// directories are mounted to which remote locations, and the configuration of
// each named remote storage.
type FilerRemoteStorage struct {
// rules is a prefix trie keyed by mount directory plus a trailing "/";
// each value is the *remote_pb.RemoteStorageLocation mounted there.
rules ptrie.Trie
// storageNameToConf maps a remote storage name (RemoteConf.Name) to its
// configuration.
storageNameToConf map[string]*remote_pb.RemoteConf
}
// NewFilerRemoteStorage returns an empty FilerRemoteStorage with an
// initialized mount trie and an initialized (empty) configuration map.
func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
	return &FilerRemoteStorage{
		storageNameToConf: map[string]*remote_pb.RemoteConf{},
		rules:             ptrie.New(),
	}
}
// LoadRemoteStorageConfigurationsAndMapping fills rs from the entries listed
// under DirectoryEtcRemote on the given filer:
//
//   - the entry named REMOTE_STORAGE_MOUNT_FILE is decoded as the mount mapping;
//   - every entry whose name ends with REMOTE_STORAGE_CONF_SUFFIX is decoded as
//     a remote_pb.RemoteConf and registered under its Name;
//   - any other entry is ignored.
//
// A filer_pb.ErrNotFound from the listing is not treated as an error. Any other
// listing error, or any decoding error, is returned.
func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
	// execute this on filer

	limit := int64(math.MaxInt32)

	entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, limit, "", "", "")
	if err != nil {
		if err == filer_pb.ErrNotFound {
			// Nothing has been configured yet.
			return nil
		}
		glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
		return err
	}

	for _, entry := range entries {
		if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
			if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
				return err
			}
			continue
		}
		if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
			// Skip unrelated entries instead of aborting the whole load, so the
			// configurations and mount mapping listed after them still get loaded.
			continue
		}
		conf := &remote_pb.RemoteConf{}
		if err := proto.Unmarshal(entry.Content, conf); err != nil {
			return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
		}
		rs.storageNameToConf[conf.Name] = conf
	}
	return nil
}
// loadRemoteStorageMountMapping decodes data as a remote_pb.RemoteStorageMapping
// and registers every directory -> remote location pair it contains.
// It returns an error only when data cannot be decoded.
func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
	mountTable := new(remote_pb.RemoteStorageMapping)
	unmarshalErr := proto.Unmarshal(data, mountTable)
	if unmarshalErr != nil {
		return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, unmarshalErr)
	}

	for mountPoint, location := range mountTable.Mappings {
		rs.mapDirectoryToRemoteStorage(util.FullPath(mountPoint), location)
	}
	return nil
}
// mapDirectoryToRemoteStorage records loc as the remote location mounted at dir.
// The trie key is dir followed by "/"; FindMountDirectory strips that trailing
// byte again when it reports the mount directory.
func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) {
	trieKey := []byte(dir + "/")
	rs.rules.Put(trieKey, loc)
}
// FindMountDirectory looks up the mount whose trie key is a prefix of p and
// returns the mount directory (without the trailing "/" used in the key)
// together with its remote location. Both results are zero values when no
// key matches.
func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) {
	// The visitor always returns true and overwrites the results on every call,
	// so the last match it is handed wins.
	// NOTE(review): presumably returning true tells the trie to keep visiting
	// further matches — confirm against the ptrie documentation.
	recordMatch := func(key []byte, value interface{}) bool {
		withoutSlash := string(key[:len(key)-1])
		mountDir = util.FullPath(withoutSlash)
		remoteLocation = value.(*remote_pb.RemoteStorageLocation)
		return true
	}
	rs.rules.MatchPrefix([]byte(p), recordMatch)
	return mountDir, remoteLocation
}
// FindRemoteStorageClient resolves the remote storage mounted over p and
// returns a client for it along with its configuration. found is false when
// no mount key is a prefix of p; otherwise the result is whatever
// GetRemoteStorageClient reports for the mount's storage name.
func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
	var matched *remote_pb.RemoteStorageLocation
	// The visitor keeps overwriting matched, so the last match visited wins.
	keepLast := func(_ []byte, value interface{}) bool {
		matched = value.(*remote_pb.RemoteStorageLocation)
		return true
	}
	rs.rules.MatchPrefix([]byte(p), keepLast)

	if matched != nil {
		return rs.GetRemoteStorageClient(matched.Name)
	}
	return nil, nil, false
}
// GetRemoteStorageClient returns a client for the remote storage registered
// under storageName, together with its configuration.
//
// found is true only when the configuration exists AND a client could be
// obtained for it. When the configuration exists but the client cannot be
// created, remoteConf is still returned, client is nil and found is false, so
// callers that guard on found never receive an unusable client.
func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
	remoteConf, found = rs.storageNameToConf[storageName]
	if !found {
		return nil, nil, false
	}

	client, err := remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		// Previously found stayed true here (left over from the map lookup),
		// reporting success even though no client was available.
		return nil, remoteConf, false
	}
	return client, remoteConf, true
}
// UnmarshalRemoteStorageMappings decodes oldContent into a
// remote_pb.RemoteStorageMapping. The returned mapping is never nil: it starts
// with an initialized, empty Mappings map, and empty input is returned as such
// without error. A decoding failure is logged as a warning and also returned,
// alongside the mapping as left by the failed decode.
func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) {
	mappings = &remote_pb.RemoteStorageMapping{
		Mappings: map[string]*remote_pb.RemoteStorageLocation{},
	}
	if len(oldContent) == 0 {
		return mappings, nil
	}

	err = proto.Unmarshal(oldContent, mappings)
	if err != nil {
		glog.Warningf("unmarshal existing mappings: %v", err)
	}
	return mappings, err
}
// ReadRemoteStorageConf reads the entry "<storageName>" + REMOTE_STORAGE_CONF_SUFFIX
// from DirectoryEtcRemote on the filer at filerAddress and decodes it as a
// remote_pb.RemoteConf.
//
// When the read fails, conf is nil and the read error is returned. When the
// content cannot be decoded, the (possibly partially filled) conf is returned
// together with a wrapped "unmarshal" error.
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
	confFileName := storageName + REMOTE_STORAGE_CONF_SUFFIX

	var rawConf []byte
	fetch := func(client filer_pb.SeaweedFilerClient) error {
		var fetchErr error
		rawConf, fetchErr = ReadInsideFiler(client, DirectoryEtcRemote, confFileName)
		return fetchErr
	}
	readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, fetch)
	if readErr != nil {
		return nil, readErr
	}

	// unmarshal storage configuration
	conf = &remote_pb.RemoteConf{}
	if decodeErr := proto.Unmarshal(rawConf, conf); decodeErr != nil {
		return conf, fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, confFileName, decodeErr)
	}
	return conf, nil
}
// DetectMountInfo finds the remote storage mount that covers dir.
//
// It reads the mount mappings from the filer at filerAddress, selects the
// mounted directory that contains dir, and then reads the configuration of the
// remote storage mounted there. It returns, in order: all mount mappings, the
// local mounted directory, its remote location, the remote storage
// configuration, and an error.
//
// Errors are returned when the mappings cannot be read (all other results are
// zero), when dir is empty, when no mount covers dir, or when the remote
// storage configuration cannot be read; in the latter cases the results
// gathered so far are returned alongside the error.
//
// When several mounts cover dir (e.g. "/a" and "/a/b" for "/a/b/c"), the
// longest, i.e. most specific, one is chosen so the result does not depend on
// map iteration order.
func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {

	mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
	if listErr != nil {
		return nil, "", nil, nil, listErr
	}
	if dir == "" {
		return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
	}

	var localMountedDir string
	var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
	for k, loc := range mappings.Mappings {
		// Keep only the longest covering mount; an empty key can never win,
		// which preserves the "not mounted" result below.
		if len(k) > len(localMountedDir) && isWithinMountedDir(dir, k) {
			localMountedDir, remoteStorageMountedLocation = k, loc
		}
	}
	if localMountedDir == "" {
		return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
	}

	// find remote storage configuration
	remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name)
	if err != nil {
		return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
	}

	return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
}

// isWithinMountedDir reports whether dir is mountedDir itself or lies below it,
// matching on whole path segments so that "/ab" is not considered to be inside
// a mount at "/a".
func isWithinMountedDir(dir, mountedDir string) bool {
	if !strings.HasPrefix(dir, mountedDir) {
		return false
	}
	if len(dir) == len(mountedDir) || strings.HasSuffix(mountedDir, "/") {
		return true
	}
	return dir[len(mountedDir)] == '/'
}