mount with name

This commit is contained in:
chrislu 2022-02-10 22:43:55 -08:00
parent 7a0c35674c
commit b6143de52a
2 changed files with 249 additions and 11 deletions

View file

@ -1,14 +1,24 @@
package command package command
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount" "github.com/chrislusf/seaweedfs/weed/mount"
"github.com/chrislusf/seaweedfs/weed/mount/unmount" "github.com/chrislusf/seaweedfs/weed/mount/unmount"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http" "net/http"
"os" "os"
"os/user"
"runtime"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -42,18 +52,171 @@ func runMount2(cmd *Command, args []string) bool {
func RunMount2(option *Mount2Options, umask os.FileMode) bool { func RunMount2(option *Mount2Options, umask os.FileMode) bool {
opts := &fs.Options{} // basic checks
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
if chunkSizeLimitMB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.")
return false
}
// try to connect to filer
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
var err error
for i := 0; i < 10; i++ {
err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
return true
}
filerMountRootPath := *option.filerMountRootPath
// clean up mount point
dir := util.ResolvePath(*option.dir)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
unmount.Unmount(dir)
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
}
fileInfo, err := os.Stat(dir)
// collect uid, gid
uid, gid := uint32(0), uint32(0)
mountMode := os.ModeDir | 0777
if err == nil {
mountMode = os.ModeDir | os.FileMode(0777)&^umask
uid, gid = util.GetFileUidGid(fileInfo)
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
} else {
fmt.Printf("can not stat %s\n", dir)
return false
}
// detect uid, gid
if uid == 0 {
if u, err := user.Current(); err == nil {
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
uid = uint32(parsedId)
}
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
gid = uint32(parsedId)
}
fmt.Printf("current uid=%d gid=%d\n", uid, gid)
}
}
// mapping uid, gid
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
if err != nil {
fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
return false
}
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
return true
}
// mount fuse
sec := time.Second
opts := &fs.Options{
MountOptions: fuse.MountOptions{
AllowOther: *option.allowOthers,
Options: nil,
MaxBackground: 128,
MaxWrite: 1024 * 1024 * 2,
MaxReadAhead: 1024 * 1024 * 2,
IgnoreSecurityLabels: false,
RememberInodes: false,
FsName: *option.filer + ":" + filerMountRootPath,
Name: "seaweedfs",
SingleThreaded: false,
DisableXAttrs: false,
Debug: false,
EnableLocks: false,
ExplicitDataCacheControl: false,
// SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
DirectMount: true,
DirectMountFlags: 0,
// EnableAcl: false,
},
EntryTimeout: &sec,
AttrTimeout: &sec,
NegativeTimeout: nil,
FirstAutomaticIno: 0,
OnAdd: nil,
NullPermissions: false,
UID: 0,
GID: 0,
ServerCallbacks: nil,
Logger: nil,
}
opts.Debug = true opts.Debug = true
unmount.Unmount(*option.dir) // find mount point
grace.OnInterrupt(func() { mountRoot := filerMountRootPath
unmount.Unmount(*option.dir) if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
}) })
server, err := fs.Mount(*option.dir, &mount.WeedFS{}, opts) server, err := fs.Mount(dir, seaweedFileSystem, opts)
if err != nil { if err != nil {
glog.Fatalf("Mount fail: %v", err) glog.Fatalf("Mount fail: %v", err)
} }
grace.OnInterrupt(func() {
unmount.Unmount(dir)
})
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
server.Wait() server.Wait()
return true return true

View file

@ -2,17 +2,92 @@ package mount
import ( import (
"context" "context"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
"os"
"path"
"path/filepath"
"syscall" "syscall"
"time"
"github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
) )
type WeedFS struct { type Option struct {
fs.Inode MountDirectory string
FilerAddresses []pb.ServerAddress
filerIndex int
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
Umask os.FileMode
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
uniqueCacheTempPageDir string
} }
func (r *WeedFS) OnAdd(ctx context.Context) { type WFS struct {
fs.Inode
option *Option
metaCache *meta_cache.MetaCache
signature int32
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
signature: util.RandomInt32(),
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
return wfs
}
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
}
func (option *Option) getTempFilePageDir() string {
return option.uniqueCacheTempPageDir
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
}
func (r *WFS) OnAdd(ctx context.Context) {
ch := r.NewPersistentInode( ch := r.NewPersistentInode(
ctx, &fs.MemRegularFile{ ctx, &fs.MemRegularFile{
Data: []byte("file.txt"), Data: []byte("file.txt"),
@ -23,10 +98,10 @@ func (r *WeedFS) OnAdd(ctx context.Context) {
r.AddChild("file.txt", ch, false) r.AddChild("file.txt", ch, false)
} }
func (r *WeedFS) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno { func (r *WFS) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
out.Mode = 0755 out.Mode = 0755
return 0 return 0
} }
var _ = (fs.NodeGetattrer)((*WeedFS)(nil)) var _ = (fs.NodeGetattrer)((*WFS)(nil))
var _ = (fs.NodeOnAdder)((*WeedFS)(nil)) var _ = (fs.NodeOnAdder)((*WFS)(nil))