seaweedfs/weed/util/limiter.go

71 lines
1.5 KiB
Go
Raw Normal View History

2020-10-21 09:16:21 +00:00
package util
2021-04-02 08:10:24 +00:00
import (
"math/rand"
"reflect"
"sync"
"sync/atomic"
)
2020-10-21 09:16:21 +00:00
2021-04-02 08:10:24 +00:00
// OperationRequest is a unit of work submitted to a
// LimitedOutOfOrderProcessor. It takes no arguments and returns nothing.
type OperationRequest func()

// LimitedOutOfOrderProcessor runs submitted OperationRequests, each in its
// own goroutine, while capping how many of them run at the same time.
//
// Requests are handed over through a set of unbuffered channels, one of
// which is picked at random per Execute call, so the order in which requests
// start is not the order in which they were submitted.
type LimitedOutOfOrderProcessor struct {
	// processorSlots is the number of hand-off channels in processors.
	processorSlots uint32
	// processors are the unbuffered channels Execute sends requests on.
	processors []chan OperationRequest
	// processorLimit is the maximum number of requests running concurrently.
	processorLimit int32
	// processorLimitCond is signalled whenever a running request finishes.
	processorLimitCond *sync.Cond
	// currentProcessor counts the requests currently running. It is only
	// modified while processorLimitCond.L is held.
	currentProcessor int32
}

// NewLimitedOutOfOrderProcessor creates a processor that runs at most limit
// requests concurrently and starts its dispatcher goroutine. A limit below 1
// is treated as 1, so that the processor can always make progress.
//
// The dispatcher goroutine runs for the lifetime of the process; there is no
// way to stop it.
func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
	if limit < 1 {
		limit = 1
	}

	processorSlots := uint32(32)
	c = &LimitedOutOfOrderProcessor{
		processorSlots:     processorSlots,
		processors:         make([]chan OperationRequest, processorSlots),
		processorLimit:     limit,
		processorLimitCond: sync.NewCond(new(sync.Mutex)),
	}

	for i := range c.processors {
		c.processors[i] = make(chan OperationRequest)
	}

	// One receive case per hand-off channel, so the dispatcher can wait on
	// all of them at once.
	cases := make([]reflect.SelectCase, processorSlots)
	for i, ch := range c.processors {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}

	go func() {
		for {
			_, value, ok := reflect.Select(cases)
			if !ok {
				// A receive only reports !ok on a closed channel; nothing in
				// this type closes the hand-off channels.
				continue
			}
			request := value.Interface().(OperationRequest)

			c.acquire()
			go func() {
				// Deferred so the slot is released even if request panics.
				defer c.release()
				request()
			}()
		}
	}()

	return c
}

// acquire blocks until fewer than processorLimit requests are running and
// then claims a slot for one more.
func (c *LimitedOutOfOrderProcessor) acquire() {
	c.processorLimitCond.L.Lock()
	defer c.processorLimitCond.L.Unlock()

	// ">=" rather than ">": a slot may only be claimed while the running
	// count is strictly below the limit.
	for atomic.LoadInt32(&c.currentProcessor) >= c.processorLimit {
		c.processorLimitCond.Wait()
	}
	atomic.AddInt32(&c.currentProcessor, 1)
}

// release gives a slot back and wakes the dispatcher if it is waiting.
func (c *LimitedOutOfOrderProcessor) release() {
	// Decrement first, then signal, both under the lock: the dispatcher
	// checks the counter while holding the same lock, so it cannot miss
	// this wakeup or observe the stale count after being woken.
	c.processorLimitCond.L.Lock()
	atomic.AddInt32(&c.currentProcessor, -1)
	c.processorLimitCond.Signal()
	c.processorLimitCond.L.Unlock()
}

// Execute hands request to the processor. It blocks until the dispatcher has
// received the request and returns without waiting for the request to run or
// finish.
func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
	index := rand.Uint32() % c.processorSlots
	c.processors[index] <- request
}