seaweedfs/weed/filesys/wfs.go

121 lines
3.3 KiB
Go
Raw Normal View History

2018-05-06 05:47:16 +00:00
package filesys
2018-05-08 08:59:43 +00:00
import (
"bazil.org/fuse/fs"
"fmt"
2018-05-10 06:18:02 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/karlseguin/ccache"
2018-06-06 09:09:57 +00:00
"sync"
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/glog"
2018-07-04 02:07:55 +00:00
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
2018-05-08 08:59:43 +00:00
)
2018-05-06 05:47:16 +00:00
type WFS struct {
2018-06-06 06:37:41 +00:00
filerGrpcAddress string
filerMountRootPath string
listDirectoryEntriesCache *ccache.Cache
collection string
replication string
2018-06-12 06:13:33 +00:00
ttlSec int32
chunkSizeLimit int64
dataCenter string
2018-06-06 09:09:57 +00:00
// contains all open handles
handles []*FileHandle
pathToHandleIndex map[string]int
pathToHandleLock sync.Mutex
2018-05-06 05:47:16 +00:00
}
func NewSeaweedFileSystem(filerGrpcAddress string, filerMountRootPath string, collection string, replication string, ttlSec int32, chunkSizeLimitMB int, dataCenter string) *WFS {
if filerMountRootPath != "/" && strings.HasSuffix(filerMountRootPath, "/") {
filerMountRootPath = filerMountRootPath[0:len(filerMountRootPath)-1]
}
2018-05-06 05:47:16 +00:00
return &WFS{
2018-06-06 06:37:41 +00:00
filerGrpcAddress: filerGrpcAddress,
filerMountRootPath: filerMountRootPath,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
collection: collection,
replication: replication,
2018-06-12 06:13:33 +00:00
ttlSec: ttlSec,
chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
dataCenter: dataCenter,
2018-06-06 09:09:57 +00:00
pathToHandleIndex: make(map[string]int),
2018-05-06 05:47:16 +00:00
}
}
func (wfs *WFS) Root() (fs.Node, error) {
return &Dir{Path: wfs.filerMountRootPath, wfs: wfs}, nil
2018-05-06 05:47:16 +00:00
}
2018-05-08 08:59:43 +00:00
2018-05-10 06:18:02 +00:00
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
2018-05-08 08:59:43 +00:00
2018-07-04 02:07:55 +00:00
grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress)
2018-05-08 08:59:43 +00:00
if err != nil {
2018-06-06 06:37:41 +00:00
return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
2018-05-08 08:59:43 +00:00
}
defer grpcConnection.Close()
2018-05-10 06:18:02 +00:00
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
2018-05-08 08:59:43 +00:00
return fn(client)
}
2018-06-06 09:09:57 +00:00
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (handle *FileHandle) {
wfs.pathToHandleLock.Lock()
defer wfs.pathToHandleLock.Unlock()
fullpath := file.fullpath()
index, found := wfs.pathToHandleIndex[fullpath]
if found && wfs.handles[index] != nil {
glog.V(4).Infoln(fullpath, "found handle id", index)
return wfs.handles[index]
}
// create a new handler
handle = &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
Uid: uid,
Gid: gid,
}
if found && wfs.handles[index] != nil {
glog.V(4).Infoln(fullpath, "reuse previous handle id", index)
wfs.handles[index] = handle
handle.handle = uint64(index)
return
}
for i, h := range wfs.handles {
if h == nil {
wfs.handles[i] = handle
handle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i
glog.V(4).Infoln(fullpath, "reuse handle id", handle.handle)
return
}
}
wfs.handles = append(wfs.handles, handle)
handle.handle = uint64(len(wfs.handles) - 1)
2018-06-06 09:21:36 +00:00
glog.V(4).Infoln(fullpath, "new handle id", handle.handle)
2018-06-06 09:09:57 +00:00
wfs.pathToHandleIndex[fullpath] = int(handle.handle)
return
}
// ReleaseHandle frees the slot for handleId so it can be reused, and drops
// every path mapping that points at that slot. Ids outside the handle table
// are ignored.
func (wfs *WFS) ReleaseHandle(handleId fuse.HandleID) {
	wfs.pathToHandleLock.Lock()
	defer wfs.pathToHandleLock.Unlock()

	glog.V(4).Infoln("releasing handle id", handleId, "current handles lengh", len(wfs.handles))

	// Compare as unsigned: converting a very large id to int first could
	// yield a negative value that passes a "< len" check and then panics
	// when used as an index.
	if uint64(handleId) >= uint64(len(wfs.handles)) {
		return
	}

	index := int(handleId)
	wfs.handles[index] = nil

	// Remove path entries that refer to the freed slot. Otherwise the map
	// grows without bound, and a stale entry could later resolve to whichever
	// file is given this slot next. Deleting during range is permitted in Go.
	for path, i := range wfs.pathToHandleIndex {
		if i == index {
			delete(wfs.pathToHandleIndex, path)
		}
	}
}