mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
rename file
This commit is contained in:
parent
31922b2bf2
commit
e31fdbc89b
40
weed/util/limited_pool.go
Normal file
40
weed/util/limited_pool.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package util
|
||||
|
||||
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
|
||||
|
||||
// LimitedConcurrentExecutor object
|
||||
type LimitedConcurrentExecutor struct {
|
||||
limit int
|
||||
tokenChan chan int
|
||||
}
|
||||
|
||||
func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
|
||||
|
||||
// allocate a limiter instance
|
||||
c := &LimitedConcurrentExecutor{
|
||||
limit: limit,
|
||||
tokenChan: make(chan int, limit),
|
||||
}
|
||||
|
||||
// allocate the tokenChan:
|
||||
for i := 0; i < c.limit; i++ {
|
||||
c.tokenChan <- i
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Execute adds a function to the execution queue.
|
||||
// if num of go routines allocated by this instance is < limit
|
||||
// launch a new go routine to execute job
|
||||
// else wait until a go routine becomes available
|
||||
func (c *LimitedConcurrentExecutor) Execute(job func()) {
|
||||
token := <-c.tokenChan
|
||||
go func() {
|
||||
defer func() {
|
||||
c.tokenChan <- token
|
||||
}()
|
||||
// run the job
|
||||
job()
|
||||
}()
|
||||
}
|
|
@ -1,114 +0,0 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
|
||||
|
||||
// LimitedConcurrentExecutor object
|
||||
type LimitedConcurrentExecutor struct {
|
||||
limit int
|
||||
tokenChan chan int
|
||||
}
|
||||
|
||||
func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
|
||||
|
||||
// allocate a limiter instance
|
||||
c := &LimitedConcurrentExecutor{
|
||||
limit: limit,
|
||||
tokenChan: make(chan int, limit),
|
||||
}
|
||||
|
||||
// allocate the tokenChan:
|
||||
for i := 0; i < c.limit; i++ {
|
||||
c.tokenChan <- i
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Execute adds a function to the execution queue.
|
||||
// if num of go routines allocated by this instance is < limit
|
||||
// launch a new go routine to execute job
|
||||
// else wait until a go routine becomes available
|
||||
func (c *LimitedConcurrentExecutor) Execute(job func()) {
|
||||
token := <-c.tokenChan
|
||||
go func() {
|
||||
defer func() {
|
||||
c.tokenChan <- token
|
||||
}()
|
||||
// run the job
|
||||
job()
|
||||
}()
|
||||
}
|
||||
|
||||
// a different implementation, but somehow more "conservative"
|
||||
type OperationRequest func()
|
||||
|
||||
type LimitedOutOfOrderProcessor struct {
|
||||
processorLimit int32
|
||||
processorLimitCond *sync.Cond
|
||||
processorSlots uint32
|
||||
processors []chan OperationRequest
|
||||
currentProcessor int32
|
||||
}
|
||||
|
||||
func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
|
||||
|
||||
processorSlots := uint32(32)
|
||||
c = &LimitedOutOfOrderProcessor{
|
||||
processorSlots: processorSlots,
|
||||
processors: make([]chan OperationRequest, processorSlots),
|
||||
processorLimit: limit,
|
||||
processorLimitCond: sync.NewCond(new(sync.Mutex)),
|
||||
}
|
||||
|
||||
for i := 0; i < int(processorSlots); i++ {
|
||||
c.processors[i] = make(chan OperationRequest)
|
||||
}
|
||||
|
||||
cases := make([]reflect.SelectCase, processorSlots)
|
||||
for i, ch := range c.processors {
|
||||
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
_, value, ok := reflect.Select(cases)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
request := value.Interface().(OperationRequest)
|
||||
|
||||
if c.processorLimit > 0 {
|
||||
c.processorLimitCond.L.Lock()
|
||||
for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
|
||||
c.processorLimitCond.Wait()
|
||||
}
|
||||
atomic.AddInt32(&c.currentProcessor, 1)
|
||||
c.processorLimitCond.L.Unlock()
|
||||
}
|
||||
|
||||
go func() {
|
||||
if c.processorLimit > 0 {
|
||||
defer atomic.AddInt32(&c.currentProcessor, -1)
|
||||
defer c.processorLimitCond.Signal()
|
||||
}
|
||||
request()
|
||||
}()
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
|
||||
index := rand.Uint32() % c.processorSlots
|
||||
c.processors[index] <- request
|
||||
}
|
Loading…
Reference in a new issue