From 8826601be1a5fe563d955b57a51b15d917baa22b Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 30 Oct 2020 21:22:20 -0700
Subject: [PATCH] mount: optional limit for the number of concurrent writers

---
 weed/command/mount.go      |  2 ++
 weed/command/mount_std.go  |  1 +
 weed/filesys/dirty_page.go | 24 ++++++++++++++----------
 weed/filesys/wfs.go        |  8 ++++++++
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/weed/command/mount.go b/weed/command/mount.go
index 7fdb21254..f325cb0a5 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -14,6 +14,7 @@ type MountOptions struct {
 	replication       *string
 	ttlSec            *int
 	chunkSizeLimitMB  *int
+	concurrentWriters *int
 	cacheDir          *string
 	cacheSizeMB       *int64
 	dataCenter        *string
@@ -42,6 +43,7 @@ func init() {
 	mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
 	mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
 	mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
+	mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
 	mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
 	mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
 	mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 20d08314c..649450e54 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -175,6 +175,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		Replication:       *option.replication,
 		TtlSec:            int32(*option.ttlSec),
 		ChunkSizeLimit:    int64(chunkSizeLimitMB) * 1024 * 1024,
+		ConcurrentWriters: *option.concurrentWriters,
 		CacheDir:          *option.cacheDir,
 		CacheSizeMB:       *option.cacheSizeMB,
 		DataCenter:        *option.dataCenter,
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index dd0c48796..c1b78a220 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,12 +9,6 @@ import (
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
-	concurrentWriterLimit = runtime.NumCPU()
-	concurrentWriters     = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
 )
 
 type ContinuousDirtyPages struct {
@@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
 	dirtyPages := &ContinuousDirtyPages{
 		intervals:        &ContinuousIntervals{},
 		f:                file,
-		chunkSaveErrChan: make(chan error, concurrentWriterLimit),
+		chunkSaveErrChan: make(chan error, runtime.NumCPU()),
 	}
 	go func() {
 		for t := range dirtyPages.chunkSaveErrChan {
@@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
 
 func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
 
+	errChanSize := pages.f.wfs.option.ConcurrentWriters
+	if errChanSize == 0 {
+		errChanSize = runtime.NumCPU()
+	}
 	if pages.chunkSaveErrChanClosed {
-		pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+		pages.chunkSaveErrChan = make(chan error, errChanSize)
 		pages.chunkSaveErrChanClosed = false
 	}
 
 	mtime := time.Now().UnixNano()
 	pages.writeWaitGroup.Add(1)
-	go func() {
+	writer := func() {
 		defer pages.writeWaitGroup.Done()
 
 		reader = io.LimitReader(reader, size)
@@ -121,7 +119,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 		pages.collection, pages.replication = collection, replication
 		pages.f.addChunks([]*filer_pb.FileChunk{chunk})
 		glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
-	}()
+	}
+
+	if pages.f.wfs.concurrentWriters != nil {
+		pages.f.wfs.concurrentWriters.Execute(writer)
+	} else {
+		go writer()
+	}
 }
 
 func max(x, y int64) int64 {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index d31579fce..cd14e8032 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -31,6 +31,7 @@ type Option struct {
 	Replication       string
 	TtlSec            int32
 	ChunkSizeLimit    int64
+	ConcurrentWriters int
 	CacheDir          string
 	CacheSizeMB       int64
 	DataCenter        string
@@ -68,6 +69,9 @@ type WFS struct {
 	chunkCache *chunk_cache.TieredChunkCache
 	metaCache  *meta_cache.MetaCache
 	signature  int32
+
+	// throttle writers
+	concurrentWriters *util.LimitedConcurrentExecutor
 }
 type statsCache struct {
 	filer_pb.StatisticsResponse
@@ -110,6 +114,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
 	wfs.fsNodeCache = newFsCache(wfs.root)
 
+	if wfs.option.ConcurrentWriters > 0 {
+		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+	}
+
 	return wfs
 }
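
Note (not part of the commit): the new dispatch in saveToStorage hands the chunk-upload
closure to util.LimitedConcurrentExecutor.Execute when a limit is configured, and falls
back to a plain goroutine otherwise. Below is a minimal standalone sketch of that pattern;
limitedExecutor and newLimitedExecutor are hypothetical stand-ins for the weed/util
implementation, with only the NewLimitedConcurrentExecutor(limit) / Execute(job) shape
taken from this patch.

// writer_throttle_sketch.go
//
// Standalone approximation of the bounded-writer pattern above, using
// a semaphore channel. Not the weed/util implementation.
package main

import (
	"fmt"
	"sync"
	"time"
)

// limitedExecutor runs each submitted job on its own goroutine, but
// never more than cap(sem) jobs at once.
type limitedExecutor struct {
	sem chan struct{}
}

func newLimitedExecutor(limit int) *limitedExecutor {
	return &limitedExecutor{sem: make(chan struct{}, limit)}
}

// Execute blocks the caller until a slot frees up, then runs job
// concurrently and releases the slot when the job finishes.
func (e *limitedExecutor) Execute(job func()) {
	e.sem <- struct{}{}
	go func() {
		defer func() { <-e.sem }()
		job()
	}()
}

func main() {
	// 0 means unlimited, matching the -concurrentWriters default.
	concurrentWriters := 2

	var executor *limitedExecutor
	if concurrentWriters > 0 {
		executor = newLimitedExecutor(concurrentWriters)
	}

	var wg sync.WaitGroup
	for i := 0; i < 6; i++ {
		i := i
		wg.Add(1)
		writer := func() {
			defer wg.Done()
			fmt.Printf("saving chunk %d\n", i)
			time.Sleep(100 * time.Millisecond) // stand-in for a chunk upload
		}
		// Mirrors the dispatch in saveToStorage: throttle when a
		// limit is configured, otherwise spawn freely.
		if executor != nil {
			executor.Execute(writer)
		} else {
			go writer()
		}
	}
	wg.Wait()
}

With the patch applied, mounting with something like "weed mount -concurrentWriters=32 ..."
would bound chunk uploads to 32 concurrent goroutines (32 is only an illustrative value);
the default of 0 keeps the previous unbounded per-chunk goroutine dispatch.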