seaweedfs/weed/wdclient/net2/base_connection_pool.go

160 lines
4.4 KiB
Go
Raw Permalink Normal View History

package net2
import (
"net"
"strings"
"time"
rp "github.com/seaweedfs/seaweedfs/weed/wdclient/resource_pool"
)
const defaultDialTimeout = 1 * time.Second
func defaultDialFunc(network string, address string) (net.Conn, error) {
return net.DialTimeout(network, address, defaultDialTimeout)
}
func parseResourceLocation(resourceLocation string) (
network string,
address string) {
idx := strings.Index(resourceLocation, " ")
if idx >= 0 {
return resourceLocation[:idx], resourceLocation[idx+1:]
}
return "", resourceLocation
}
// A thin wrapper around the underlying resource pool.
type connectionPoolImpl struct {
options ConnectionOptions
pool rp.ResourcePool
}
// This returns a connection pool where all connections are connected
// to the same (network, address)
func newBaseConnectionPool(
options ConnectionOptions,
createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
dial := options.Dial
if dial == nil {
dial = defaultDialFunc
}
openFunc := func(loc string) (interface{}, error) {
network, address := parseResourceLocation(loc)
return dial(network, address)
}
closeFunc := func(handle interface{}) error {
return handle.(net.Conn).Close()
}
poolOptions := rp.Options{
MaxActiveHandles: options.MaxActiveConnections,
MaxIdleHandles: options.MaxIdleConnections,
MaxIdleTime: options.MaxIdleTime,
OpenMaxConcurrency: options.DialMaxConcurrency,
Open: openFunc,
Close: closeFunc,
NowFunc: options.NowFunc,
}
return &connectionPoolImpl{
options: options,
pool: createPool(poolOptions),
}
}
// This returns a connection pool where all connections are connected
// to the same (network, address)
func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
}
// This returns a connection pool that manages multiple (network, address)
// entries. The connections to each (network, address) entry acts
// independently. For example ("tcp", "localhost:11211") could act as memcache
// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
return newBaseConnectionPool(
options,
func(poolOptions rp.Options) rp.ResourcePool {
return rp.NewMultiResourcePool(poolOptions, nil)
})
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) NumActive() int32 {
return p.pool.NumActive()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
return p.pool.ActiveHighWaterMark()
}
// This returns the number of alive idle connections. This method is not part
// of ConnectionPool's API. It is used only for testing.
func (p *connectionPoolImpl) NumIdle() int {
return p.pool.NumIdle()
}
// BaseConnectionPool can only register a single (network, address) entry.
// Register should be call before any Get calls.
func (p *connectionPoolImpl) Register(network string, address string) error {
return p.pool.Register(network + " " + address)
}
// BaseConnectionPool has nothing to do on Unregister.
func (p *connectionPoolImpl) Unregister(network string, address string) error {
return nil
}
func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
result := make([]NetworkAddress, 0, 1)
for _, location := range p.pool.ListRegistered() {
network, address := parseResourceLocation(location)
result = append(
result,
NetworkAddress{
Network: network,
Address: address,
})
}
return result
}
// This gets an active connection from the connection pool. Note that network
// and address arguments are ignored (The connections with point to the
// network/address provided by the first Register call).
func (p *connectionPoolImpl) Get(
network string,
address string) (ManagedConn, error) {
handle, err := p.pool.Get(network + " " + address)
if err != nil {
return nil, err
}
return NewManagedConn(network, address, handle, p, p.options), nil
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) Release(conn ManagedConn) error {
return conn.ReleaseConnection()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
return conn.DiscardConnection()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) EnterLameDuckMode() {
p.pool.EnterLameDuckMode()
}