diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 98b3929ac..7e8dc0b05 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -11,7 +11,7 @@ import ( ) type MemoryChunkPages struct { - f *File + fh *FileHandle writeWaitGroup sync.WaitGroup chunkAddLock sync.Mutex lastErr error @@ -21,14 +21,14 @@ type MemoryChunkPages struct { hasWrites bool } -func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { +func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { dirtyPages := &MemoryChunkPages{ - f: file, + fh: fh, } dirtyPages.uploadPipeline = page_writer.NewUploadPipeline( - file.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) + fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) return dirtyPages } @@ -36,7 +36,7 @@ func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { pages.hasWrites = true - glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) pages.uploadPipeline.SaveDataAt(data, offset) return @@ -79,23 +79,24 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, defer pages.writeWaitGroup.Done() defer cleanupFn() - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) pages.lastErr = err return } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.chunkAddLock.Lock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) + pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) pages.chunkAddLock.Unlock() } - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) + if pages.fh.f.wfs.concurrentWriters != nil { + pages.fh.f.wfs.concurrentWriters.Execute(writer) } else { go writer() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ef8f62938..8545be9b6 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -36,12 +36,12 @@ type FileHandle struct { func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ - f: file, - // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newPageWriter(file, file.wfs.option.ChunkSizeLimit), - Uid: uid, - Gid: gid, + f: file, + Uid: uid, + Gid: gid, } + // dirtyPages: newContinuousDirtyPages(file, writeOnly), + fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit) entry := fh.f.getEntry() if entry != nil { entry.Attributes.FileSize = filer.FileSize(entry) @@ -149,6 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { reader := fh.reader if reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) + glog.V(4).Infof("file handle read %s [%d,%d) from %+v", fileFullPath, offset, offset+int64(len(buff)), chunkViews) reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } fh.reader = reader diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index d618a1dda..90ef7d7c4 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -6,7 +6,7 @@ import ( ) type PageWriter struct { - f *File + fh *FileHandle collection string replication string chunkSize int64 @@ -20,19 +20,19 @@ var ( _ = page_writer.DirtyPages(&PageWriter{}) ) -func newPageWriter(file *File, chunkSize int64) *PageWriter { +func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { pw := &PageWriter{ - f: file, + fh: fh, chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), - randomWriter: newMemoryChunkPages(file, chunkSize), + randomWriter: newMemoryChunkPages(fh, chunkSize), } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { @@ -64,7 +64,7 @@ func (pw *PageWriter) FlushData() error { } func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ {