From 047446d5ca3f23370d085ef6e9a89924760cf658 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 15:50:11 -0800 Subject: [PATCH] remove extra async execution --- weed/filesys/dirty_pages_mem_chunk.go | 46 ++++++++------------------- 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index fc5669ec0..9313c4562 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -46,8 +46,7 @@ func (pages *MemoryChunkPages) FlushData() error { if !pages.hasWrites { return nil } - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() + pages.uploadPipeline.FlushAll() if pages.lastErr != nil { return fmt.Errorf("flush data: %v", pages.lastErr) } @@ -65,41 +64,24 @@ func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication stri return pages.collection, pages.replication } -func (pages *MemoryChunkPages) saveChunkedFileToStorage() { - - pages.uploadPipeline.FlushAll() - -} - func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - defer cleanupFn() - - chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.fh.entryViewCache = nil - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() + defer cleanupFn() + chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return } - - if pages.fh.f.wfs.concurrentWriters != nil { - pages.fh.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) + pages.chunkAddLock.Unlock() }