seaweedfs/weed/filesys/wfs.go

223 lines
5.9 KiB
Go
Raw Normal View History

2018-05-06 05:47:16 +00:00
package filesys
2018-05-08 08:59:43 +00:00
import (
2018-11-23 07:05:22 +00:00
"context"
"fmt"
2018-11-23 07:05:22 +00:00
"math"
"os"
"path"
"sync"
"time"
2020-06-28 17:14:17 +00:00
"google.golang.org/grpc"
2020-06-11 08:50:00 +00:00
2020-06-28 17:18:32 +00:00
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
2018-06-06 09:09:57 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
2018-07-22 00:39:10 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-03-23 07:01:34 +00:00
"github.com/chrislusf/seaweedfs/weed/util"
2020-04-11 19:45:24 +00:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2018-05-08 08:59:43 +00:00
)
2018-05-06 05:47:16 +00:00
2018-07-22 08:14:36 +00:00
type Option struct {
2020-04-12 04:12:41 +00:00
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
ChunkSizeLimit int64
CacheDir string
CacheSizeMB int64
DataCenter string
EntryCacheTtl time.Duration
Umask os.FileMode
MountUid uint32
MountGid uint32
MountMode os.FileMode
2019-05-10 22:03:31 +00:00
MountCtime time.Time
MountMtime time.Time
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
2018-07-22 08:14:36 +00:00
}
2018-11-23 07:05:22 +00:00
// Compile-time checks that *WFS satisfies the FUSE file system interfaces.
var (
	_ fs.FS         = (*WFS)(nil)
	_ fs.FSStatfser = (*WFS)(nil)
)
2018-05-06 05:47:16 +00:00
type WFS struct {
2020-07-11 13:16:17 +00:00
option *Option
2018-06-06 09:09:57 +00:00
// contains all open handles, protected by handlesLock
2020-04-08 19:50:20 +00:00
handlesLock sync.Mutex
handles map[uint64]*FileHandle
bufPool sync.Pool
stats statsCache
2020-03-26 05:19:19 +00:00
root fs.Node
2020-08-10 04:56:09 +00:00
fsNodeCache *FsCache
2020-08-18 03:15:53 +00:00
chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
signature int32
}
type statsCache struct {
filer_pb.StatisticsResponse
lastChecked int64 // unix time in seconds
2018-05-06 05:47:16 +00:00
}
2018-07-22 08:14:36 +00:00
func NewSeaweedFileSystem(option *Option) *WFS {
2019-01-01 10:33:57 +00:00
wfs := &WFS{
2020-07-11 13:16:17 +00:00
option: option,
handles: make(map[uint64]*FileHandle),
2018-12-28 11:27:48 +00:00
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
signature: util.RandomInt32(),
2020-04-12 07:52:54 +00:00
}
2020-08-10 04:56:09 +00:00
cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
2020-04-12 07:52:54 +00:00
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
2020-09-27 17:41:29 +00:00
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
2018-05-06 05:47:16 +00:00
}
2020-06-28 17:18:32 +00:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
2020-06-28 17:18:32 +00:00
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
2020-07-11 06:03:22 +00:00
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
2019-01-01 10:33:57 +00:00
entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
2020-08-10 04:56:09 +00:00
wfs.fsNodeCache = newFsCache(wfs.root)
2019-01-01 10:33:57 +00:00
return wfs
2018-05-06 05:47:16 +00:00
}
func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
2018-05-06 05:47:16 +00:00
}
2018-05-08 08:59:43 +00:00
2018-07-22 08:14:36 +00:00
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
2018-06-06 09:09:57 +00:00
fullpath := file.fullpath()
2020-08-16 02:55:28 +00:00
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
2018-06-06 09:09:57 +00:00
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
2018-06-06 09:09:57 +00:00
2020-04-08 19:50:20 +00:00
inodeId := file.fullpath().AsInode()
existingHandle, found := wfs.handles[inodeId]
if found && existingHandle != nil {
file.isOpen++
2020-04-08 19:50:20 +00:00
return existingHandle
}
2018-11-15 06:48:54 +00:00
fileHandle = newFileHandle(file, uid, gid)
file.maybeLoadEntry(context.Background())
file.isOpen++
2020-04-08 19:50:20 +00:00
wfs.handles[inodeId] = fileHandle
fileHandle.handle = inodeId
2018-06-06 09:09:57 +00:00
return
}
2020-03-23 07:01:34 +00:00
func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
2018-06-06 09:09:57 +00:00
2020-08-19 06:42:09 +00:00
glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
2020-04-08 19:50:20 +00:00
delete(wfs.handles, fullpath.AsInode())
2018-06-06 09:09:57 +00:00
return
}
2018-11-23 07:05:22 +00:00
// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
2020-08-31 03:12:04 +00:00
glog.V(4).Infof("reading fs stats: %+v", req)
2018-11-23 07:05:22 +00:00
if wfs.stats.lastChecked < time.Now().Unix()-20 {
err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
}
2020-08-31 03:12:04 +00:00
glog.V(4).Infof("reading filer stats: %+v", request)
resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
}
2020-08-31 03:12:04 +00:00
glog.V(4).Infof("read filer stats: %+v", resp)
wfs.stats.TotalSize = resp.TotalSize
wfs.stats.UsedSize = resp.UsedSize
wfs.stats.FileCount = resp.FileCount
wfs.stats.lastChecked = time.Now().Unix()
return nil
})
if err != nil {
glog.V(0).Infof("filer Statistics: %v", err)
return err
}
}
totalDiskSize := wfs.stats.TotalSize
usedDiskSize := wfs.stats.UsedSize
actualFileCount := wfs.stats.FileCount
2018-11-23 07:05:22 +00:00
// Compute the total number of available blocks
resp.Blocks = totalDiskSize / blockSize
// Compute the number of used blocks
2019-01-17 01:17:19 +00:00
numBlocks := uint64(usedDiskSize / blockSize)
2018-11-23 07:05:22 +00:00
// Report the number of free and available blocks for the block size
2019-01-17 01:17:19 +00:00
resp.Bfree = resp.Blocks - numBlocks
resp.Bavail = resp.Blocks - numBlocks
2018-11-23 07:05:22 +00:00
resp.Bsize = uint32(blockSize)
// Report the total number of possible files in the file system (and those free)
resp.Files = math.MaxInt64
resp.Ffree = math.MaxInt64 - actualFileCount
// Report the maximum length of a name and the minimum fragment size
resp.Namelen = 1024
resp.Frsize = uint32(blockSize)
return nil
}
func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
2020-09-24 10:06:44 +00:00
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
2020-09-24 10:06:44 +00:00
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}