From 640c53cda82da903d984b5190a1bdd414499942b Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 25 Sep 2022 17:42:23 -0700 Subject: [PATCH] add future list --- weed/util/limited_async_pool.go | 32 +++++++++++++++++++++++----- weed/util/limited_async_pool_test.go | 16 +++++++------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go index c78de158b..51dfd6252 100644 --- a/weed/util/limited_async_pool.go +++ b/weed/util/limited_async_pool.go @@ -2,7 +2,11 @@ package util // initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg -import "context" +import ( + "container/list" + "context" + "sync" +) type Future interface { Await() interface{} @@ -17,23 +21,27 @@ func (f future) Await() interface{} { } type LimitedAsyncExecutor struct { - executor *LimitedConcurrentExecutor + executor *LimitedConcurrentExecutor + futureList *list.List + futureListCond *sync.Cond } func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor { return &LimitedAsyncExecutor{ - executor: NewLimitedConcurrentExecutor(limit), + executor: NewLimitedConcurrentExecutor(limit), + futureList: list.New(), + futureListCond: sync.NewCond(&sync.Mutex{}), } } -func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future { +func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) { var result interface{} c := make(chan struct{}) ae.executor.Execute(func() { defer close(c) result = job() }) - return future{await: func(ctx context.Context) interface{} { + f := future{await: func(ctx context.Context) interface{} { select { case <-ctx.Done(): return ctx.Err() @@ -41,4 +49,18 @@ func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future { return result } }} + ae.futureListCond.L.Lock() + ae.futureList.PushBack(f) + ae.futureListCond.Signal() + ae.futureListCond.L.Unlock() +} + +func (ae *LimitedAsyncExecutor) NextFuture() Future { + ae.futureListCond.L.Lock() + for ae.futureList.Len() == 0 { + ae.futureListCond.Wait() + } + f := ae.futureList.Remove(ae.futureList.Front()) + ae.futureListCond.L.Unlock() + return f.(Future) } diff --git a/weed/util/limited_async_pool_test.go b/weed/util/limited_async_pool_test.go index 935b158da..090ce5375 100644 --- a/weed/util/limited_async_pool_test.go +++ b/weed/util/limited_async_pool_test.go @@ -10,17 +10,17 @@ import ( func TestAsyncPool(t *testing.T) { p := NewLimitedAsyncExecutor(3) - var results []Future - results = append(results, p.Execute(FirstFunc)) - results = append(results, p.Execute(SecondFunc)) - results = append(results, p.Execute(ThirdFunc)) - results = append(results, p.Execute(FourthFunc)) - results = append(results, p.Execute(FifthFunc)) + p.Execute(FirstFunc) + p.Execute(SecondFunc) + p.Execute(ThirdFunc) + p.Execute(FourthFunc) + p.Execute(FifthFunc) var sorted_results []int - for _, r := range results { - x := r.Await().(int) + for i := 0; i < 5; i++ { + f := p.NextFuture() + x := f.Await().(int) println(x) sorted_results = append(sorted_results, x) }