mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
fixed file handle by file full path
This commit is contained in:
parent
299312c805
commit
6816661b0f
|
@ -122,14 +122,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
|
||||||
file := dir.newFile(req.Name, nil)
|
file := dir.newFile(req.Name, nil)
|
||||||
dir.NodeMap[req.Name] = file
|
dir.NodeMap[req.Name] = file
|
||||||
file.isOpen = true
|
file.isOpen = true
|
||||||
return file, &FileHandle{
|
return file, dir.wfs.AcquireHandle(file, req.Uid, req.Gid), nil
|
||||||
f: file,
|
|
||||||
dirtyPages: newDirtyPages(file),
|
|
||||||
RequestId: req.Header.ID,
|
|
||||||
NodeId: req.Header.Node,
|
|
||||||
Uid: req.Uid,
|
|
||||||
Gid: req.Gid,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|
|
@ -26,12 +26,14 @@ type File struct {
|
||||||
isOpen bool
|
isOpen bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (file *File) fullpath() string {
|
||||||
|
return filepath.Join(file.dir.Path, file.Name)
|
||||||
|
}
|
||||||
|
|
||||||
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
|
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
|
||||||
|
|
||||||
fullPath := filepath.Join(file.dir.Path, file.Name)
|
|
||||||
|
|
||||||
if file.attributes == nil || !file.isOpen {
|
if file.attributes == nil || !file.isOpen {
|
||||||
item := file.wfs.listDirectoryEntriesCache.Get(fullPath)
|
item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath())
|
||||||
if item != nil {
|
if item != nil {
|
||||||
entry := item.Value().(*filer_pb.Entry)
|
entry := item.Value().(*filer_pb.Entry)
|
||||||
file.Chunks = entry.Chunks
|
file.Chunks = entry.Chunks
|
||||||
|
@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
|
||||||
file.attributes = resp.Attributes
|
file.attributes = resp.Attributes
|
||||||
file.Chunks = resp.Chunks
|
file.Chunks = resp.Chunks
|
||||||
|
|
||||||
glog.V(1).Infof("file attr %v %+v: %d", fullPath, file.attributes, filer2.TotalSize(file.Chunks))
|
glog.V(1).Infof("file attr %v %+v: %d", file.fullpath(), file.attributes, filer2.TotalSize(file.Chunks))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -77,30 +79,26 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
|
||||||
|
|
||||||
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
|
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
|
||||||
|
|
||||||
fullPath := filepath.Join(file.dir.Path, file.Name)
|
glog.V(3).Infof("%v file open %+v", file.fullpath(), req)
|
||||||
|
|
||||||
glog.V(3).Infof("%v file open %+v", fullPath, req)
|
|
||||||
|
|
||||||
file.isOpen = true
|
file.isOpen = true
|
||||||
|
|
||||||
return &FileHandle{
|
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
|
||||||
f: file,
|
|
||||||
dirtyPages: newDirtyPages(file),
|
resp.Handle = fuse.HandleID(handle.handle)
|
||||||
RequestId: req.Header.ID,
|
|
||||||
NodeId: req.Header.Node,
|
glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
|
||||||
Uid: req.Uid,
|
|
||||||
Gid: req.Gid,
|
return handle, nil
|
||||||
}, nil
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
|
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
|
||||||
fullPath := filepath.Join(file.dir.Path, file.Name)
|
|
||||||
|
|
||||||
glog.V(3).Infof("%v file setattr %+v", fullPath, req)
|
glog.V(3).Infof("%v file setattr %+v, fh=%d", file.fullpath(), req, req.Handle)
|
||||||
if req.Valid.Size() {
|
if req.Valid.Size() {
|
||||||
|
|
||||||
glog.V(3).Infof("%v file setattr set size=%v", fullPath, req.Size)
|
glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
|
||||||
if req.Size == 0 {
|
if req.Size == 0 {
|
||||||
// fmt.Printf("truncate %v \n", fullPath)
|
// fmt.Printf("truncate %v \n", fullPath)
|
||||||
file.Chunks = nil
|
file.Chunks = nil
|
||||||
|
|
|
@ -19,6 +19,8 @@ type FileHandle struct {
|
||||||
dirtyPages *ContinuousDirtyPages
|
dirtyPages *ContinuousDirtyPages
|
||||||
dirtyMetadata bool
|
dirtyMetadata bool
|
||||||
|
|
||||||
|
handle uint64
|
||||||
|
|
||||||
f *File
|
f *File
|
||||||
RequestId fuse.RequestID // unique ID for request
|
RequestId fuse.RequestID // unique ID for request
|
||||||
NodeId fuse.NodeID // file or directory the request is about
|
NodeId fuse.NodeID // file or directory the request is about
|
||||||
|
@ -26,6 +28,15 @@ type FileHandle struct {
|
||||||
Gid uint32 // group ID of process making request
|
Gid uint32 // group ID of process making request
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
|
||||||
|
return &FileHandle{
|
||||||
|
f: file,
|
||||||
|
dirtyPages: newDirtyPages(file),
|
||||||
|
Uid: uid,
|
||||||
|
Gid: gid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var _ = fs.Handle(&FileHandle{})
|
var _ = fs.Handle(&FileHandle{})
|
||||||
|
|
||||||
// var _ = fs.HandleReadAller(&FileHandle{})
|
// var _ = fs.HandleReadAller(&FileHandle{})
|
||||||
|
@ -36,8 +47,9 @@ var _ = fs.HandleReleaser(&FileHandle{})
|
||||||
|
|
||||||
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||||
|
|
||||||
glog.V(4).Infof("%v/%v read fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(req.Size))
|
glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
|
||||||
|
|
||||||
|
// this value should come from the filer instead of the old f
|
||||||
if len(fh.f.Chunks) == 0 {
|
if len(fh.f.Chunks) == 0 {
|
||||||
glog.V(0).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
|
glog.V(0).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
|
||||||
return fmt.Errorf("empty file %v/%v", fh.f.dir.Path, fh.f.Name)
|
return fmt.Errorf("empty file %v/%v", fh.f.dir.Path, fh.f.Name)
|
||||||
|
@ -123,7 +135,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
|
||||||
|
|
||||||
// write the request to volume servers
|
// write the request to volume servers
|
||||||
|
|
||||||
glog.V(4).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)))
|
glog.V(4).Infof("%+v/%v write fh %d: [%d,%d)", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)))
|
||||||
|
|
||||||
chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
|
chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -148,7 +160,9 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
|
||||||
|
|
||||||
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
|
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
|
||||||
|
|
||||||
glog.V(4).Infof("%+v/%v release fh", fh.f.dir.Path, fh.f.Name)
|
glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
|
||||||
|
|
||||||
|
fh.f.wfs.ReleaseHandle(fuse.HandleID(fh.handle))
|
||||||
|
|
||||||
fh.f.isOpen = false
|
fh.f.isOpen = false
|
||||||
|
|
||||||
|
@ -160,7 +174,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
|
||||||
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
// fflush works at fh level
|
// fflush works at fh level
|
||||||
// send the data to the OS
|
// send the data to the OS
|
||||||
glog.V(4).Infof("%s/%s fh flush %v", fh.f.dir.Path, fh.f.Name, req)
|
glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
|
||||||
|
|
||||||
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -6,6 +6,9 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/karlseguin/ccache"
|
"github.com/karlseguin/ccache"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"sync"
|
||||||
|
"bazil.org/fuse"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WFS struct {
|
type WFS struct {
|
||||||
|
@ -14,6 +17,11 @@ type WFS struct {
|
||||||
collection string
|
collection string
|
||||||
replication string
|
replication string
|
||||||
chunkSizeLimit int64
|
chunkSizeLimit int64
|
||||||
|
|
||||||
|
// contains all open handles
|
||||||
|
handles []*FileHandle
|
||||||
|
pathToHandleIndex map[string]int
|
||||||
|
pathToHandleLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replication string, chunkSizeLimitMB int) *WFS {
|
func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replication string, chunkSizeLimitMB int) *WFS {
|
||||||
|
@ -23,6 +31,7 @@ func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replicatio
|
||||||
collection: collection,
|
collection: collection,
|
||||||
replication: replication,
|
replication: replication,
|
||||||
chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
|
chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
|
||||||
|
pathToHandleIndex: make(map[string]int),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,3 +51,60 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
|
||||||
|
|
||||||
return fn(client)
|
return fn(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (handle *FileHandle) {
|
||||||
|
wfs.pathToHandleLock.Lock()
|
||||||
|
defer wfs.pathToHandleLock.Unlock()
|
||||||
|
|
||||||
|
fullpath := file.fullpath()
|
||||||
|
|
||||||
|
index, found := wfs.pathToHandleIndex[fullpath]
|
||||||
|
if found && wfs.handles[index] != nil {
|
||||||
|
glog.V(4).Infoln(fullpath, "found handle id", index)
|
||||||
|
return wfs.handles[index]
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a new handler
|
||||||
|
handle = &FileHandle{
|
||||||
|
f: file,
|
||||||
|
dirtyPages: newDirtyPages(file),
|
||||||
|
Uid: uid,
|
||||||
|
Gid: gid,
|
||||||
|
}
|
||||||
|
|
||||||
|
if found && wfs.handles[index] != nil {
|
||||||
|
glog.V(4).Infoln(fullpath, "reuse previous handle id", index)
|
||||||
|
wfs.handles[index] = handle
|
||||||
|
handle.handle = uint64(index)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, h := range wfs.handles {
|
||||||
|
if h == nil {
|
||||||
|
wfs.handles[i] = handle
|
||||||
|
handle.handle = uint64(i)
|
||||||
|
wfs.pathToHandleIndex[fullpath] = i
|
||||||
|
glog.V(4).Infoln(fullpath, "reuse handle id", handle.handle)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wfs.handles = append(wfs.handles, handle)
|
||||||
|
handle.handle = uint64(len(wfs.handles) - 1)
|
||||||
|
println(fullpath, "new handle id", handle.handle)
|
||||||
|
wfs.pathToHandleIndex[fullpath] = int(handle.handle)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfs *WFS) ReleaseHandle(handleId fuse.HandleID) {
|
||||||
|
wfs.pathToHandleLock.Lock()
|
||||||
|
defer wfs.pathToHandleLock.Unlock()
|
||||||
|
|
||||||
|
glog.V(4).Infoln("releasing handle id", handleId, "current handles lengh", len(wfs.handles))
|
||||||
|
if int(handleId) < len(wfs.handles) {
|
||||||
|
wfs.handles[int(handleId)] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue