seaweedfs/weed/mount/weedfs.go

211 lines
6.5 KiB
Go
Raw Normal View History

2022-02-11 04:32:13 +00:00
package mount
import (
2022-02-12 09:54:16 +00:00
"context"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
2022-02-11 06:43:55 +00:00
"google.golang.org/grpc"
2022-02-14 06:50:44 +00:00
"math/rand"
2022-02-11 06:43:55 +00:00
"os"
"path"
"path/filepath"
2022-02-11 06:43:55 +00:00
"time"
2022-02-11 04:32:13 +00:00
"github.com/hanwen/go-fuse/v2/fs"
)
2022-02-11 06:43:55 +00:00
type Option struct {
MountDirectory string
FilerAddresses []pb.ServerAddress
filerIndex int
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
Umask os.FileMode
Quota int64
2022-06-06 03:27:12 +00:00
DisableXAttr bool
2022-02-11 06:43:55 +00:00
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
uniqueCacheTempPageDir string
2022-02-11 06:43:55 +00:00
}
type WFS struct {
2022-03-13 08:14:50 +00:00
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
2022-04-02 22:14:37 +00:00
mount_pb.UnimplementedSeaweedMountServer
2022-02-11 04:32:13 +00:00
fs.Inode
2022-02-14 06:50:44 +00:00
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
fhmap *FileHandleToInode
dhmap *DirectoryHandleToInode
fuseServer *fuse.Server
2022-03-06 06:10:43 +00:00
IsOverQuota bool
2022-02-11 06:43:55 +00:00
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
2022-02-28 20:16:53 +00:00
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
2022-02-14 03:14:34 +00:00
fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(),
2022-02-11 06:43:55 +00:00
}
2022-02-14 06:50:44 +00:00
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
2022-02-28 20:16:53 +00:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
util.FullPath(option.FilerMountRootPath),
func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
2022-02-11 06:43:55 +00:00
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
2022-02-16 08:39:21 +00:00
os.RemoveAll(option.getUniqueCacheDir())
2022-02-11 06:43:55 +00:00
})
2022-02-14 06:50:44 +00:00
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
2022-02-11 06:43:55 +00:00
return wfs
}
2022-02-14 09:09:31 +00:00
func (wfs *WFS) StartBackgroundTasks() {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go wfs.loopCheckQuota()
2022-02-14 09:09:31 +00:00
}
2022-02-12 05:35:09 +00:00
// String returns the fixed name of this file system, "seaweedfs".
func (wfs *WFS) String() string {
	const fsName = "seaweedfs"
	return fsName
}
// Init records the FUSE server this file system is attached to so that other
// methods can reach it through wfs.fuseServer.
func (wfs *WFS) Init(server *fuse.Server) {
	wfs.fuseServer = server
}
2022-07-25 01:24:34 +00:00
func (wfs *WFS) maybeReadEntry(inode uint64, followSymLink bool) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, targetInode uint64, status fuse.Status) {
path, status = wfs.inodeToPath.GetPath(inode)
if status != fuse.OK {
return
}
2022-02-14 09:36:10 +00:00
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
entry = fh.GetEntry()
if entry != nil && fh.entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
2022-04-06 17:11:11 +00:00
}
2022-07-24 22:30:55 +00:00
} else {
entry, status = wfs.maybeLoadEntry(path)
2022-02-14 09:36:10 +00:00
}
2022-07-25 01:24:34 +00:00
targetInode = inode
if status == fuse.OK && followSymLink && entry.FileMode()&os.ModeSymlink != 0 {
if entry != nil && entry.Attributes != nil && entry.Attributes.Inode != 0 {
targetInode = entry.Attributes.Inode
}
target := util.FullPath(filepath.Join(string(path), "../"+entry.Attributes.SymlinkTarget))
targetParent, _ := target.DirAndName()
wfs.inodeToPath.EnsurePath(util.FullPath(targetParent), true)
entry, status = wfs.maybeLoadEntry(target)
2022-07-24 22:11:24 +00:00
}
return
2022-02-12 09:54:16 +00:00
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
2022-05-25 01:52:04 +00:00
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
2022-02-12 09:54:16 +00:00
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
2022-02-13 06:41:45 +00:00
return cachedEntry.ToProtoEntry(), fuse.OK
2022-02-12 09:54:16 +00:00
}
2022-02-14 06:50:44 +00:00
// LookupFn returns the function used to turn a file id into the URLs it can
// be read from.
//
// When option.VolumeServerAccess is "filerProxy", every file id maps to a
// single URL on the current filer that proxies the chunk
// ("http://<filer>/?proxyChunkId=<fileId>"). Otherwise the lookup is
// delegated to filer.LookupFn.
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
	if wfs.option.VolumeServerAccess != "filerProxy" {
		return filer.LookupFn(wfs)
	}

	return func(fileId string) ([]string, error) {
		// The filer is resolved per call, so the URL follows whichever
		// filer is current at lookup time.
		proxyUrl := "http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId
		return []string{proxyUrl}, nil
	}
}
// getCurrentFiler returns the filer address selected by option.filerIndex.
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
	opt := wfs.option
	return opt.FilerAddresses[opt.filerIndex]
}
2022-02-11 06:43:55 +00:00
// setupUniqueCacheDirectory derives the per-mount cache directory and its
// "swap" sub-directory, stores both on option, and creates them on disk.
//
// The directory name is the first 8 characters of the MD5 string of the
// mount directory, the first filer address, the filer mount root path and
// the program version, so FilerAddresses must hold at least one address.
// The directories are created with mode 0777 minus option.Umask; the result
// of MkdirAll is not checked.
func (option *Option) setupUniqueCacheDirectory() {
	identity := option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()
	digest := util.Md5String([]byte(identity))

	option.uniqueCacheDir = path.Join(option.CacheDir, digest[0:8])
	option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")

	dirMode := os.FileMode(0777) &^ option.Umask
	os.MkdirAll(option.uniqueCacheTempPageDir, dirMode)
}
func (option *Option) getTempFilePageDir() string {
return option.uniqueCacheTempPageDir
2022-02-11 06:43:55 +00:00
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
2022-02-11 04:32:13 +00:00
}