mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
remove writeOnly flag
This commit is contained in:
parent
bc96682760
commit
4fd29dad86
|
@ -161,7 +161,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
file.dirtyMetadata = true
|
file.dirtyMetadata = true
|
||||||
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
|
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
|
||||||
return file, fh, nil
|
return file, fh, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,6 @@ import (
|
||||||
type ContinuousDirtyPages struct {
|
type ContinuousDirtyPages struct {
|
||||||
intervals *page_writer.ContinuousIntervals
|
intervals *page_writer.ContinuousIntervals
|
||||||
f *File
|
f *File
|
||||||
writeOnly bool
|
|
||||||
writeWaitGroup sync.WaitGroup
|
writeWaitGroup sync.WaitGroup
|
||||||
chunkAddLock sync.Mutex
|
chunkAddLock sync.Mutex
|
||||||
lastErr error
|
lastErr error
|
||||||
|
@ -23,11 +22,10 @@ type ContinuousDirtyPages struct {
|
||||||
replication string
|
replication string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages {
|
func newContinuousDirtyPages(file *File) *ContinuousDirtyPages {
|
||||||
dirtyPages := &ContinuousDirtyPages{
|
dirtyPages := &ContinuousDirtyPages{
|
||||||
intervals: &page_writer.ContinuousIntervals{},
|
intervals: &page_writer.ContinuousIntervals{},
|
||||||
f: file,
|
f: file,
|
||||||
writeOnly: writeOnly,
|
|
||||||
}
|
}
|
||||||
return dirtyPages
|
return dirtyPages
|
||||||
}
|
}
|
||||||
|
@ -108,7 +106,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
|
||||||
defer pages.writeWaitGroup.Done()
|
defer pages.writeWaitGroup.Done()
|
||||||
|
|
||||||
reader = io.LimitReader(reader, size)
|
reader = io.LimitReader(reader, size)
|
||||||
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
|
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
|
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
|
||||||
pages.lastErr = err
|
pages.lastErr = err
|
||||||
|
@ -149,13 +147,3 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
|
||||||
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
|
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
|
||||||
return pages.collection, pages.replication
|
return pages.collection, pages.replication
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) {
|
|
||||||
if pages.writeOnly {
|
|
||||||
pages.writeOnly = writeOnly
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) {
|
|
||||||
return pages.writeOnly
|
|
||||||
}
|
|
||||||
|
|
|
@ -15,7 +15,6 @@ type TempFileDirtyPages struct {
|
||||||
f *File
|
f *File
|
||||||
tf *os.File
|
tf *os.File
|
||||||
writtenIntervals *page_writer.WrittenContinuousIntervals
|
writtenIntervals *page_writer.WrittenContinuousIntervals
|
||||||
writeOnly bool
|
|
||||||
writeWaitGroup sync.WaitGroup
|
writeWaitGroup sync.WaitGroup
|
||||||
pageAddLock sync.Mutex
|
pageAddLock sync.Mutex
|
||||||
chunkAddLock sync.Mutex
|
chunkAddLock sync.Mutex
|
||||||
|
@ -24,11 +23,10 @@ type TempFileDirtyPages struct {
|
||||||
replication string
|
replication string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
|
func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
|
||||||
|
|
||||||
tempFile := &TempFileDirtyPages{
|
tempFile := &TempFileDirtyPages{
|
||||||
f: file,
|
f: file,
|
||||||
writeOnly: writeOnly,
|
|
||||||
writtenIntervals: &page_writer.WrittenContinuousIntervals{},
|
writtenIntervals: &page_writer.WrittenContinuousIntervals{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,7 +116,7 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s
|
||||||
defer pages.writeWaitGroup.Done()
|
defer pages.writeWaitGroup.Done()
|
||||||
|
|
||||||
reader = io.LimitReader(reader, size)
|
reader = io.LimitReader(reader, size)
|
||||||
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
|
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
|
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
|
||||||
pages.lastErr = err
|
pages.lastErr = err
|
||||||
|
@ -146,13 +144,3 @@ func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64)
|
||||||
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
|
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
|
||||||
return pages.collection, pages.replication
|
return pages.collection, pages.replication
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
|
|
||||||
if pages.writeOnly {
|
|
||||||
pages.writeOnly = writeOnly
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
|
|
||||||
return pages.writeOnly
|
|
||||||
}
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
|
||||||
|
|
||||||
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
|
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
|
||||||
|
|
||||||
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
|
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
|
||||||
|
|
||||||
resp.Handle = fuse.HandleID(handle.handle)
|
resp.Handle = fuse.HandleID(handle.handle)
|
||||||
|
|
||||||
|
|
|
@ -37,11 +37,11 @@ type FileHandle struct {
|
||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
|
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
|
||||||
fh := &FileHandle{
|
fh := &FileHandle{
|
||||||
f: file,
|
f: file,
|
||||||
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
|
||||||
dirtyPages: newTempFileDirtyPages(file, writeOnly),
|
dirtyPages: newTempFileDirtyPages(file),
|
||||||
Uid: uid,
|
Uid: uid,
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
}
|
}
|
||||||
|
@ -305,7 +305,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
|
||||||
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
|
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
|
||||||
|
|
||||||
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
|
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
|
||||||
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks)
|
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
|
||||||
if manifestErr != nil {
|
if manifestErr != nil {
|
||||||
// not good, but should be ok
|
// not good, but should be ok
|
||||||
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
|
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
|
||||||
|
|
|
@ -5,6 +5,4 @@ type DirtyPages interface {
|
||||||
FlushData() error
|
FlushData() error
|
||||||
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
|
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
|
||||||
GetStorageOptions() (collection, replication string)
|
GetStorageOptions() (collection, replication string)
|
||||||
SetWriteOnly(writeOnly bool)
|
|
||||||
GetWriteOnly() (writeOnly bool)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
|
||||||
return wfs.root, nil
|
return wfs.root, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
|
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
|
||||||
|
|
||||||
fullpath := file.fullpath()
|
fullpath := file.fullpath()
|
||||||
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
|
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
|
||||||
|
@ -160,7 +160,6 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file
|
||||||
if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
|
if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
|
||||||
existingHandle.f.isOpen++
|
existingHandle.f.isOpen++
|
||||||
wfs.handlesLock.Unlock()
|
wfs.handlesLock.Unlock()
|
||||||
existingHandle.dirtyPages.SetWriteOnly(writeOnly)
|
|
||||||
glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
|
glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
|
||||||
return existingHandle
|
return existingHandle
|
||||||
}
|
}
|
||||||
|
@ -168,7 +167,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file
|
||||||
|
|
||||||
entry, _ := file.maybeLoadEntry(context.Background())
|
entry, _ := file.maybeLoadEntry(context.Background())
|
||||||
file.entry = entry
|
file.entry = entry
|
||||||
fileHandle = newFileHandle(file, uid, gid, writeOnly)
|
fileHandle = newFileHandle(file, uid, gid)
|
||||||
|
|
||||||
wfs.handlesLock.Lock()
|
wfs.handlesLock.Lock()
|
||||||
file.isOpen++
|
file.isOpen++
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType {
|
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
|
||||||
|
|
||||||
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
|
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
|
||||||
var fileId, host string
|
var fileId, host string
|
||||||
|
@ -74,7 +74,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
|
||||||
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
|
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !writeOnly {
|
if offset == 0 {
|
||||||
wfs.chunkCache.SetChunk(fileId, data)
|
wfs.chunkCache.SetChunk(fileId, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue