seaweedfs/weed/util/limited_async_pool.go

67 lines
1.4 KiB
Go
Raw Permalink Normal View History

2022-09-25 20:45:55 +00:00
package util
// initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg
2022-09-26 00:42:23 +00:00
import (
"container/list"
"context"
"sync"
)
2022-09-25 20:45:55 +00:00
type Future interface {
Await() interface{}
}
type future struct {
await func(ctx context.Context) interface{}
}
func (f future) Await() interface{} {
return f.await(context.Background())
}
type LimitedAsyncExecutor struct {
2022-09-26 00:42:23 +00:00
executor *LimitedConcurrentExecutor
futureList *list.List
futureListCond *sync.Cond
2022-09-25 20:45:55 +00:00
}
func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor {
return &LimitedAsyncExecutor{
2022-09-26 00:42:23 +00:00
executor: NewLimitedConcurrentExecutor(limit),
futureList: list.New(),
futureListCond: sync.NewCond(&sync.Mutex{}),
2022-09-25 20:45:55 +00:00
}
}
2022-09-26 00:42:23 +00:00
func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) {
2022-09-25 20:45:55 +00:00
var result interface{}
c := make(chan struct{})
ae.executor.Execute(func() {
defer close(c)
result = job()
})
2022-09-26 00:42:23 +00:00
f := future{await: func(ctx context.Context) interface{} {
2022-09-25 20:45:55 +00:00
select {
case <-ctx.Done():
return ctx.Err()
case <-c:
return result
}
}}
2022-09-26 00:42:23 +00:00
ae.futureListCond.L.Lock()
ae.futureList.PushBack(f)
ae.futureListCond.Signal()
ae.futureListCond.L.Unlock()
}
func (ae *LimitedAsyncExecutor) NextFuture() Future {
ae.futureListCond.L.Lock()
for ae.futureList.Len() == 0 {
ae.futureListCond.Wait()
}
f := ae.futureList.Remove(ae.futureList.Front())
ae.futureListCond.L.Unlock()
return f.(Future)
2022-09-25 20:45:55 +00:00
}