diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 7e8dc0b05..fc5669ec0 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -27,7 +27,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { fh: fh, } - dirtyPages.uploadPipeline = page_writer.NewUploadPipeline( + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(), fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) return dirtyPages diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 2fb1bbf52..83e5c8b60 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -1,6 +1,7 @@ package page_writer import ( + "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/mem" @@ -18,6 +19,7 @@ type UploadPipeline struct { activeWriterCond *sync.Cond activeWriterCount int32 saveToStorageFn SaveToStorageFunc + filepath util.FullPath } type SealedChunk struct { @@ -25,14 +27,15 @@ type SealedChunk struct { referenceCounter int // track uploading or reading processes } -func (sc *SealedChunk) FreeReference() { +func (sc *SealedChunk) FreeReference(messageOnFree string) { sc.referenceCounter-- if sc.referenceCounter == 0 { + glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) mem.Free(sc.chunk.buf) } } -func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { +func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { return &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]*MemChunk), @@ -40,6 +43,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, writers: writers, activeWriterCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, + filepath: filepath, } } @@ -77,7 +81,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { cw.sealedChunksLock.Unlock() if found { maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) - sealedChunk.FreeReference() + sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) } // read from writable chunks last @@ -125,12 +129,12 @@ func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { atomic.AddInt32(&cw.activeWriterCount, 1) - glog.V(4).Infof("activeWriterCount %d ++> %d", cw.activeWriterCount-1, cw.activeWriterCount) + glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount) cw.sealedChunksLock.Lock() if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found { - oldMemChunk.FreeReference() + oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex)) } sealedChunk := &SealedChunk{ chunk: memChunk, @@ -149,10 +153,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.sealedChunksLock.Lock() defer cw.sealedChunksLock.Unlock() delete(cw.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference() + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) atomic.AddInt32(&cw.activeWriterCount, -1) - glog.V(4).Infof("activeWriterCount %d --> %d", cw.activeWriterCount+1, cw.activeWriterCount) + glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) // Lock and Unlock are not required, // but it may signal multiple times during one wakeup, // and the waiting goroutine may miss some of them!