diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go
index e6548d7be..f8f3c7984 100644
--- a/weed/filesys/dirty_pages_mem_chunk.go
+++ b/weed/filesys/dirty_pages_mem_chunk.go
@@ -31,8 +31,10 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages {
 		fh: fh,
 	}
 
+	swapFileDir := fh.f.wfs.option.getTempFilePageDir()
+
 	dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(),
-		fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage)
+		fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, swapFileDir)
 
 	return dirtyPages
 }
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
deleted file mode 100644
index 12f5b5159..000000000
--- a/weed/filesys/dirty_pages_temp_file.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package filesys
-
-import (
-	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"io"
-	"os"
-	"sync"
-	"time"
-)
-
-type TempFileDirtyPages struct {
-	f              *File
-	writeWaitGroup sync.WaitGroup
-	pageAddLock    sync.Mutex
-	chunkAddLock   sync.Mutex
-	lastErr        error
-	collection     string
-	replication    string
-	chunkedFile    *page_writer.ChunkedFileWriter
-}
-
-func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
-
-	tempFile := &TempFileDirtyPages{
-		f:           file,
-		chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
-	}
-
-	return tempFile
-}
-
-func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
-
-	pages.pageAddLock.Lock()
-	defer pages.pageAddLock.Unlock()
-
-	glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
-	if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
-		pages.lastErr = err
-	}
-
-	return
-}
-
-func (pages *TempFileDirtyPages) FlushData() error {
-	pages.saveChunkedFileToStorage()
-	pages.writeWaitGroup.Wait()
-	if pages.lastErr != nil {
-		return fmt.Errorf("flush data: %v", pages.lastErr)
-	}
-	pages.chunkedFile.Reset()
-	return nil
-}
-
-func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
-	return pages.chunkedFile.ReadDataAt(data, startOffset)
-}
-
-func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
-	return pages.collection, pages.replication
-}
-
-func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
-
-	pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
-		reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
-		pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
-	})
-
-}
-
-func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) {
-
-	mtime := time.Now().UnixNano()
-	pages.writeWaitGroup.Add(1)
-	writer := func() {
-		defer pages.writeWaitGroup.Done()
-
-		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
-		if err != nil {
-			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
-			pages.lastErr = err
-			return
-		}
-		chunk.Mtime = mtime
-		pages.collection, pages.replication = collection, replication
-		pages.chunkAddLock.Lock()
-		defer pages.chunkAddLock.Unlock()
-		pages.f.addChunks([]*filer_pb.FileChunk{chunk})
-		glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
-	}
-
-	if pages.f.wfs.concurrentWriters != nil {
-		pages.f.wfs.concurrentWriters.Execute(writer)
-	} else {
-		go writer()
-	}
-
-}
-
-func (pages TempFileDirtyPages) Destroy() {
-	pages.chunkedFile.Reset()
-}
-
-func (pages *TempFileDirtyPages) LockForRead(startOffset, stopOffset int64) {
-}
-
-func (pages *TempFileDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
-}
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
deleted file mode 100644
index 4dad46c69..000000000
--- a/weed/filesys/page_writer/chunked_file_writer.go
+++ /dev/null
@@ -1,159 +0,0 @@
-package page_writer
-
-import (
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"io"
-	"os"
-	"sync"
-)
-
-type ActualChunkIndex int
-
-// ChunkedFileWriter assumes the write requests will come in within chunks
-type ChunkedFileWriter struct {
-	dir                     string
-	file                    *os.File
-	logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
-	chunkUsages             []*ChunkWrittenIntervalList
-	ChunkSize               int64
-	sync.Mutex
-}
-
-var _ = io.WriterAt(&ChunkedFileWriter{})
-
-func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
-	return &ChunkedFileWriter{
-		dir:                     dir,
-		file:                    nil,
-		logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
-		ChunkSize:               chunkSize,
-	}
-}
-
-func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
-	cw.Lock()
-	defer cw.Unlock()
-
-	if cw.file == nil {
-		cw.file, err = os.CreateTemp(cw.dir, "")
-		if err != nil {
-			glog.Errorf("create temp file: %v", err)
-			return
-		}
-	}
-
-	actualOffset, chunkUsage := cw.toActualWriteOffset(off)
-	n, err = cw.file.WriteAt(p, actualOffset)
-	if err == nil {
-		startOffset := off % cw.ChunkSize
-		chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
-	}
-	return
-}
-
-func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
-	cw.Lock()
-	defer cw.Unlock()
-
-	if cw.file == nil {
-		return
-	}
-
-	logicChunkIndex := off / cw.ChunkSize
-	actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
-	if chunkUsage != nil {
-		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
-			logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
-			logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
-			if logicStart < logicStop {
-				actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
-				_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
-				if err != nil {
-					glog.Errorf("reading temp file: %v", err)
-					break
-				}
-				maxStop = max(maxStop, logicStop)
-			}
-		}
-	}
-	return
-}
-
-func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
-	logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
-	offsetRemainder := logicOffset % cw.ChunkSize
-	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
-	if found {
-		return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
-	}
-	cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
-	chunkUsage = newChunkWrittenIntervalList()
-	cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
-	return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
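
For context, the key idea in the deleted ChunkedFileWriter — mapping sparse logical chunk indexes onto densely packed slots in a single temp file — carries over into the new SwapFile below. A minimal standalone sketch of the offset arithmetic, using chunk size 16 as in the deleted test (illustrative code, not part of the patch):

```go
package main

import "fmt"

func main() {
	chunkSize := int64(16)
	logicToActual := map[int64]int64{} // LogicChunkIndex -> ActualChunkIndex

	// Physical chunk slots are handed out in first-touch order, so a sparse
	// write pattern still produces a compact temp file.
	toActualOffset := func(logicOffset int64) int64 {
		logicChunkIndex := logicOffset / chunkSize
		actualChunkIndex, found := logicToActual[logicChunkIndex]
		if !found {
			actualChunkIndex = int64(len(logicToActual)) // next free slot
			logicToActual[logicChunkIndex] = actualChunkIndex
		}
		return actualChunkIndex*chunkSize + logicOffset%chunkSize
	}

	fmt.Println(toActualOffset(50)) // logic chunk 3 -> actual chunk 0 -> offset 2
	fmt.Println(toActualOffset(32)) // logic chunk 2 -> actual chunk 1 -> offset 16
	fmt.Println(toActualOffset(60)) // logic chunk 3 again -> actual chunk 0 -> offset 12
}
```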
-}
-
-func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
-	logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
-	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
-	if found {
-		return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
-	}
-	return 0, nil
-}
-
-func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
-	for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
-		chunkUsage := cw.chunkUsages[actualChunkIndex]
-		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
-			process(cw.file, logicChunkIndex, t)
-		}
-	}
-}
-
-// Reset releases used resources
-func (cw *ChunkedFileWriter) Reset() {
-	if cw.file != nil {
-		cw.file.Close()
-		os.Remove(cw.file.Name())
-		cw.file = nil
-	}
-	cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
-	cw.chunkUsages = cw.chunkUsages[:0]
-}
-
-type FileIntervalReader struct {
-	f           *os.File
-	startOffset int64
-	stopOffset  int64
-	position    int64
-}
-
-var _ = io.Reader(&FileIntervalReader{})
-
-func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
-	actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
-	if !found {
-		// this should never happen
-		return nil
-	}
-	return &FileIntervalReader{
-		f:           cw.file,
-		startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
-		stopOffset:  int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
-		position:    0,
-	}
-}
-
-func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
-	readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
-	n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
-	if err == nil || err == io.EOF {
-		fr.position += int64(n)
-		if fr.stopOffset-fr.startOffset-fr.position == 0 {
-			// return a tiny bit faster
-			err = io.EOF
-			return
-		}
-	}
-	return
-}
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
deleted file mode 100644
index 244ed62c3..000000000
--- a/weed/filesys/page_writer/chunked_file_writer_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package page_writer
-
-import (
-	"github.com/stretchr/testify/assert"
-	"os"
-	"testing"
-)
-
-func TestChunkedFileWriter_toActualOffset(t *testing.T) {
-	cw := NewChunkedFileWriter("", 16)
-
-	writeToFile(cw, 50, 60)
-	writeToFile(cw, 60, 64)
-
-	writeToFile(cw, 32, 40)
-	writeToFile(cw, 42, 48)
-
-	writeToFile(cw, 48, 50)
-
-	assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
-	assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
-
-}
-
-func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
-
-	_, chunkUsage := cw.toActualWriteOffset(startOffset)
-
-	// skip doing actual writing
-
-	innerOffset := startOffset % cw.ChunkSize
-	chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
-
-}
-
-func TestWriteChunkedFile(t *testing.T) {
-	x := NewChunkedFileWriter(os.TempDir(), 20)
-	defer x.Reset()
-	y := NewChunkedFileWriter(os.TempDir(), 12)
-	defer y.Reset()
-
-	batchSize := 4
-	buf := make([]byte, batchSize)
-	for i := 0; i < 256; i++ {
-		for x := 0; x < batchSize; x++ {
-			buf[x] = byte(i)
-		}
-		x.WriteAt(buf, int64(i*batchSize))
-		y.WriteAt(buf, int64((255-i)*batchSize))
-	}
-
-	a := make([]byte, 1)
-	b := make([]byte, 1)
-	for i := 0; i < 256*batchSize; i++ {
-		x.ReadDataAt(a, int64(i))
-		y.ReadDataAt(b, int64(256*batchSize-1-i))
-		assert.Equal(t, a[0], b[0], "same read")
-	}
-
-}
diff --git a/weed/filesys/page_writer/page_chunk_file.go b/weed/filesys/page_writer/page_chunk_file.go
new file mode 100644
index 000000000..0a26abbd1
--- /dev/null
+++ b/weed/filesys/page_writer/page_chunk_file.go
@@ -0,0 +1,116 @@
+package page_writer
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/chrislusf/seaweedfs/weed/util/mem"
+	"os"
+)
+
+var (
+	_ = PageChunk(&TempFileChunk{})
+)
+
+type ActualChunkIndex int
+
+type SwapFile struct {
+	dir                     string
+	file                    *os.File
+	logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
+	chunkSize               int64
+}
+
+type TempFileChunk struct {
+	swapfile         *SwapFile
+	usage            *ChunkWrittenIntervalList
+	logicChunkIndex  LogicChunkIndex
+	actualChunkIndex ActualChunkIndex
+}
+
+func NewSwapFile(dir string, chunkSize int64) *SwapFile {
+	return &SwapFile{
+		dir:                     dir,
+		file:                    nil,
+		logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
+		chunkSize:               chunkSize,
+	}
+}
+func (sf *SwapFile) FreeResource() {
+	if sf.file != nil {
+		sf.file.Close()
+		os.Remove(sf.file.Name())
+	}
+}
+
+func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *TempFileChunk) {
+	if sf.file == nil {
+		var err error
+		sf.file, err = os.CreateTemp(sf.dir, "")
+		if err != nil {
+			glog.Errorf("create swap file: %v", err)
+			return nil
+		}
+	}
+	actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
+	if !found {
+		actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
+		sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+	}
+
+	return &TempFileChunk{
+		swapfile:         sf,
+		usage:            newChunkWrittenIntervalList(),
+		logicChunkIndex:  logicChunkIndex,
+		actualChunkIndex: actualChunkIndex,
+	}
+}
+
+func (tc *TempFileChunk) FreeResource() {
+}
+
+func (tc *TempFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+	innerOffset := offset % tc.swapfile.chunkSize
+	var err error
+	n, err = tc.swapfile.file.WriteAt(src, int64(tc.actualChunkIndex)*tc.swapfile.chunkSize+innerOffset)
+	if err == nil {
+		tc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+	} else {
+		glog.Errorf("failed to write swap file %s: %v", tc.swapfile.file.Name(), err)
+	}
+	return
+}
+
+func (tc *TempFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+	chunkStartOffset := int64(tc.logicChunkIndex) * tc.swapfile.chunkSize
+	for t := tc.usage.head.next; t != tc.usage.tail; t = t.next {
+		logicStart := max(off, chunkStartOffset+t.StartOffset)
+		logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
+		if logicStart < logicStop {
+			actualStart := logicStart - chunkStartOffset + int64(tc.actualChunkIndex)*tc.swapfile.chunkSize
+			if _, err := tc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+				glog.Errorf("failed to read swap file %s: %v", tc.swapfile.file.Name(), err)
+				break
+			}
+			maxStop = max(maxStop, logicStop)
+		}
+	}
+	return
+}
+
+func (tc *TempFileChunk) IsComplete() bool {
+	return tc.usage.IsComplete(tc.swapfile.chunkSize)
+}
+
+func (tc *TempFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+	if saveFn == nil {
+		return
+	}
+	for t := tc.usage.head.next; t != tc.usage.tail; t = t.next {
+		data := mem.Allocate(int(t.Size()))
+		tc.swapfile.file.ReadAt(data, t.StartOffset+int64(tc.actualChunkIndex)*tc.swapfile.chunkSize)
+		reader := util.NewBytesReader(data)
+		saveFn(reader, int64(tc.logicChunkIndex)*tc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
+		})
+		mem.Free(data)
+	}
+}
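
A sketch of the expected lifecycle of the new swap-file-backed chunk, assuming it is exercised from within package page_writer (LogicChunkIndex and the interval-list helpers are defined elsewhere in the package; this example is not part of the patch):

```go
func exampleSwapFileUsage() {
	sf := NewSwapFile(os.TempDir(), 4096) // one temp file shared by all chunks
	defer sf.FreeResource()               // closes and removes the temp file

	tc := sf.NewTempFileChunk(LogicChunkIndex(3)) // logical chunk 3, offsets [12288, 16384)
	if tc == nil {
		return // temp file creation failed; the caller falls back to a mem chunk
	}

	data := []byte("hello")
	tc.WriteDataAt(data, 12288) // offsets are file-absolute; innerOffset = 12288 % 4096 = 0

	buf := make([]byte, 5)
	maxStop := tc.ReadDataAt(buf, 12288) // reads back only the written intervals
	_ = maxStop
}
```

Note that, unlike the deleted ChunkedFileWriter, the swap file is lazily created on the first NewTempFileChunk call, and a nil return signals the caller to fall back to an in-memory chunk.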
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 40722924f..ee85cf6c8 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -24,6 +24,7 @@ type UploadPipeline struct {
 	saveToStorageFn      SaveToStorageFunc
 	activeReadChunks     map[LogicChunkIndex]int
 	activeReadChunksLock sync.Mutex
+	swapFile             *SwapFile
 }
 
 type SealedChunk struct {
@@ -39,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
 	}
 }
 
-func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
+func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline {
 	return &UploadPipeline{
 		ChunkSize:      chunkSize,
 		writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -49,177 +50,185 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
 		saveToStorageFn:  saveToStorageFn,
 		filepath:         filepath,
 		activeReadChunks: make(map[LogicChunkIndex]int),
+		swapFile:         NewSwapFile(swapFileDir, chunkSize),
 	}
 }
 
-func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
 
-	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
 
-	memChunk, found := cw.writableChunks[logicChunkIndex]
+	memChunk, found := up.writableChunks[logicChunkIndex]
 	if !found {
-		memChunk = NewMemChunk(logicChunkIndex, cw.ChunkSize)
-		cw.writableChunks[logicChunkIndex] = memChunk
+		if len(up.writableChunks) < 0 { // always false today: new chunks spill to the swap file first
+			memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+		} else {
+			memChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+			if memChunk == nil {
+				memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+			}
+		}
+		up.writableChunks[logicChunkIndex] = memChunk
 	}
 	n = memChunk.WriteDataAt(p, off)
-	cw.maybeMoveToSealed(memChunk, logicChunkIndex)
+	up.maybeMoveToSealed(memChunk, logicChunkIndex)
 
 	return
 }
 
-func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
-	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
 
 	// read from sealed chunks first
-	cw.sealedChunksLock.Lock()
-	sealedChunk, found := cw.sealedChunks[logicChunkIndex]
+	up.sealedChunksLock.Lock()
+	sealedChunk, found := up.sealedChunks[logicChunkIndex]
 	if found {
 		sealedChunk.referenceCounter++
 	}
-	cw.sealedChunksLock.Unlock()
+	up.sealedChunksLock.Unlock()
 	if found {
 		maxStop = sealedChunk.chunk.ReadDataAt(p, off)
-		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop)
-		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex))
+		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
+		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
 	}
 
 	// read from writable chunks last
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
-	writableChunk, found := cw.writableChunks[logicChunkIndex]
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
+	writableChunk, found := up.writableChunks[logicChunkIndex]
 	if !found {
 		return
 	}
 	writableMaxStop := writableChunk.ReadDataAt(p, off)
-	glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop)
+	glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
 	maxStop = max(maxStop, writableMaxStop)
 
 	return
 }
 
-func (cw *UploadPipeline) FlushAll() {
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
+func (up *UploadPipeline) FlushAll() {
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
 
-	for logicChunkIndex, memChunk := range cw.writableChunks {
-		cw.moveToSealed(memChunk, logicChunkIndex)
+	for logicChunkIndex, memChunk := range up.writableChunks {
+		up.moveToSealed(memChunk, logicChunkIndex)
 	}
 
-	cw.waitForCurrentWritersToComplete()
+	up.waitForCurrentWritersToComplete()
 }
 
-func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
-	startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
-	stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
-	if stopOffset%cw.ChunkSize > 0 {
+func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
+	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+	if stopOffset%up.ChunkSize > 0 {
 		stopLogicChunkIndex += 1
 	}
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
 	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
-		if count, found := cw.activeReadChunks[i]; found {
-			cw.activeReadChunks[i] = count + 1
+		if count, found := up.activeReadChunks[i]; found {
+			up.activeReadChunks[i] = count + 1
 		} else {
-			cw.activeReadChunks[i] = 1
+			up.activeReadChunks[i] = 1
 		}
 	}
 }
 
-func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
-	startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
-	stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
-	if stopOffset%cw.ChunkSize > 0 {
+func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
+	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+	if stopOffset%up.ChunkSize > 0 {
 		stopLogicChunkIndex += 1
 	}
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
 	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
-		if count, found := cw.activeReadChunks[i]; found {
+		if count, found := up.activeReadChunks[i]; found {
 			if count == 1 {
-				delete(cw.activeReadChunks, i)
+				delete(up.activeReadChunks, i)
 			} else {
-				cw.activeReadChunks[i] = count - 1
+				up.activeReadChunks[i] = count - 1
 			}
 		}
 	}
 }
 
-func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
-	if count, found := cw.activeReadChunks[logicChunkIndex]; found {
+func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
+	if count, found := up.activeReadChunks[logicChunkIndex]; found {
 		return count > 0
 	}
 	return false
 }
 
-func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
-	cw.uploaderCountCond.L.Lock()
+func (up *UploadPipeline) waitForCurrentWritersToComplete() {
+	up.uploaderCountCond.L.Lock()
 	t := int32(100)
 	for {
-		t = atomic.LoadInt32(&cw.uploaderCount)
+		t = atomic.LoadInt32(&up.uploaderCount)
 		if t <= 0 {
 			break
 		}
-		cw.uploaderCountCond.Wait()
+		up.uploaderCountCond.Wait()
 	}
-	cw.uploaderCountCond.L.Unlock()
+	up.uploaderCountCond.L.Unlock()
 }
 
-func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
 	if memChunk.IsComplete() {
-		cw.moveToSealed(memChunk, logicChunkIndex)
+		up.moveToSealed(memChunk, logicChunkIndex)
 	}
 }
 
-func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
-	atomic.AddInt32(&cw.uploaderCount, 1)
-	glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount)
+func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+	atomic.AddInt32(&up.uploaderCount, 1)
+	glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
 
-	cw.sealedChunksLock.Lock()
+	up.sealedChunksLock.Lock()
 
-	if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found {
-		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex))
+	if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
+		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
 	}
 	sealedChunk := &SealedChunk{
 		chunk:            memChunk,
 		referenceCounter: 1, // default 1 is for uploading process
 	}
-	cw.sealedChunks[logicChunkIndex] = sealedChunk
-	delete(cw.writableChunks, logicChunkIndex)
+	up.sealedChunks[logicChunkIndex] = sealedChunk
+	delete(up.writableChunks, logicChunkIndex)
 
-	cw.sealedChunksLock.Unlock()
+	up.sealedChunksLock.Unlock()
 
-	cw.uploaders.Execute(func() {
+	up.uploaders.Execute(func() {
 		// first add to the file chunks
-		sealedChunk.chunk.SaveContent(cw.saveToStorageFn)
+		sealedChunk.chunk.SaveContent(up.saveToStorageFn)
 
 		// notify waiting process
-		atomic.AddInt32(&cw.uploaderCount, -1)
-		glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount)
+		atomic.AddInt32(&up.uploaderCount, -1)
+		glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
 		// Lock and Unlock are not required,
 		// but it may signal multiple times during one wakeup,
 		// and the waiting goroutine may miss some of them!
-		cw.uploaderCountCond.L.Lock()
-		cw.uploaderCountCond.Broadcast()
-		cw.uploaderCountCond.L.Unlock()
+		up.uploaderCountCond.L.Lock()
+		up.uploaderCountCond.Broadcast()
+		up.uploaderCountCond.L.Unlock()
 
 		// wait for readers
-		for cw.IsLocked(logicChunkIndex) {
+		for up.IsLocked(logicChunkIndex) {
 			time.Sleep(59 * time.Millisecond)
 		}
 
 		// then remove from sealed chunks
-		cw.sealedChunksLock.Lock()
-		defer cw.sealedChunksLock.Unlock()
-		delete(cw.sealedChunks, logicChunkIndex)
-		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))
+		up.sealedChunksLock.Lock()
+		defer up.sealedChunksLock.Unlock()
+		delete(up.sealedChunks, logicChunkIndex)
+		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
 	})
 }
 
-func (p2 *UploadPipeline) Shutdown() {
-
+func (up *UploadPipeline) Shutdown() {
+	up.swapFile.FreeResource()
 }
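
A standalone sketch of the wait/notify pattern used in waitForCurrentWritersToComplete and moveToSealed above, assuming the usual sync and sync/atomic imports (names are illustrative, not part of the patch). An atomic counter tracks in-flight uploads, and a sync.Cond wakes waiters; the Broadcast is issued under the Cond's lock so a waiter cannot check the counter, decide to Wait, and miss the signal in between:

```go
type uploadTracker struct {
	count int32
	cond  *sync.Cond
}

func newUploadTracker() *uploadTracker {
	return &uploadTracker{cond: sync.NewCond(&sync.Mutex{})}
}

func (t *uploadTracker) start() { atomic.AddInt32(&t.count, 1) }

func (t *uploadTracker) done() {
	atomic.AddInt32(&t.count, -1)
	t.cond.L.Lock() // taking the lock orders this Broadcast against any concurrent Wait
	t.cond.Broadcast()
	t.cond.L.Unlock()
}

func (t *uploadTracker) waitForZero() {
	t.cond.L.Lock()
	for atomic.LoadInt32(&t.count) > 0 {
		t.cond.Wait()
	}
	t.cond.L.Unlock()
}
```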
diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go
index d17948251..5ecb677e8 100644
--- a/weed/filesys/page_writer/upload_pipeline_test.go
+++ b/weed/filesys/page_writer/upload_pipeline_test.go
@@ -7,7 +7,7 @@ import (
 
 func TestUploadPipeline(t *testing.T) {
 
-	uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil)
+	uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil, "")
 
 	writeRange(uploadPipeline, 0, 131072)
 	writeRange(uploadPipeline, 131072, 262144)
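
A hypothetical caller (not part of the patch) driving the updated pipeline with a real swap directory; the final argument is the new swapFileDir, and passing "" as the test does makes os.CreateTemp fall back to the OS temp directory. The saveFn signature and util.NewLimitedConcurrentExecutor are assumed from the surrounding codebase:

```go
func exercisePipeline() {
	saveFn := func(reader io.Reader, offset int64, size int64, cleanupFn func()) {
		io.Copy(io.Discard, reader) // in SeaweedFS this would upload the chunk
		cleanupFn()
	}

	pipeline := NewUploadPipeline("example.txt", util.NewLimitedConcurrentExecutor(2),
		2*1024*1024, saveFn, os.TempDir())

	pipeline.SaveDataAt([]byte("hello world"), 0)
	pipeline.FlushAll() // seals and uploads whatever is buffered
	pipeline.Shutdown() // now also removes the swap file, if one was created
}
```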