seaweedfs/weed/mount/weedfs.go

211 lines
6.5 KiB
Go
Raw Normal View History

2022-02-11 04:32:13 +00:00
package mount
import (
2022-02-12 09:54:16 +00:00
"context"
"math/rand"
"os"
"path"
"path/filepath"
"sync/atomic"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
2022-02-11 04:32:13 +00:00
"github.com/hanwen/go-fuse/v2/fs"
)
2022-02-11 06:43:55 +00:00
type Option struct {
2022-08-21 19:20:27 +00:00
filerIndex int32 // align memory for atomic read/write
2022-02-11 06:43:55 +00:00
FilerAddresses []pb.ServerAddress
2022-08-21 19:20:27 +00:00
MountDirectory string
2022-02-11 06:43:55 +00:00
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
2023-08-17 06:47:43 +00:00
CacheDirForRead string
CacheSizeMBForRead int64
CacheDirForWrite string
2022-02-11 06:43:55 +00:00
DataCenter string
Umask os.FileMode
Quota int64
2022-06-06 03:27:12 +00:00
DisableXAttr bool
2022-02-11 06:43:55 +00:00
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
2023-08-17 06:47:43 +00:00
uniqueCacheDirForRead string
uniqueCacheDirForWrite string
2022-02-11 06:43:55 +00:00
}
type WFS struct {
2022-03-13 08:14:50 +00:00
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
2022-04-02 22:14:37 +00:00
mount_pb.UnimplementedSeaweedMountServer
2022-02-11 04:32:13 +00:00
fs.Inode
2022-02-14 06:50:44 +00:00
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
fhmap *FileHandleToInode
dhmap *DirectoryHandleToInode
fuseServer *fuse.Server
2022-03-06 06:10:43 +00:00
IsOverQuota bool
2023-09-21 18:08:26 +00:00
fhLockTable *util.LockTable[FileHandleId]
2022-02-11 06:43:55 +00:00
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
2022-02-28 20:16:53 +00:00
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
2022-02-14 03:14:34 +00:00
fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(),
2023-09-21 18:08:26 +00:00
fhLockTable: util.NewLockTable[FileHandleId](),
2022-02-11 06:43:55 +00:00
}
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
2022-02-14 06:50:44 +00:00
wfs.option.setupUniqueCacheDirectory()
2023-08-17 06:47:43 +00:00
if option.CacheSizeMBForRead > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
2022-02-14 06:50:44 +00:00
}
2023-08-17 06:47:43 +00:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
2022-02-28 20:16:53 +00:00
util.FullPath(option.FilerMountRootPath),
func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
2022-02-11 06:43:55 +00:00
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
2023-08-17 06:47:43 +00:00
os.RemoveAll(option.getUniqueCacheDirForWrite())
2023-08-17 06:54:23 +00:00
os.RemoveAll(option.getUniqueCacheDirForRead())
2022-02-11 06:43:55 +00:00
})
2022-02-14 06:50:44 +00:00
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
2022-02-11 06:43:55 +00:00
return wfs
}
2022-02-14 09:09:31 +00:00
func (wfs *WFS) StartBackgroundTasks() {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go wfs.loopCheckQuota()
2022-02-14 09:09:31 +00:00
}
2022-02-12 05:35:09 +00:00
// String returns the fixed name of this file system.
func (wfs *WFS) String() string {
	const fsName = "seaweedfs"
	return fsName
}
// Init stores the FUSE server on the file system for later use.
func (wfs *WFS) Init(server *fuse.Server) {
	wfs.fuseServer = server
}
2022-08-04 08:35:18 +00:00
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path, status = wfs.inodeToPath.GetPath(inode)
if status != fuse.OK {
return
}
2022-02-14 09:36:10 +00:00
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
if entry != nil && fh.entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
})
2022-07-24 22:30:55 +00:00
} else {
entry, status = wfs.maybeLoadEntry(path)
2022-02-14 09:36:10 +00:00
}
return
2022-02-12 09:54:16 +00:00
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
2022-05-25 01:52:04 +00:00
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
2022-02-12 09:54:16 +00:00
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
2022-02-13 06:41:45 +00:00
return cachedEntry.ToProtoEntry(), fuse.OK
2022-02-12 09:54:16 +00:00
}
2022-02-14 06:50:44 +00:00
// LookupFn returns the function used to resolve a file id to target URLs.
//
// When VolumeServerAccess is "filerProxy" the returned function yields a
// single URL that proxies the chunk through the current filer; otherwise the
// lookup is delegated to filer.LookupFn.
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
	if wfs.option.VolumeServerAccess != "filerProxy" {
		return filer.LookupFn(wfs)
	}

	return func(fileId string) (targetUrls []string, err error) {
		proxyURL := "http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId
		return []string{proxyURL}, nil
	}
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
i := atomic.LoadInt32(&wfs.option.filerIndex)
return wfs.option.FilerAddresses[i]
2022-02-14 06:50:44 +00:00
}
2022-02-11 06:43:55 +00:00
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
2023-08-17 06:47:43 +00:00
option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
}
2023-08-17 06:47:43 +00:00
func (option *Option) getUniqueCacheDirForWrite() string {
return option.uniqueCacheDirForWrite
2022-02-11 06:43:55 +00:00
}
2023-08-17 06:47:43 +00:00
func (option *Option) getUniqueCacheDirForRead() string {
return option.uniqueCacheDirForRead
2022-02-11 04:32:13 +00:00
}