seaweedfs/weed/util/limiter.go

115 lines
2.5 KiB
Go
Raw Permalink Normal View History

2020-10-21 09:16:21 +00:00
package util
2021-04-02 08:10:24 +00:00
import (
"math/rand"
"reflect"
"sync"
"sync/atomic"
)
2020-10-21 09:16:21 +00:00
// LimitedConcurrentExecutor runs submitted jobs on their own goroutines while
// keeping the number of jobs in flight at or below a fixed limit.
//
// The initial version comes from
// https://github.com/korovkin/limiter/blob/master/limiter.go
type LimitedConcurrentExecutor struct {
	// limit is the maximum number of jobs allowed to run at the same time.
	limit int
	// tokenChan holds one token per free execution slot. Taking a token
	// reserves a slot; putting it back frees the slot.
	tokenChan chan int
}

// NewLimitedConcurrentExecutor returns an executor that runs at most limit
// jobs concurrently.
//
// A limit below 1 is raised to 1. Without that, a zero limit would create an
// executor with no tokens, so every Execute call would block forever, and a
// negative limit would panic while allocating the token channel.
func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
	if limit < 1 {
		limit = 1
	}

	c := &LimitedConcurrentExecutor{
		limit:     limit,
		tokenChan: make(chan int, limit),
	}

	// Pre-fill the channel so that every slot starts out free.
	for i := 0; i < c.limit; i++ {
		c.tokenChan <- i
	}

	return c
}

// Execute runs job on a new goroutine once an execution slot is free.
//
// If fewer than limit jobs are currently running, the job starts right away.
// Otherwise Execute blocks the caller until a running job finishes and hands
// its slot back.
func (c *LimitedConcurrentExecutor) Execute(job func()) {
	token := <-c.tokenChan

	go func() {
		// Deferred so the slot is handed back even if job panics.
		defer func() {
			c.tokenChan <- token
		}()

		job()
	}()
}
// A different implementation, somewhat more "conservative".
2021-04-02 08:10:24 +00:00
// OperationRequest is a unit of work submitted to a LimitedOutOfOrderProcessor.
type OperationRequest func()

// LimitedOutOfOrderProcessor runs submitted requests on their own goroutines,
// optionally capping how many run at the same time. Requests reach a single
// dispatcher goroutine through a set of unbuffered intake channels; no
// ordering between requests is preserved.
type LimitedOutOfOrderProcessor struct {
	// processorSlots is the number of intake channels in processors.
	processorSlots uint32
	// processors are the intake channels; Execute picks one at random.
	processors []chan OperationRequest
	// processorLimit is the maximum number of requests allowed to run at
	// once. A value <= 0 disables the cap.
	processorLimit int32
	// processorLimitCond guards updates to currentProcessor and is signalled
	// each time a running request finishes.
	processorLimitCond *sync.Cond
	// currentProcessor is the number of requests currently running. It is
	// only maintained when processorLimit > 0.
	currentProcessor int32
}

// NewLimitedOutOfOrderProcessor creates a processor and starts its dispatcher
// goroutine. At most limit requests run concurrently; a limit <= 0 means no
// cap. The dispatcher loop has no exit, so the goroutine lives as long as the
// process does.
func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
	processorSlots := uint32(32)

	c = &LimitedOutOfOrderProcessor{
		processorSlots:     processorSlots,
		processors:         make([]chan OperationRequest, processorSlots),
		processorLimit:     limit,
		processorLimitCond: sync.NewCond(new(sync.Mutex)),
	}

	for i := range c.processors {
		c.processors[i] = make(chan OperationRequest)
	}

	// One receive case per intake channel so the dispatcher can wait on all
	// of them at once.
	cases := make([]reflect.SelectCase, processorSlots)
	for i, ch := range c.processors {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}

	go c.dispatch(cases)

	return c
}

// dispatch receives requests from any intake channel and starts each one on
// its own goroutine, first waiting for a free slot when a limit is set.
func (c *LimitedOutOfOrderProcessor) dispatch(cases []reflect.SelectCase) {
	limited := c.processorLimit > 0

	for {
		_, value, ok := reflect.Select(cases)
		if !ok {
			// ok is false only for a receive on a closed channel.
			continue
		}
		request := value.Interface().(OperationRequest)

		if limited {
			c.acquire()
		}

		go func() {
			if limited {
				// Deferred so the slot is freed even if request panics.
				defer c.release()
			}
			request()
		}()
	}
}

// acquire blocks until fewer than processorLimit requests are running, then
// claims a slot for the caller.
func (c *LimitedOutOfOrderProcessor) acquire() {
	c.processorLimitCond.L.Lock()
	// ">=" rather than ">": with ">" a request could start while the limit
	// was already reached, letting limit+1 requests run at once.
	for atomic.LoadInt32(&c.currentProcessor) >= c.processorLimit {
		c.processorLimitCond.Wait()
	}
	atomic.AddInt32(&c.currentProcessor, 1)
	c.processorLimitCond.L.Unlock()
}

// release frees the slot held by a finished request and wakes the dispatcher.
func (c *LimitedOutOfOrderProcessor) release() {
	// Decrement under the lock and before signalling. Otherwise the
	// dispatcher can wake up, still observe the old count, and go back to
	// waiting with no further signal on the way.
	c.processorLimitCond.L.Lock()
	atomic.AddInt32(&c.currentProcessor, -1)
	c.processorLimitCond.L.Unlock()
	c.processorLimitCond.Signal()
}

// Execute submits request for asynchronous execution. The intake channels are
// unbuffered, so the call blocks until the dispatcher goroutine has received
// the request; it does not wait for the request to run.
func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
	index := rand.Uint32() % c.processorSlots
	c.processors[index] <- request
}