From 1444e9d2758ab1f6adeb16a656c1c1cea4292145 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 6 Mar 2021 14:24:01 -0800
Subject: [PATCH] migrated multi-host connection pool from the godropbox
 package, removing unneeded dependencies, which involved etcd versions

---
 weed/wdclient/net2/base_connection_pool.go    | 159 ++++++++
 weed/wdclient/net2/connection_pool.go         |  97 +++++
 weed/wdclient/net2/doc.go                     |   6 +
 weed/wdclient/net2/ip.go                      | 177 +++++++++
 weed/wdclient/net2/managed_connection.go      | 185 ++++++++++
 weed/wdclient/net2/port.go                    |  19 +
 weed/wdclient/resource_pool/doc.go            |   5 +
 weed/wdclient/resource_pool/managed_handle.go |  97 +++++
 .../resource_pool/multi_resource_pool.go      | 200 ++++++++++
 weed/wdclient/resource_pool/resource_pool.go  |  96 +++++
 weed/wdclient/resource_pool/semaphore.go      | 154 ++++++++
 .../resource_pool/simple_resource_pool.go     | 343 ++++++++++++++++++
 12 files changed, 1538 insertions(+)
 create mode 100644 weed/wdclient/net2/base_connection_pool.go
 create mode 100644 weed/wdclient/net2/connection_pool.go
 create mode 100644 weed/wdclient/net2/doc.go
 create mode 100644 weed/wdclient/net2/ip.go
 create mode 100644 weed/wdclient/net2/managed_connection.go
 create mode 100644 weed/wdclient/net2/port.go
 create mode 100644 weed/wdclient/resource_pool/doc.go
 create mode 100644 weed/wdclient/resource_pool/managed_handle.go
 create mode 100644 weed/wdclient/resource_pool/multi_resource_pool.go
 create mode 100644 weed/wdclient/resource_pool/resource_pool.go
 create mode 100644 weed/wdclient/resource_pool/semaphore.go
 create mode 100644 weed/wdclient/resource_pool/simple_resource_pool.go

diff --git a/weed/wdclient/net2/base_connection_pool.go b/weed/wdclient/net2/base_connection_pool.go
new file mode 100644
index 000000000..5cc037d0f
--- /dev/null
+++ b/weed/wdclient/net2/base_connection_pool.go
@@ -0,0 +1,159 @@
+package net2
+
+import (
+	"net"
+	"strings"
+	"time"
+
+	rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+const defaultDialTimeout = 1 * time.Second
+
+func defaultDialFunc(network string, address string) (net.Conn, error) {
+	return net.DialTimeout(network, address, defaultDialTimeout)
+}
+
+func parseResourceLocation(resourceLocation string) (
+	network string,
+	address string) {
+
+	idx := strings.Index(resourceLocation, " ")
+	if idx >= 0 {
+		return resourceLocation[:idx], resourceLocation[idx+1:]
+	}
+
+	return "", resourceLocation
+}
+
+// A thin wrapper around the underlying resource pool.
+type connectionPoolImpl struct {
+	options ConnectionOptions
+
+	pool rp.ResourcePool
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address).
+func newBaseConnectionPool(
+	options ConnectionOptions,
+	createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
+
+	dial := options.Dial
+	if dial == nil {
+		dial = defaultDialFunc
+	}
+
+	openFunc := func(loc string) (interface{}, error) {
+		network, address := parseResourceLocation(loc)
+		return dial(network, address)
+	}
+
+	closeFunc := func(handle interface{}) error {
+		return handle.(net.Conn).Close()
+	}
+
+	poolOptions := rp.Options{
+		MaxActiveHandles:   options.MaxActiveConnections,
+		MaxIdleHandles:     options.MaxIdleConnections,
+		MaxIdleTime:        options.MaxIdleTime,
+		OpenMaxConcurrency: options.DialMaxConcurrency,
+		Open:               openFunc,
+		Close:              closeFunc,
+		NowFunc:            options.NowFunc,
+	}
+
+	return &connectionPoolImpl{
+		options: options,
+		pool:    createPool(poolOptions),
+	}
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address).
+func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
+	return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
+}
+
+// This returns a connection pool that manages multiple (network, address)
+// entries. The connections to each (network, address) entry act
+// independently. For example ("tcp", "localhost:11211") could act as memcache
+// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
+func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
+	return newBaseConnectionPool(
+		options,
+		func(poolOptions rp.Options) rp.ResourcePool {
+			return rp.NewMultiResourcePool(poolOptions, nil)
+		})
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) NumActive() int32 {
+	return p.pool.NumActive()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
+	return p.pool.ActiveHighWaterMark()
+}
+
+// This returns the number of alive idle connections. This method is not part
+// of ConnectionPool's API. It is used only for testing.
+func (p *connectionPoolImpl) NumIdle() int {
+	return p.pool.NumIdle()
+}
+
+// BaseConnectionPool can only register a single (network, address) entry.
+// Register must be called before any Get calls.
+func (p *connectionPoolImpl) Register(network string, address string) error {
+	return p.pool.Register(network + " " + address)
+}
+
+// BaseConnectionPool has nothing to do on Unregister.
+func (p *connectionPoolImpl) Unregister(network string, address string) error {
+	return nil
+}
+
+func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
+	result := make([]NetworkAddress, 0, 1)
+	for _, location := range p.pool.ListRegistered() {
+		network, address := parseResourceLocation(location)
+
+		result = append(
+			result,
+			NetworkAddress{
+				Network: network,
+				Address: address,
+			})
+	}
+	return result
+}
+
+// This gets an active connection from the connection pool. Note that the
+// network and address arguments are ignored (the connections will point to
+// the network/address provided by the first Register call).
+func (p *connectionPoolImpl) Get(
+	network string,
+	address string) (ManagedConn, error) {
+
+	handle, err := p.pool.Get(network + " " + address)
+	if err != nil {
+		return nil, err
+	}
+	return NewManagedConn(network, address, handle, p, p.options), nil
+}
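
A minimal usage sketch of the pool built above; the address, option values, and "ping" payload are hypothetical, and imports of "log" and "time" are assumed:

    func exampleSimplePool() {
        pool := NewSimpleConnectionPool(ConnectionOptions{
            MaxActiveConnections: 4,
            ReadTimeout:          time.Second,
        })
        if err := pool.Register("tcp", "localhost:8080"); err != nil {
            log.Fatal(err)
        }
        conn, err := pool.Get("tcp", "localhost:8080")
        if err != nil {
            log.Fatal(err)
        }
        if _, err := conn.Write([]byte("ping\n")); err != nil {
            _ = conn.DiscardConnection() // broken connections are dropped, not reused
        } else {
            _ = conn.ReleaseConnection() // healthy connections return to the idle pool
        }
    }
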
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Release(conn ManagedConn) error {
+	return conn.ReleaseConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
+	return conn.DiscardConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) EnterLameDuckMode() {
+	p.pool.EnterLameDuckMode()
+}
diff --git a/weed/wdclient/net2/connection_pool.go b/weed/wdclient/net2/connection_pool.go
new file mode 100644
index 000000000..5b8d4d232
--- /dev/null
+++ b/weed/wdclient/net2/connection_pool.go
@@ -0,0 +1,97 @@
+package net2
+
+import (
+	"net"
+	"time"
+)
+
+type ConnectionOptions struct {
+	// The maximum number of connections that can be active per host at any
+	// given time (a non-positive value indicates the number of connections
+	// is unbounded).
+	MaxActiveConnections int32
+
+	// The maximum number of idle connections per host that are kept alive by
+	// the connection pool.
+	MaxIdleConnections uint32
+
+	// The maximum amount of time an idle connection can remain alive (if
+	// specified).
+	MaxIdleTime *time.Duration
+
+	// This limits the number of concurrent Dial calls (there's no limit when
+	// DialMaxConcurrency is non-positive).
+	DialMaxConcurrency int
+
+	// Dial specifies the dial function for creating network connections.
+	// If Dial is nil, net.DialTimeout is used, with the timeout set to 1 second.
+	Dial func(network string, address string) (net.Conn, error)
+
+	// This specifies the now time function. When the function is non-nil, the
+	// connection pool will use the specified function instead of time.Now to
+	// generate the current time.
+	NowFunc func() time.Time
+
+	// This specifies the timeout for any Read() operation.
+	// Note that setting this to 0 (i.e. not setting it) will make
+	// read operations block indefinitely.
+	ReadTimeout time.Duration
+
+	// This specifies the timeout for any Write() operation.
+	// Note that setting this to 0 (i.e. not setting it) will make
+	// write operations block indefinitely.
+	WriteTimeout time.Duration
+}
+
+func (o ConnectionOptions) getCurrentTime() time.Time {
+	if o.NowFunc == nil {
+		return time.Now()
+	} else {
+		return o.NowFunc()
+	}
+}
+
+// A generic interface for a managed connection pool. All connection pool
+// implementations must be threadsafe.
+type ConnectionPool interface {
+	// This returns the number of active connections that are on loan.
+	NumActive() int32
+
+	// This returns the highest number of active connections for the entire
+	// lifetime of the pool.
+	ActiveHighWaterMark() int32
+
+	// This returns the number of idle connections that are in the pool.
+	NumIdle() int
+
+	// This associates (network, address) to the connection pool; after which,
+	// the user can get connections to (network, address).
+	Register(network string, address string) error
+
+	// This dissociates (network, address) from the connection pool;
+	// after which, the user can no longer get connections to
+	// (network, address).
+	Unregister(network string, address string) error
+
+	// This returns the list of registered (network, address) entries.
+	ListRegistered() []NetworkAddress
+
+	// This gets an active connection from the connection pool. The connection
+	// will remain active until one of the following is called:
+	//   1. conn.ReleaseConnection()
+	//   2. conn.DiscardConnection()
+	//   3. pool.Release(conn)
+	//   4. pool.Discard(conn)
+	Get(network string, address string) (ManagedConn, error)
+
+	// This releases an active connection back to the connection pool.
+	Release(conn ManagedConn) error
+
+	// This discards an active connection from the connection pool.
+	Discard(conn ManagedConn) error
+
+	// Enter the connection pool into lame duck mode. The connection pool
+	// will no longer return connections, and all idle connections are closed
+	// immediately (including active connections that are released back to the
+	// pool afterward).
+	EnterLameDuckMode()
+}
diff --git a/weed/wdclient/net2/doc.go b/weed/wdclient/net2/doc.go
new file mode 100644
index 000000000..f4d6552e4
--- /dev/null
+++ b/weed/wdclient/net2/doc.go
@@ -0,0 +1,6 @@
+// Package net2 is a collection of functions meant to supplement the
+// capabilities provided by the standard "net" package.
+package net2
+
+// copied from https://github.com/dropbox/godropbox/tree/master/net2
+// removed other dependencies
\ No newline at end of file
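
As the NewMultiConnectionPool comment describes, each registered (network, address) gets its own independent sub-pool. A small sketch, reusing the hypothetical memcache shard addresses from that comment:

    pool := NewMultiConnectionPool(ConnectionOptions{MaxIdleConnections: 2})
    _ = pool.Register("tcp", "localhost:11211") // shard 0
    _ = pool.Register("tcp", "localhost:11212") // shard 1
    conn, err := pool.Get("tcp", "localhost:11211") // draws only from shard 0's sub-pool
    if err == nil {
        _ = conn.ReleaseConnection()
    }
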
diff --git a/weed/wdclient/net2/ip.go b/weed/wdclient/net2/ip.go
new file mode 100644
index 000000000..60e46342f
--- /dev/null
+++ b/weed/wdclient/net2/ip.go
@@ -0,0 +1,177 @@
+package net2
+
+import (
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"strings"
+	"sync"
+)
+
+var myHostname string
+var myHostnameOnce sync.Once
+
+// Like os.Hostname, but caches the first successful result, making it
+// cheap to call over and over.
+// It will also crash the whole process if fetching the hostname fails!
+func MyHostname() string {
+	myHostnameOnce.Do(func() {
+		var err error
+		myHostname, err = os.Hostname()
+		if err != nil {
+			log.Fatal(err)
+		}
+	})
+	return myHostname
+}
+
+var myIp4 *net.IPAddr
+var myIp4Once sync.Once
+
+// Resolves `MyHostname()` to an IPv4 address. Caches the first successful
+// result, making it cheap to call over and over.
+// It will also crash the whole process if resolving the IP fails!
+func MyIp4() *net.IPAddr {
+	myIp4Once.Do(func() {
+		var err error
+		myIp4, err = net.ResolveIPAddr("ip4", MyHostname())
+		if err != nil {
+			log.Fatal(err)
+		}
+	})
+	return myIp4
+}
+
+var myIp6 *net.IPAddr
+var myIp6Once sync.Once
+
+// Resolves `MyHostname()` to an IPv6 address. Caches the first successful
+// result, making it cheap to call over and over.
+// It will also crash the whole process if resolving the IP fails!
+func MyIp6() *net.IPAddr {
+	myIp6Once.Do(func() {
+		var err error
+		myIp6, err = net.ResolveIPAddr("ip6", MyHostname())
+		if err != nil {
+			log.Fatal(err)
+		}
+	})
+	return myIp6
+}
+
+// This returns the list of local ip addresses which other hosts can connect
+// to (NOTE: loopback ip is ignored).
+// Also resolves the hostname to an address and adds it to the list too, so
+// IPs from /etc/hosts can work too.
+func GetLocalIPs() ([]*net.IP, error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to lookup hostname: %v", err)
+	}
+	// Resolves the IP address from the hostname; this way overrides in
+	// /etc/hosts can work too for IP resolution.
+	ipInfo, err := net.ResolveIPAddr("ip4", hostname)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to resolve ip: %v", err)
+	}
+	ips := []*net.IP{&ipInfo.IP}
+
+	// TODO(zviad): Is the rest of the code really necessary?
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get interface addresses: %v", err)
+	}
+	for _, addr := range addrs {
+		ipnet, ok := addr.(*net.IPNet)
+		if !ok {
+			continue
+		}
+
+		if ipnet.IP.IsLoopback() {
+			continue
+		}
+
+		ips = append(ips, &ipnet.IP)
+	}
+	return ips, nil
+}
+
+var localhostIPNets []*net.IPNet
+
+func init() {
+	for _, mask := range []string{"127.0.0.1/8", "::1/128"} {
+		_, ipnet, err := net.ParseCIDR(mask)
+		if err != nil {
+			panic(err)
+		}
+		localhostIPNets = append(localhostIPNets, ipnet)
+	}
+}
+
+func IsLocalhostIp(ipStr string) bool {
+	ip := net.ParseIP(ipStr)
+	if ip == nil {
+		return false
+	}
+	for _, ipnet := range localhostIPNets {
+		if ipnet.Contains(ip) {
+			return true
+		}
+	}
+	return false
+}
+
+// Given a host string, return true if the host is an ip (v4/v6) localhost.
+func IsLocalhost(host string) bool {
+	return IsLocalhostIp(host) ||
+		host == "localhost" ||
+		host == "ip6-localhost" ||
+		host == "ipv6-localhost"
+}
+
+// Resolves hostnames in addresses to actual IP4 addresses. Skips all invalid
+// addresses and all addresses that can't be resolved.
+// `addrs` are assumed to be of the form: ["<host>:<port>", ...]
+// Returns an error in addition to resolved addresses if not all resolutions
+// succeed.
+func ResolveIP4s(addrs []string) ([]string, error) {
+	resolvedAddrs := make([]string, 0, len(addrs))
+	var lastErr error
+
+	for _, server := range addrs {
+		hostPort := strings.Split(server, ":")
+		if len(hostPort) != 2 {
+			lastErr = fmt.Errorf("Skipping invalid address: %s", server)
+			continue
+		}
+
+		ip, err := net.ResolveIPAddr("ip4", hostPort[0])
+		if err != nil {
+			lastErr = err
+			continue
+		}
+		resolvedAddrs = append(resolvedAddrs, ip.IP.String()+":"+hostPort[1])
+	}
+	return resolvedAddrs, lastErr
+}
+
+func LookupValidAddrs() (map[string]bool, error) {
+	hostName, err := os.Hostname()
+	if err != nil {
+		return nil, err
+	}
+	addrs, err := net.LookupHost(hostName)
+	if err != nil {
+		return nil, err
+	}
+	validAddrs := make(map[string]bool)
+	validAddrs[hostName] = true
+	for _, addr := range addrs {
+		validAddrs[addr] = true
+	}
+	// Special-case localhost/127.0.0.1 so that this works on dev VMs. It
+	// should have no effect in production.
+	validAddrs["127.0.0.1"] = true
+	validAddrs["localhost"] = true
+	return validAddrs, nil
+}
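
A short sketch of how ResolveIP4s behaves (the input addresses are made up): entries without a port are skipped, resolvable entries keep their port, and the last failure is reported through the returned error.

    resolved, err := ResolveIP4s([]string{"localhost:8080", "no-port"})
    // On a typical /etc/hosts, resolved == []string{"127.0.0.1:8080"};
    // err reports that "no-port" was skipped.
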
diff --git a/weed/wdclient/net2/managed_connection.go b/weed/wdclient/net2/managed_connection.go
new file mode 100644
index 000000000..a886210d1
--- /dev/null
+++ b/weed/wdclient/net2/managed_connection.go
@@ -0,0 +1,185 @@
+package net2
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+// Dial's arguments.
+type NetworkAddress struct {
+	Network string
+	Address string
+}
+
+// A connection managed by a connection pool. NOTE: SetDeadline,
+// SetReadDeadline and SetWriteDeadline are disabled for managed connections.
+// (The deadlines are set by the connection pool.)
+type ManagedConn interface {
+	net.Conn
+
+	// This returns the original (network, address) entry used for creating
+	// the connection.
+	Key() NetworkAddress
+
+	// This returns the underlying net.Conn implementation.
+	RawConn() net.Conn
+
+	// This returns the connection pool which owns this connection.
+	Owner() ConnectionPool
+
+	// This indicates a user is done with the connection and releases the
+	// connection back to the connection pool.
+	ReleaseConnection() error
+
+	// This indicates the connection is in an invalid state, and that the
+	// connection should be discarded from the connection pool.
+	DiscardConnection() error
+}
+
+// A physical implementation of ManagedConn.
+type managedConnImpl struct {
+	addr    NetworkAddress
+	handle  resource_pool.ManagedHandle
+	pool    ConnectionPool
+	options ConnectionOptions
+}
+
+// This creates a managed connection wrapper.
+func NewManagedConn(
+	network string,
+	address string,
+	handle resource_pool.ManagedHandle,
+	pool ConnectionPool,
+	options ConnectionOptions) ManagedConn {
+
+	addr := NetworkAddress{
+		Network: network,
+		Address: address,
+	}
+
+	return &managedConnImpl{
+		addr:    addr,
+		handle:  handle,
+		pool:    pool,
+		options: options,
+	}
+}
+
+func (c *managedConnImpl) rawConn() (net.Conn, error) {
+	h, err := c.handle.Handle()
+	return h.(net.Conn), err
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) RawConn() net.Conn {
+	h, _ := c.handle.Handle()
+	return h.(net.Conn)
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Key() NetworkAddress {
+	return c.addr
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Owner() ConnectionPool {
+	return c.pool
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) ReleaseConnection() error {
+	return c.handle.Release()
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) DiscardConnection() error {
+	return c.handle.Discard()
+}
+
+// See net.Conn for documentation.
+func (c *managedConnImpl) Read(b []byte) (n int, err error) {
+	conn, err := c.rawConn()
+	if err != nil {
+		return 0, err
+	}
+
+	if c.options.ReadTimeout > 0 {
+		deadline := c.options.getCurrentTime().Add(c.options.ReadTimeout)
+		_ = conn.SetReadDeadline(deadline)
+	}
+	n, err = conn.Read(b)
+	if err != nil {
+		var localAddr string
+		if conn.LocalAddr() != nil {
+			localAddr = conn.LocalAddr().String()
+		} else {
+			localAddr = "(nil)"
+		}
+
+		var remoteAddr string
+		if conn.RemoteAddr() != nil {
+			remoteAddr = conn.RemoteAddr().String()
+		} else {
+			remoteAddr = "(nil)"
+		}
+		err = fmt.Errorf("Read error from host: %s <-> %s: %v", localAddr, remoteAddr, err)
+	}
+	return
+}
+
+// See net.Conn for documentation.
+func (c *managedConnImpl) Write(b []byte) (n int, err error) {
+	conn, err := c.rawConn()
+	if err != nil {
+		return 0, err
+	}
+
+	if c.options.WriteTimeout > 0 {
+		deadline := c.options.getCurrentTime().Add(c.options.WriteTimeout)
+		_ = conn.SetWriteDeadline(deadline)
+	}
+	n, err = conn.Write(b)
+	if err != nil {
+		err = fmt.Errorf("Write error: %v", err)
+	}
+	return
+}
+
+// See net.Conn for documentation.
+func (c *managedConnImpl) Close() error {
+	return c.handle.Discard()
+}
+
+// See net.Conn for documentation.
+func (c *managedConnImpl) LocalAddr() net.Addr {
+	conn, _ := c.rawConn()
+	return conn.LocalAddr()
+}
+
+// See net.Conn for documentation.
+func (c *managedConnImpl) RemoteAddr() net.Addr {
+	conn, _ := c.rawConn()
+	return conn.RemoteAddr()
+}
+
+// SetDeadline is disabled for managed connections (the deadline is set by
+// us, with respect to the read/write timeouts specified in ConnectionOptions).
+func (c *managedConnImpl) SetDeadline(t time.Time) error {
+	return errors.New("Cannot set deadline for managed connection")
+}
+
+// SetReadDeadline is disabled for managed connections (the deadline is set by
+// us with respect to the read timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetReadDeadline(t time.Time) error {
+	return errors.New("Cannot set read deadline for managed connection")
+}
+
+// SetWriteDeadline is disabled for managed connections (the deadline is set by
+// us with respect to the write timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetWriteDeadline(t time.Time) error {
+	return errors.New("Cannot set write deadline for managed connection")
+}
diff --git a/weed/wdclient/net2/port.go b/weed/wdclient/net2/port.go
new file mode 100644
index 000000000..f83adba28
--- /dev/null
+++ b/weed/wdclient/net2/port.go
@@ -0,0 +1,19 @@
+package net2
+
+import (
+	"net"
+	"strconv"
+)
+
+// Returns the port number of the given net.Addr.
+func GetPort(addr net.Addr) (int, error) {
+	_, lport, err := net.SplitHostPort(addr.String())
+	if err != nil {
+		return -1, err
+	}
+	lportInt, err := strconv.Atoi(lport)
+	if err != nil {
+		return -1, err
+	}
+	return lportInt, nil
+}
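
GetPort pairs naturally with listeners bound to an OS-assigned port. A hedged sketch (the listener is hypothetical; an import of "log" is assumed):

    ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0 asks the OS to pick one
    if err != nil {
        log.Fatal(err)
    }
    port, err := GetPort(ln.Addr()) // recovers the assigned port as an int
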
diff --git a/weed/wdclient/resource_pool/doc.go b/weed/wdclient/resource_pool/doc.go
new file mode 100644
index 000000000..c17b17c6c
--- /dev/null
+++ b/weed/wdclient/resource_pool/doc.go
@@ -0,0 +1,5 @@
+// Package resource_pool provides a generic resource pool for managing resources such as network connections.
+package resource_pool
+
+// copied from https://github.com/dropbox/godropbox/tree/master/resource_pool
+// removed other dependencies
\ No newline at end of file
diff --git a/weed/wdclient/resource_pool/managed_handle.go b/weed/wdclient/resource_pool/managed_handle.go
new file mode 100644
index 000000000..e1d82ca7b
--- /dev/null
+++ b/weed/wdclient/resource_pool/managed_handle.go
@@ -0,0 +1,97 @@
+package resource_pool
+
+import (
+	"errors"
+	"sync/atomic"
+)
+
+// A resource handle managed by a resource pool.
+type ManagedHandle interface {
+	// This returns the handle's resource location.
+	ResourceLocation() string
+
+	// This returns the underlying resource handle (or an error if the handle
+	// is no longer active).
+	Handle() (interface{}, error)
+
+	// This returns the resource pool which owns this handle.
+	Owner() ResourcePool
+
+	// This releases the underlying resource handle to the caller and marks
+	// the managed handle as inactive. The caller is responsible for cleaning
+	// up the released handle. This returns nil if the managed handle no
+	// longer owns the resource.
+	ReleaseUnderlyingHandle() interface{}
+
+	// This indicates a user is done with the handle and releases the handle
+	// back to the resource pool.
+	Release() error
+
+	// This indicates the handle is in an invalid state, and that the handle
+	// should be discarded from the resource pool.
+	Discard() error
+}
+
+// A physical implementation of ManagedHandle.
+type managedHandleImpl struct {
+	location string
+	handle   interface{}
+	pool     ResourcePool
+	isActive int32 // atomic bool
+	options  Options
+}
+
+// This creates a managed handle wrapper.
+func NewManagedHandle(
+	resourceLocation string,
+	handle interface{},
+	pool ResourcePool,
+	options Options) ManagedHandle {
+
+	h := &managedHandleImpl{
+		location: resourceLocation,
+		handle:   handle,
+		pool:     pool,
+		options:  options,
+	}
+	atomic.StoreInt32(&h.isActive, 1)
+
+	return h
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ResourceLocation() string {
+	return c.location
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Handle() (interface{}, error) {
+	if atomic.LoadInt32(&c.isActive) == 0 {
+		return c.handle, errors.New("Resource handle is no longer valid")
+	}
+	return c.handle, nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Owner() ResourcePool {
+	return c.pool
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ReleaseUnderlyingHandle() interface{} {
+	if atomic.CompareAndSwapInt32(&c.isActive, 1, 0) {
+		return c.handle
+	}
+	return nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Release() error {
+	return c.pool.Release(c)
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Discard() error {
+	return c.pool.Discard(c)
+}
diff --git a/weed/wdclient/resource_pool/multi_resource_pool.go b/weed/wdclient/resource_pool/multi_resource_pool.go
new file mode 100644
index 000000000..9ac25526d
--- /dev/null
+++ b/weed/wdclient/resource_pool/multi_resource_pool.go
@@ -0,0 +1,200 @@
+package resource_pool
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+)
+
+// A resource pool implementation that manages multiple resource location
+// entries. The handles to each resource location entry act independently.
+// For example "tcp localhost:11211" could act as memcache
+// shard 0 and "tcp localhost:11212" could act as memcache shard 1.
+type multiResourcePool struct {
+	options Options
+
+	createPool func(Options) ResourcePool
+
+	rwMutex    sync.RWMutex
+	isLameDuck bool // guarded by rwMutex
+	// NOTE: the locationPools is guarded by rwMutex, but the pool entries
+	// are not.
+	locationPools map[string]ResourcePool
+}
+
+// This returns a MultiResourcePool, which manages multiple
+// resource location entries. The handles to each resource location
+// entry act independently.
+//
+// When createPool is nil, NewSimpleResourcePool is used as the default.
+func NewMultiResourcePool(
+	options Options,
+	createPool func(Options) ResourcePool) ResourcePool {
+
+	if createPool == nil {
+		createPool = NewSimpleResourcePool
+	}
+
+	return &multiResourcePool{
+		options:       options,
+		createPool:    createPool,
+		rwMutex:       sync.RWMutex{},
+		isLameDuck:    false,
+		locationPools: make(map[string]ResourcePool),
+	}
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumActive() int32 {
+	total := int32(0)
+
+	p.rwMutex.RLock()
+	defer p.rwMutex.RUnlock()
+
+	for _, pool := range p.locationPools {
+		total += pool.NumActive()
+	}
+	return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) ActiveHighWaterMark() int32 {
+	high := int32(0)
+
+	p.rwMutex.RLock()
+	defer p.rwMutex.RUnlock()
+
+	for _, pool := range p.locationPools {
+		val := pool.ActiveHighWaterMark()
+		if val > high {
+			high = val
+		}
+	}
+	return high
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumIdle() int {
+	total := 0
+
+	p.rwMutex.RLock()
+	defer p.rwMutex.RUnlock()
+
+	for _, pool := range p.locationPools {
+		total += pool.NumIdle()
+	}
+	return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Register(resourceLocation string) error { + if resourceLocation == "" { + return errors.New("Registering invalid resource location") + } + + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + if p.isLameDuck { + return fmt.Errorf( + "Cannot register %s to lame duck resource pool", + resourceLocation) + } + + if _, inMap := p.locationPools[resourceLocation]; inMap { + return nil + } + + pool := p.createPool(p.options) + if err := pool.Register(resourceLocation); err != nil { + return err + } + + p.locationPools[resourceLocation] = pool + return nil +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Unregister(resourceLocation string) error { + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + if pool, inMap := p.locationPools[resourceLocation]; inMap { + _ = pool.Unregister("") + pool.EnterLameDuckMode() + delete(p.locationPools, resourceLocation) + } + return nil +} + +func (p *multiResourcePool) ListRegistered() []string { + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + result := make([]string, 0, len(p.locationPools)) + for key, _ := range p.locationPools { + result = append(result, key) + } + + return result +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Get( + resourceLocation string) (ManagedHandle, error) { + + pool := p.getPool(resourceLocation) + if pool == nil { + return nil, fmt.Errorf( + "%s is not registered in the resource pool", + resourceLocation) + } + return pool.Get(resourceLocation) +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Release(handle ManagedHandle) error { + pool := p.getPool(handle.ResourceLocation()) + if pool == nil { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + return pool.Release(handle) +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Discard(handle ManagedHandle) error { + pool := p.getPool(handle.ResourceLocation()) + if pool == nil { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + return pool.Discard(handle) +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) EnterLameDuckMode() { + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + p.isLameDuck = true + + for _, pool := range p.locationPools { + pool.EnterLameDuckMode() + } +} + +func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool { + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + if pool, inMap := p.locationPools[resourceLocation]; inMap { + return pool + } + return nil +} diff --git a/weed/wdclient/resource_pool/resource_pool.go b/weed/wdclient/resource_pool/resource_pool.go new file mode 100644 index 000000000..26c433f50 --- /dev/null +++ b/weed/wdclient/resource_pool/resource_pool.go @@ -0,0 +1,96 @@ +package resource_pool + +import ( + "time" +) + +type Options struct { + // The maximum number of active resource handles per resource location. (A + // non-positive value indicates the number of active resource handles is + // unbounded). + MaxActiveHandles int32 + + // The maximum number of idle resource handles per resource location that + // are kept alive by the resource pool. + MaxIdleHandles uint32 + + // The maximum amount of time an idle resource handle can remain alive (if + // specified). + MaxIdleTime *time.Duration + + // This limits the number of concurrent Open calls (there's no limit when + // OpenMaxConcurrency is non-positive). 
+	OpenMaxConcurrency int
+
+	// This function creates a resource handle (e.g., a connection) for a
+	// resource location. The function must be thread-safe.
+	Open func(resourceLocation string) (
+		handle interface{},
+		err error)
+
+	// This function destroys a resource handle and performs the necessary
+	// cleanup to free up resources. The function must be thread-safe.
+	Close func(handle interface{}) error
+
+	// This specifies the now time function. When the function is non-nil, the
+	// resource pool will use the specified function instead of time.Now to
+	// generate the current time.
+	NowFunc func() time.Time
+}
+
+func (o Options) getCurrentTime() time.Time {
+	if o.NowFunc == nil {
+		return time.Now()
+	} else {
+		return o.NowFunc()
+	}
+}
+
+// A generic interface for a managed resource pool. All resource pool
+// implementations must be threadsafe.
+type ResourcePool interface {
+	// This returns the number of active resource handles.
+	NumActive() int32
+
+	// This returns the highest number of active handles for the entire
+	// lifetime of the pool. If the pool contains multiple sub-pools, the
+	// high water mark is the max of the sub-pools' high water marks.
+	ActiveHighWaterMark() int32
+
+	// This returns the number of alive idle handles. NOTE: This is only used
+	// for testing.
+	NumIdle() int
+
+	// This associates a resource location to the resource pool; after which,
+	// the user can get resource handles for the resource location.
+	Register(resourceLocation string) error
+
+	// This dissociates a resource location from the resource pool; after
+	// which, the user can no longer get resource handles for the resource
+	// location. If the given resource location corresponds to a sub-pool,
+	// the unregistered sub-pool will enter lame duck mode.
+	Unregister(resourceLocation string) error
+
+	// This returns the list of registered resource location entries.
+	ListRegistered() []string
+
+	// This gets an active resource handle from the resource pool. The
+	// handle will remain active until one of the following is called:
+	//   1. handle.Release()
+	//   2. handle.Discard()
+	//   3. pool.Release(handle)
+	//   4. pool.Discard(handle)
+	Get(key string) (ManagedHandle, error)
+
+	// This releases an active resource handle back to the resource pool.
+	Release(handle ManagedHandle) error
+
+	// This discards an active resource from the resource pool.
+	Discard(handle ManagedHandle) error
+
+	// Enter the resource pool into lame duck mode. The resource pool
+	// will no longer return resource handles, and all idle resource handles
+	// are closed immediately (including active resource handles that are
+	// released back to the pool afterward).
+	EnterLameDuckMode()
+}
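
To make the Options/ResourcePool contract concrete, a sketch of a single-location pool; the Open/Close bodies and the address are illustrative, not prescribed by this package.

    pool := NewSimpleResourcePool(Options{
        MaxActiveHandles: 10,
        MaxIdleHandles:   4,
        Open: func(loc string) (interface{}, error) {
            return net.Dial("tcp", loc) // hypothetical resource: a TCP connection
        },
        Close: func(h interface{}) error {
            return h.(net.Conn).Close()
        },
    })
    _ = pool.Register("localhost:9000")
    handle, err := pool.Get("localhost:9000")
    if err == nil {
        raw, _ := handle.Handle() // raw.(net.Conn) is the live connection
        _ = raw
        _ = handle.Release() // or handle.Discard() if the resource went bad
    }
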
diff --git a/weed/wdclient/resource_pool/semaphore.go b/weed/wdclient/resource_pool/semaphore.go
new file mode 100644
index 000000000..ff35d5bc5
--- /dev/null
+++ b/weed/wdclient/resource_pool/semaphore.go
@@ -0,0 +1,154 @@
+package resource_pool
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type Semaphore interface {
+	// Increment the semaphore counter by one.
+	Release()
+
+	// Decrement the semaphore counter by one, and block if counter < 0.
+	Acquire()
+
+	// Decrement the semaphore counter by one, and block if counter < 0.
+	// Wait for up to the given duration. Returns true if it did not time out.
+	TryAcquire(timeout time.Duration) bool
+}
+
+// A simple counting Semaphore.
+type boundedSemaphore struct {
+	slots chan struct{}
+}
+
+// Create a bounded semaphore. The count parameter must be a positive number.
+// NOTE: The bounded semaphore will panic if the user tries to Release
+// beyond the specified count.
+func NewBoundedSemaphore(count uint) Semaphore {
+	sem := &boundedSemaphore{
+		slots: make(chan struct{}, int(count)),
+	}
+	for i := 0; i < cap(sem.slots); i++ {
+		sem.slots <- struct{}{}
+	}
+	return sem
+}
+
+// Acquire returns on successful acquisition.
+func (sem *boundedSemaphore) Acquire() {
+	<-sem.slots
+}
+
+// TryAcquire returns true if it acquires a resource slot within the
+// timeout, false otherwise.
+func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool {
+	if timeout > 0 {
+		// Wait until we get a slot or the timeout expires.
+		tm := time.NewTimer(timeout)
+		defer tm.Stop()
+		select {
+		case <-sem.slots:
+			return true
+		case <-tm.C:
+			// Timeout expired. In very rare cases this might happen even if
+			// there is a slot available, e.g. a GC pause after we create the
+			// timer and select randomly picked this one out of the two
+			// available channels. We should do one final immediate check
+			// below.
+		}
+	}
+
+	// Return true if we have a slot available immediately and false otherwise.
+	select {
+	case <-sem.slots:
+		return true
+	default:
+		return false
+	}
+}
+
+// Release the acquired semaphore. You must not release more than you
+// have acquired.
+func (sem *boundedSemaphore) Release() {
+	select {
+	case sem.slots <- struct{}{}:
+	default:
+		// slots is buffered. If a send blocks, it indicates a programming
+		// error.
+		panic(fmt.Errorf("too many releases for boundedSemaphore"))
+	}
+}
+
+// This returns an unbounded counting semaphore with the specified initial
+// count. The semaphore counter can be arbitrarily large (i.e., Release can
+// be called an unlimited number of times).
+//
+// NOTE: In general, users should use the bounded semaphore since it is more
+// efficient than the unbounded semaphore.
+func NewUnboundedSemaphore(initialCount int) Semaphore {
+	res := &unboundedSemaphore{
+		counter: int64(initialCount),
+	}
+	res.cond.L = &res.lock
+	return res
+}
+
+type unboundedSemaphore struct {
+	lock    sync.Mutex
+	cond    sync.Cond
+	counter int64
+}
+
+func (s *unboundedSemaphore) Release() {
+	s.lock.Lock()
+	s.counter += 1
+	if s.counter > 0 {
+		// Not broadcasting here since it's unlikely we can satisfy all
+		// waiting goroutines. Instead, we will Signal again if there is
+		// leftover quota after Acquire, in case of lost wakeups.
+		s.cond.Signal()
+	}
+	s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) Acquire() {
+	s.lock.Lock()
+	for s.counter < 1 {
+		s.cond.Wait()
+	}
+	s.counter -= 1
+	if s.counter > 0 {
+		s.cond.Signal()
+	}
+	s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool {
+	done := make(chan bool, 1)
+	// Gate used to communicate between the threads and decide what the
+	// result is. If the main thread decides first, we have timed out;
+	// otherwise we succeed.
+	decided := new(int32)
+	atomic.StoreInt32(decided, 0)
+	go func() {
+		s.Acquire()
+		if atomic.SwapInt32(decided, 1) == 0 {
+			// Acquire won the race.
+			done <- true
+		} else {
+			// The result was already decided and this thread did not win,
+			// so undo the acquisition.
+			s.Release()
+		}
+	}()
+	select {
+	case <-done:
+		return true
+	case <-time.After(timeout):
+		if atomic.SwapInt32(decided, 1) == 1 {
+			// The other thread already decided the result.
+			return true
+		}
+		return false
+	}
+}
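
A quick sketch of the bounded flavor (counts and timeout are arbitrary); the unbounded flavor exposes the same interface:

    sem := NewBoundedSemaphore(2) // two slots available up front
    sem.Acquire()                 // takes the first slot
    if sem.TryAcquire(50 * time.Millisecond) {
        sem.Release() // took and returned the second slot
    }
    sem.Release() // returns the first slot; releasing beyond 2 would panic
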
diff --git a/weed/wdclient/resource_pool/simple_resource_pool.go b/weed/wdclient/resource_pool/simple_resource_pool.go
new file mode 100644
index 000000000..b0c539100
--- /dev/null
+++ b/weed/wdclient/resource_pool/simple_resource_pool.go
@@ -0,0 +1,343 @@
+package resource_pool
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type idleHandle struct {
+	handle    interface{}
+	keepUntil *time.Time
+}
+
+type TooManyHandles struct {
+	location string
+}
+
+func (t TooManyHandles) Error() string {
+	return fmt.Sprintf("Too many handles to %s", t.location)
+}
+
+type OpenHandleError struct {
+	location string
+	err      error
+}
+
+func (o OpenHandleError) Error() string {
+	return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
+}
+
+// A resource pool implementation where all handles are associated to the
+// same resource location.
+type simpleResourcePool struct {
+	options Options
+
+	numActive *int32 // atomic counter
+
+	activeHighWaterMark *int32 // atomic / monotonically increasing value
+
+	openTokens Semaphore
+
+	mutex       sync.Mutex
+	location    string        // guarded by mutex
+	idleHandles []*idleHandle // guarded by mutex
+	isLameDuck  bool          // guarded by mutex
+}
+
+// This returns a SimpleResourcePool, where all handles are associated to a
+// single resource location.
+func NewSimpleResourcePool(options Options) ResourcePool {
+	numActive := new(int32)
+	atomic.StoreInt32(numActive, 0)
+
+	activeHighWaterMark := new(int32)
+	atomic.StoreInt32(activeHighWaterMark, 0)
+
+	var tokens Semaphore
+	if options.OpenMaxConcurrency > 0 {
+		tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency))
+	}
+
+	return &simpleResourcePool{
+		location:            "",
+		options:             options,
+		numActive:           numActive,
+		activeHighWaterMark: activeHighWaterMark,
+		openTokens:          tokens,
+		mutex:               sync.Mutex{},
+		idleHandles:         make([]*idleHandle, 0, 0),
+		isLameDuck:          false,
+	}
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumActive() int32 {
+	return atomic.LoadInt32(p.numActive)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) ActiveHighWaterMark() int32 {
+	return atomic.LoadInt32(p.activeHighWaterMark)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumIdle() int {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	return len(p.idleHandles)
+}
+
+// SimpleResourcePool can only register a single resource location entry.
+// Register must be called before any Get calls.
+func (p *simpleResourcePool) Register(resourceLocation string) error {
+	if resourceLocation == "" {
+		return errors.New("Invalid resource location")
+	}
+
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.isLameDuck {
+		return fmt.Errorf(
+			"cannot register %s to lame duck resource pool",
+			resourceLocation)
+	}
+
+	if p.location == "" {
+		p.location = resourceLocation
+		return nil
+	}
+	return errors.New("SimpleResourcePool can only register one location")
+}
+
+// SimpleResourcePool will enter lame duck mode upon calling Unregister.
+func (p *simpleResourcePool) Unregister(resourceLocation string) error {
+	p.EnterLameDuckMode()
+	return nil
+}
+
+func (p *simpleResourcePool) ListRegistered() []string {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.location != "" {
+		return []string{p.location}
+	}
+	return []string{}
+}
+
+func (p *simpleResourcePool) getLocation() (string, error) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.location == "" {
+		return "", fmt.Errorf(
+			"resource location is not set for SimpleResourcePool")
+	}
+
+	if p.isLameDuck {
+		return "", fmt.Errorf(
+			"lame duck resource pool cannot return handles to %s",
+			p.location)
+	}
+
+	return p.location, nil
+}
+
+// This gets an active resource from the resource pool. Note that the
+// resourceLocation argument is ignored (the handles are associated to the
+// resource location provided by the first Register call).
+func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) {
+	activeCount := atomic.AddInt32(p.numActive, 1)
+	if p.options.MaxActiveHandles > 0 &&
+		activeCount > p.options.MaxActiveHandles {
+
+		atomic.AddInt32(p.numActive, -1)
+		return nil, TooManyHandles{p.location}
+	}
+
+	highest := atomic.LoadInt32(p.activeHighWaterMark)
+	for activeCount > highest &&
+		!atomic.CompareAndSwapInt32(
+			p.activeHighWaterMark,
+			highest,
+			activeCount) {
+
+		highest = atomic.LoadInt32(p.activeHighWaterMark)
+	}
+
+	if h := p.getIdleHandle(); h != nil {
+		return h, nil
+	}
+
+	location, err := p.getLocation()
+	if err != nil {
+		atomic.AddInt32(p.numActive, -1)
+		return nil, err
+	}
+
+	if p.openTokens != nil {
+		// The current implementation does not wait for tokens to become
+		// available. If that causes availability hits, we could increase
+		// the wait, similar to simple_pool.go.
+		if p.openTokens.TryAcquire(0) {
+			defer p.openTokens.Release()
+		} else {
+			// We could not immediately acquire a token, so fail the Get
+			// instead of waiting.
+			atomic.AddInt32(p.numActive, -1)
+			return nil, OpenHandleError{
+				p.location, errors.New("Open Error: reached OpenMaxConcurrency")}
+		}
+	}
+
+	handle, err := p.options.Open(location)
+	if err != nil {
+		atomic.AddInt32(p.numActive, -1)
+		return nil, OpenHandleError{p.location, err}
+	}
+
+	return NewManagedHandle(p.location, handle, p, p.options), nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Release(handle ManagedHandle) error {
+	if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+		return errors.New(
+			"Resource pool cannot take control of a handle owned " +
+				"by another resource pool")
+	}
+
+	h := handle.ReleaseUnderlyingHandle()
+	if h != nil {
+		// We can unref either before or after queuing the idle handle.
+		// The advantage of unref-ing before queuing is that there is a
+		// higher chance of a successful Get when the number of active
+		// handles is close to the limit (but potentially more handle
+		// creation). The advantage of queuing before unref-ing is that
+		// there's a higher chance of reusing the handle (but potentially
+		// more Get failures).
+		atomic.AddInt32(p.numActive, -1)
+		p.queueIdleHandles(h)
+	}
+
+	return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Discard(handle ManagedHandle) error {
+	if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+		return errors.New(
+			"Resource pool cannot take control of a handle owned " +
+				"by another resource pool")
+	}
+
+	h := handle.ReleaseUnderlyingHandle()
+	if h != nil {
+		atomic.AddInt32(p.numActive, -1)
+		if err := p.options.Close(h); err != nil {
+			return fmt.Errorf("failed to close resource handle: %v", err)
+		}
+	}
+	return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) EnterLameDuckMode() {
+	p.mutex.Lock()
+
+	toClose := p.idleHandles
+	p.isLameDuck = true
+	p.idleHandles = []*idleHandle{}
+
+	p.mutex.Unlock()
+
+	p.closeHandles(toClose)
+}
+
+// This returns an idle resource, if there is one.
+func (p *simpleResourcePool) getIdleHandle() ManagedHandle {
+	var toClose []*idleHandle
+	defer func() {
+		// NOTE: Must keep the closure around to late bind the toClose slice.
+		p.closeHandles(toClose)
+	}()
+
+	now := p.options.getCurrentTime()
+
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	var i int
+	for i = 0; i < len(p.idleHandles); i++ {
+		idle := p.idleHandles[i]
+		if idle.keepUntil == nil || now.Before(*idle.keepUntil) {
+			break
+		}
+	}
+	if i > 0 {
+		toClose = p.idleHandles[0:i]
+	}
+
+	if i < len(p.idleHandles) {
+		idle := p.idleHandles[i]
+		p.idleHandles = p.idleHandles[i+1:]
+		return NewManagedHandle(p.location, idle.handle, p, p.options)
+	}
+
+	if len(p.idleHandles) > 0 {
+		p.idleHandles = []*idleHandle{}
+	}
+	return nil
+}
+
+// This adds an idle resource to the pool.
+func (p *simpleResourcePool) queueIdleHandles(handle interface{}) {
+	var toClose []*idleHandle
+	defer func() {
+		// NOTE: Must keep the closure around to late bind the toClose slice.
+		p.closeHandles(toClose)
+	}()
+
+	now := p.options.getCurrentTime()
+	var keepUntil *time.Time
+	if p.options.MaxIdleTime != nil {
+		// NOTE: Assign to a temp variable first to work around a compiler bug.
+		x := now.Add(*p.options.MaxIdleTime)
+		keepUntil = &x
+	}
+
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.isLameDuck {
+		toClose = []*idleHandle{
+			{handle: handle},
+		}
+		return
+	}
+
+	p.idleHandles = append(
+		p.idleHandles,
+		&idleHandle{
+			handle:    handle,
+			keepUntil: keepUntil,
+		})
+
+	nIdleHandles := uint32(len(p.idleHandles))
+	if nIdleHandles > p.options.MaxIdleHandles {
+		handlesToClose := nIdleHandles - p.options.MaxIdleHandles
+		toClose = p.idleHandles[0:handlesToClose]
+		p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles]
+	}
+}
+
+// Closes resources; at this point it is assumed that these resources
+// are no longer referenced from the main idleHandles slice.
+func (p *simpleResourcePool) closeHandles(handles []*idleHandle) {
+	for _, handle := range handles {
+		_ = p.options.Close(handle.handle)
+	}
+}
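
Since the idle-expiry logic above is driven entirely by Options.getCurrentTime, a test can pin the clock via NowFunc. A hedged sketch (all names and values are hypothetical):

    now := time.Unix(0, 0)
    idle := time.Minute
    pool := NewSimpleResourcePool(Options{
        MaxIdleHandles: 1,
        MaxIdleTime:    &idle,
        NowFunc:        func() time.Time { return now },
        Open:           func(loc string) (interface{}, error) { return loc, nil },
        Close:          func(h interface{}) error { return nil },
    })
    _ = pool.Register("loc")
    h, _ := pool.Get("loc")
    _ = h.Release()                // queued with keepUntil = now + idle
    now = now.Add(2 * time.Minute) // advance the fake clock past keepUntil
    _, _ = pool.Get("loc")         // the expired idle handle is closed; Open runs again
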