seaweedfs/weed/util/pool.go

99 lines
1.6 KiB
Go
Raw Permalink Normal View History

package util
import (
"errors"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
)
var (
TimeoutErr = errors.New("timeout")
)
// A bufferedChan implemented by a buffered channel
type ResourcePool struct {
sync.Mutex
bufferedChan chan interface{}
poolSizeLimit int
inuse int
newFn func() (interface{}, error)
}
func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
p := &ResourcePool{
poolSizeLimit: poolSizeLimit,
newFn: newFn,
bufferedChan: make(chan interface{}, poolSizeLimit),
}
return p
}
func (p *ResourcePool) Size() int {
p.Lock()
defer p.Unlock()
return len(p.bufferedChan) + p.inuse
}
func (p *ResourcePool) Free() int {
p.Lock()
defer p.Unlock()
return p.poolSizeLimit - p.inuse
}
func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
d, err := p.get(timeout)
if err != nil {
return nil, err
}
if d == nil && p.newFn != nil {
var err error
d, err = p.newFn()
if err != nil {
return nil, err
}
}
p.Lock()
defer p.Unlock()
p.inuse++
return d, nil
}
func (p *ResourcePool) Release(v interface{}) {
p.Lock()
defer p.Unlock()
if p.inuse == 0 {
glog.V(0).Infof("released too many times?")
return
}
p.bufferedChan <- v
p.inuse--
}
func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
select {
case v := <-p.bufferedChan:
return v, nil
default:
}
if p.Free() > 0 {
d, err := p.newFn()
if err != nil {
return nil, err
}
return d, nil
}
// wait for an freed item
select {
case v := <-p.bufferedChan:
return v, nil
case <-time.After(timeout):
}
return nil, TimeoutErr
}