diff --git a/config/config.exs b/config/config.exs
index 6fc84efc2..577ccc198 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -647,8 +647,10 @@
prepare: :unnamed
config :pleroma, :connections_pool,
+ reclaim_multiplier: 0.1,
checkin_timeout: 250,
max_connections: 250,
+ max_idle_time: 30_000,
retry: 1,
retry_timeout: 1000,
await_up_timeout: 5_000
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 3282c6882..be14c1f9f 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -223,9 +223,7 @@ defp task_children(_) do
# start hackney and gun pools in tests
defp http_children(_, :test) do
- hackney_options = Config.get([:hackney_pools, :federation])
- hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
- [hackney_pool, Pleroma.Pool.Supervisor]
+ http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
end
defp http_children(Tesla.Adapter.Hackney, _) do
@@ -244,7 +242,9 @@ defp http_children(Tesla.Adapter.Hackney, _) do
end
end
- defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
+ defp http_children(Tesla.Adapter.Gun, _) do
+ [{Registry, keys: :unique, name: Pleroma.Gun.ConnectionPool}]
+ end
defp http_children(_, _), do: []
end
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index cd25a2e74..77f78c7ff 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -3,40 +3,11 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.Conn do
- @moduledoc """
- Struct for gun connection data
- """
alias Pleroma.Gun
- alias Pleroma.Pool.Connections
require Logger
- @type gun_state :: :up | :down
- @type conn_state :: :active | :idle
-
- @type t :: %__MODULE__{
- conn: pid(),
- gun_state: gun_state(),
- conn_state: conn_state(),
- used_by: [pid()],
- last_reference: pos_integer(),
- crf: float(),
- retries: pos_integer()
- }
-
- defstruct conn: nil,
- gun_state: :open,
- conn_state: :init,
- used_by: [],
- last_reference: 0,
- crf: 1,
- retries: 0
-
- @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
- def open(url, name, opts \\ [])
- def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
-
- def open(%URI{} = uri, name, opts) do
+ def open(%URI{} = uri, opts) do
pool_opts = Pleroma.Config.get([:connections_pool], [])
opts =
@@ -45,30 +16,10 @@ def open(%URI{} = uri, name, opts) do
|> Map.put_new(:retry, pool_opts[:retry] || 1)
|> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
|> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+ |> Map.put_new(:supervise, false)
|> maybe_add_tls_opts(uri)
- key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
- max_connections = pool_opts[:max_connections] || 250
-
- conn_pid =
- if Connections.count(name) < max_connections do
- do_open(uri, opts)
- else
- close_least_used_and_do_open(name, uri, opts)
- end
-
- if is_pid(conn_pid) do
- conn = %Pleroma.Gun.Conn{
- conn: conn_pid,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }
-
- :ok = Gun.set_owner(conn_pid, Process.whereis(name))
- Connections.add_conn(name, key, conn)
- end
+ do_open(uri, opts)
end
defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
@@ -81,7 +32,7 @@ defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
reuse_sessions: false,
verify_fun:
{&:ssl_verify_hostname.verify_fun/3,
- [check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
+ [check_hostname: Pleroma.HTTP.AdapterHelper.format_host(host)]}
]
tls_opts =
@@ -105,7 +56,7 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -141,7 +92,7 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -155,11 +106,11 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
end
defp do_open(%URI{host: host, port: port} = uri, opts) do
- host = Pleroma.HTTP.Connection.parse_host(host)
+ host = Pleroma.HTTP.AdapterHelper.parse_host(host)
with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -171,7 +122,7 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do
end
defp destination_opts(%URI{host: host, port: port}) do
- host = Pleroma.HTTP.Connection.parse_host(host)
+ host = Pleroma.HTTP.AdapterHelper.parse_host(host)
%{host: host, port: port}
end
@@ -181,17 +132,6 @@ defp add_http2_opts(opts, "https", tls_opts) do
defp add_http2_opts(opts, _, _), do: opts
- defp close_least_used_and_do_open(name, uri, opts) do
- with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
- :ok <- Gun.close(conn.conn) do
- Connections.remove_conn(name, key)
-
- do_open(uri, opts)
- else
- [] -> {:error, :pool_overflowed}
- end
- end
-
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
"#{scheme}://#{host}#{path}"
end
diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex
new file mode 100644
index 000000000..e6abee69c
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool.ex
@@ -0,0 +1,129 @@
+defmodule Pleroma.Gun.ConnectionPool do
+ @registry __MODULE__
+
+ def get_conn(uri, opts) do
+ case enforce_pool_limits() do
+ :ok ->
+ key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+ case Registry.lookup(@registry, key) do
+ # The key has already been registered, but connection is not up yet
+ [{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
+ get_gun_pid_from_worker(worker_pid)
+
+ [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
+ GenServer.cast(worker_pid, {:add_client, self(), false})
+ {:ok, gun_pid}
+
+ [] ->
+ # :gun.set_owner fails in :connected state for whatevever reason,
+ # so we open the connection in the process directly and send it's pid back
+ # We trust gun to handle timeouts by itself
+ case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
+ timeout: :infinity
+ ) do
+ {:ok, _worker_pid} ->
+ receive do
+ {:conn_pid, pid} -> {:ok, pid}
+ end
+
+ {:error, {:error, {:already_registered, worker_pid}}} ->
+ get_gun_pid_from_worker(worker_pid)
+
+ err ->
+ err
+ end
+ end
+
+ :error ->
+ {:error, :pool_full}
+ end
+ end
+
+ @enforcer_key "enforcer"
+ defp enforce_pool_limits() do
+ max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+ if Registry.count(@registry) >= max_connections do
+ case Registry.lookup(@registry, @enforcer_key) do
+ [] ->
+ pid =
+ spawn(fn ->
+ {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
+
+ reclaim_max =
+ [:connections_pool, :reclaim_multiplier]
+ |> Pleroma.Config.get()
+ |> Kernel.*(max_connections)
+ |> round
+ |> max(1)
+
+ unused_conns =
+ Registry.select(
+ @registry,
+ [
+ {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
+ [{{:"$1", :"$3", :"$4"}}]}
+ ]
+ )
+
+ case unused_conns do
+ [] ->
+ exit(:pool_full)
+
+ unused_conns ->
+ unused_conns
+ |> Enum.sort(fn {_pid1, crf1, last_reference1},
+ {_pid2, crf2, last_reference2} ->
+ crf1 <= crf2 and last_reference1 <= last_reference2
+ end)
+ |> Enum.take(reclaim_max)
+ |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
+ end
+ end)
+
+ wait_for_enforcer_finish(pid)
+
+ [{pid, _}] ->
+ wait_for_enforcer_finish(pid)
+ end
+ else
+ :ok
+ end
+ end
+
+ defp wait_for_enforcer_finish(pid) do
+ ref = Process.monitor(pid)
+
+ receive do
+ {:DOWN, ^ref, :process, ^pid, :pool_full} ->
+ :error
+
+ {:DOWN, ^ref, :process, ^pid, :normal} ->
+ :ok
+ end
+ end
+
+ defp get_gun_pid_from_worker(worker_pid) do
+ # GenServer.call will block the process for timeout length if
+ # the server crashes on startup (which will happen if gun fails to connect)
+ # so instead we use cast + monitor
+
+ ref = Process.monitor(worker_pid)
+ GenServer.cast(worker_pid, {:add_client, self(), true})
+
+ receive do
+ {:conn_pid, pid} -> {:ok, pid}
+ {:DOWN, ^ref, :process, ^worker_pid, reason} -> reason
+ end
+ end
+
+ def release_conn(conn_pid) do
+ [worker_pid] =
+ Registry.select(@registry, [
+ {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
+ ])
+
+ GenServer.cast(worker_pid, {:remove_client, self()})
+ end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex
new file mode 100644
index 000000000..ebde4bbf6
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool/worker.ex
@@ -0,0 +1,95 @@
+defmodule Pleroma.Gun.ConnectionPool.Worker do
+ alias Pleroma.Gun
+ use GenServer
+
+ @registry Pleroma.Gun.ConnectionPool
+
+ @impl true
+ def init([uri, key, opts, client_pid]) do
+ time = :os.system_time(:second)
+ # Register before opening connection to prevent race conditions
+ with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
+ {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+ Process.link(conn_pid) do
+ {_, _} =
+ Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
+ {conn_pid, used_by, crf, last_reference}
+ end)
+
+ send(client_pid, {:conn_pid, conn_pid})
+ {:ok, %{key: key, timer: nil}, :hibernate}
+ else
+ err -> {:stop, err}
+ end
+ end
+
+ @impl true
+ def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
+ time = :os.system_time(:second)
+
+ {{conn_pid, _, _, _}, _} =
+ Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+ {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
+ end)
+
+ if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
+
+ state =
+ if state.timer != nil do
+ Process.cancel_timer(state[:timer])
+ %{state | timer: nil}
+ else
+ state
+ end
+
+ {:noreply, state, :hibernate}
+ end
+
+ @impl true
+ def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
+ {{_conn_pid, used_by, _crf, _last_reference}, _} =
+ Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+ {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
+ end)
+
+ timer =
+ if used_by == [] do
+ max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
+ Process.send_after(self(), :idle_close, max_idle)
+ else
+ nil
+ end
+
+ {:noreply, %{state | timer: timer}, :hibernate}
+ end
+
+ @impl true
+ def handle_info(:idle_close, state) do
+ # Gun monitors the owner process, and will close the connection automatically
+ # when it's terminated
+ {:stop, :normal, state}
+ end
+
+ # Gracefully shutdown if the connection got closed without any streams left
+ @impl true
+ def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
+ {:stop, :normal, state}
+ end
+
+ # Otherwise, shutdown with an error
+ @impl true
+ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
+ {:stop, {:error, down_message}, state}
+ end
+
+ @impl true
+ def handle_call(:idle_close, _, %{key: key} = state) do
+ Registry.unregister(@registry, key)
+ {:stop, :normal, state}
+ end
+
+ # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
+ defp crf(time_delta, prev_crf) do
+ 1 + :math.pow(0.5, time_delta / 100) * prev_crf
+ end
+end
diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex
index 510722ff9..0532ea31d 100644
--- a/lib/pleroma/http/adapter_helper.ex
+++ b/lib/pleroma/http/adapter_helper.ex
@@ -3,7 +3,21 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper do
- alias Pleroma.HTTP.Connection
+ @moduledoc """
+ Configure Tesla.Client with default and customized adapter options.
+ """
+ @defaults [pool: :federation]
+
+ @type ip_address :: ipv4_address() | ipv6_address()
+ @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
+ @type ipv6_address ::
+ {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
+ @type proxy_type() :: :socks4 | :socks5
+ @type host() :: charlist() | ip_address()
+
+ alias Pleroma.Config
+ alias Pleroma.HTTP.AdapterHelper
+ require Logger
@type proxy ::
{Connection.host(), pos_integer()}
@@ -11,24 +25,13 @@ defmodule Pleroma.HTTP.AdapterHelper do
@callback options(keyword(), URI.t()) :: keyword()
@callback after_request(keyword()) :: :ok
-
- @spec options(keyword(), URI.t()) :: keyword()
- def options(opts, _uri) do
- proxy = Pleroma.Config.get([:http, :proxy_url], nil)
- maybe_add_proxy(opts, format_proxy(proxy))
- end
-
- @spec maybe_get_conn(URI.t(), keyword()) :: keyword()
- def maybe_get_conn(_uri, opts), do: opts
-
- @spec after_request(keyword()) :: :ok
- def after_request(_opts), do: :ok
+ @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
@spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
def format_proxy(nil), do: nil
def format_proxy(proxy_url) do
- case Connection.parse_proxy(proxy_url) do
+ case parse_proxy(proxy_url) do
{:ok, host, port} -> {host, port}
{:ok, type, host, port} -> {type, host, port}
_ -> nil
@@ -38,4 +41,106 @@ def format_proxy(proxy_url) do
@spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
def maybe_add_proxy(opts, nil), do: opts
def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
+
+ @doc """
+ Merge default connection & adapter options with received ones.
+ """
+
+ @spec options(URI.t(), keyword()) :: keyword()
+ def options(%URI{} = uri, opts \\ []) do
+ @defaults
+ |> pool_timeout()
+ |> Keyword.merge(opts)
+ |> adapter_helper().options(uri)
+ end
+
+ defp pool_timeout(opts) do
+ {config_key, default} =
+ if adapter() == Tesla.Adapter.Gun do
+ {:pools, Config.get([:pools, :default, :timeout])}
+ else
+ {:hackney_pools, 10_000}
+ end
+
+ timeout = Config.get([config_key, opts[:pool], :timeout], default)
+
+ Keyword.merge(opts, timeout: timeout)
+ end
+
+ @spec after_request(keyword()) :: :ok
+ def after_request(opts), do: adapter_helper().after_request(opts)
+
+ def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
+ defp adapter, do: Application.get_env(:tesla, :adapter)
+
+ defp adapter_helper do
+ case adapter() do
+ Tesla.Adapter.Gun -> AdapterHelper.Gun
+ Tesla.Adapter.Hackney -> AdapterHelper.Hackney
+ _ -> AdapterHelper.Default
+ end
+ end
+
+ @spec parse_proxy(String.t() | tuple() | nil) ::
+ {:ok, host(), pos_integer()}
+ | {:ok, proxy_type(), host(), pos_integer()}
+ | {:error, atom()}
+ | nil
+
+ def parse_proxy(nil), do: nil
+
+ def parse_proxy(proxy) when is_binary(proxy) do
+ with [host, port] <- String.split(proxy, ":"),
+ {port, ""} <- Integer.parse(port) do
+ {:ok, parse_host(host), port}
+ else
+ {_, _} ->
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
+ {:error, :invalid_proxy_port}
+
+ :error ->
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
+ {:error, :invalid_proxy_port}
+
+ _ ->
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+ {:error, :invalid_proxy}
+ end
+ end
+
+ def parse_proxy(proxy) when is_tuple(proxy) do
+ with {type, host, port} <- proxy do
+ {:ok, type, parse_host(host), port}
+ else
+ _ ->
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+ {:error, :invalid_proxy}
+ end
+ end
+
+ @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
+ def parse_host(host) when is_list(host), do: host
+ def parse_host(host) when is_atom(host), do: to_charlist(host)
+
+ def parse_host(host) when is_binary(host) do
+ host = to_charlist(host)
+
+ case :inet.parse_address(host) do
+ {:error, :einval} -> host
+ {:ok, ip} -> ip
+ end
+ end
+
+ @spec format_host(String.t()) :: charlist()
+ def format_host(host) do
+ host_charlist = to_charlist(host)
+
+ case :inet.parse_address(host_charlist) do
+ {:error, :einval} ->
+ :idna.encode(host_charlist)
+
+ {:ok, _ip} ->
+ host_charlist
+ end
+ end
end
diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex
new file mode 100644
index 000000000..218cfacc0
--- /dev/null
+++ b/lib/pleroma/http/adapter_helper/default.ex
@@ -0,0 +1,17 @@
+defmodule Pleroma.HTTP.AdapterHelper.Default do
+ alias Pleroma.HTTP.AdapterHelper
+
+ @behaviour Pleroma.HTTP.AdapterHelper
+
+ @spec options(keyword(), URI.t()) :: keyword()
+ def options(opts, _uri) do
+ proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+ AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy))
+ end
+
+ @spec after_request(keyword()) :: :ok
+ def after_request(_opts), do: :ok
+
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+ def get_conn(_uri, opts), do: {:ok, opts}
+end
diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex
index ead7cdc6b..6f7cc9784 100644
--- a/lib/pleroma/http/adapter_helper/gun.ex
+++ b/lib/pleroma/http/adapter_helper/gun.ex
@@ -5,8 +5,8 @@
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
+ alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper
- alias Pleroma.Pool.Connections
require Logger
@@ -31,13 +31,13 @@ def options(incoming_opts \\ [], %URI{} = uri) do
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
|> AdapterHelper.maybe_add_proxy(proxy)
- |> maybe_get_conn(uri, incoming_opts)
+ |> Keyword.merge(incoming_opts)
end
@spec after_request(keyword()) :: :ok
def after_request(opts) do
if opts[:conn] && opts[:body_as] != :chunks do
- Connections.checkout(opts[:conn], self(), :gun_connections)
+ ConnectionPool.release_conn(opts[:conn])
end
:ok
@@ -51,27 +51,11 @@ defp add_scheme_opts(opts, %{scheme: "https"}) do
|> Keyword.put(:tls_opts, log_level: :warning)
end
- defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
- {receive_conn?, opts} =
- adapter_opts
- |> Keyword.merge(incoming_opts)
- |> Keyword.pop(:receive_conn, true)
-
- if Connections.alive?(:gun_connections) and receive_conn? do
- checkin_conn(uri, opts)
- else
- opts
- end
- end
-
- defp checkin_conn(uri, opts) do
- case Connections.checkin(uri, :gun_connections) do
- nil ->
- Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
- opts
-
- conn when is_pid(conn) ->
- Keyword.merge(opts, conn: conn, close_conn: false)
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
+ def get_conn(uri, opts) do
+ case ConnectionPool.get_conn(uri, opts) do
+ {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
+ err -> err
end
end
end
diff --git a/lib/pleroma/http/adapter_helper/hackney.ex b/lib/pleroma/http/adapter_helper/hackney.ex
index 3972a03a9..42d552740 100644
--- a/lib/pleroma/http/adapter_helper/hackney.ex
+++ b/lib/pleroma/http/adapter_helper/hackney.ex
@@ -25,4 +25,7 @@ def options(connection_opts \\ [], %URI{} = uri) do
defp add_scheme_opts(opts, _), do: opts
def after_request(_), do: :ok
+
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+ def get_conn(_uri, opts), do: {:ok, opts}
end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
deleted file mode 100644
index ebacf7902..000000000
--- a/lib/pleroma/http/connection.ex
+++ /dev/null
@@ -1,124 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.Connection do
- @moduledoc """
- Configure Tesla.Client with default and customized adapter options.
- """
-
- alias Pleroma.Config
- alias Pleroma.HTTP.AdapterHelper
-
- require Logger
-
- @defaults [pool: :federation]
-
- @type ip_address :: ipv4_address() | ipv6_address()
- @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
- @type ipv6_address ::
- {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
- @type proxy_type() :: :socks4 | :socks5
- @type host() :: charlist() | ip_address()
-
- @doc """
- Merge default connection & adapter options with received ones.
- """
-
- @spec options(URI.t(), keyword()) :: keyword()
- def options(%URI{} = uri, opts \\ []) do
- @defaults
- |> pool_timeout()
- |> Keyword.merge(opts)
- |> adapter_helper().options(uri)
- end
-
- defp pool_timeout(opts) do
- {config_key, default} =
- if adapter() == Tesla.Adapter.Gun do
- {:pools, Config.get([:pools, :default, :timeout])}
- else
- {:hackney_pools, 10_000}
- end
-
- timeout = Config.get([config_key, opts[:pool], :timeout], default)
-
- Keyword.merge(opts, timeout: timeout)
- end
-
- @spec after_request(keyword()) :: :ok
- def after_request(opts), do: adapter_helper().after_request(opts)
-
- defp adapter, do: Application.get_env(:tesla, :adapter)
-
- defp adapter_helper do
- case adapter() do
- Tesla.Adapter.Gun -> AdapterHelper.Gun
- Tesla.Adapter.Hackney -> AdapterHelper.Hackney
- _ -> AdapterHelper
- end
- end
-
- @spec parse_proxy(String.t() | tuple() | nil) ::
- {:ok, host(), pos_integer()}
- | {:ok, proxy_type(), host(), pos_integer()}
- | {:error, atom()}
- | nil
-
- def parse_proxy(nil), do: nil
-
- def parse_proxy(proxy) when is_binary(proxy) do
- with [host, port] <- String.split(proxy, ":"),
- {port, ""} <- Integer.parse(port) do
- {:ok, parse_host(host), port}
- else
- {_, _} ->
- Logger.warn("Parsing port failed #{inspect(proxy)}")
- {:error, :invalid_proxy_port}
-
- :error ->
- Logger.warn("Parsing port failed #{inspect(proxy)}")
- {:error, :invalid_proxy_port}
-
- _ ->
- Logger.warn("Parsing proxy failed #{inspect(proxy)}")
- {:error, :invalid_proxy}
- end
- end
-
- def parse_proxy(proxy) when is_tuple(proxy) do
- with {type, host, port} <- proxy do
- {:ok, type, parse_host(host), port}
- else
- _ ->
- Logger.warn("Parsing proxy failed #{inspect(proxy)}")
- {:error, :invalid_proxy}
- end
- end
-
- @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
- def parse_host(host) when is_list(host), do: host
- def parse_host(host) when is_atom(host), do: to_charlist(host)
-
- def parse_host(host) when is_binary(host) do
- host = to_charlist(host)
-
- case :inet.parse_address(host) do
- {:error, :einval} -> host
- {:ok, ip} -> ip
- end
- end
-
- @spec format_host(String.t()) :: charlist()
- def format_host(host) do
- host_charlist = to_charlist(host)
-
- case :inet.parse_address(host_charlist) do
- {:error, :einval} ->
- :idna.encode(host_charlist)
-
- {:ok, _ip} ->
- host_charlist
- end
- end
-end
diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex
index 66ca75367..8ded76601 100644
--- a/lib/pleroma/http/http.ex
+++ b/lib/pleroma/http/http.ex
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do
Wrapper for `Tesla.request/2`.
"""
- alias Pleroma.HTTP.Connection
+ alias Pleroma.HTTP.AdapterHelper
alias Pleroma.HTTP.Request
alias Pleroma.HTTP.RequestBuilder, as: Builder
alias Tesla.Client
@@ -60,49 +60,26 @@ def post(url, body, headers \\ [], options \\ []),
{:ok, Env.t()} | {:error, any()}
def request(method, url, body, headers, options) when is_binary(url) do
uri = URI.parse(url)
- adapter_opts = Connection.options(uri, options[:adapter] || [])
- options = put_in(options[:adapter], adapter_opts)
- params = options[:params] || []
- request = build_request(method, headers, options, url, body, params)
+ adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
- adapter = Application.get_env(:tesla, :adapter)
- client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
+ case AdapterHelper.get_conn(uri, adapter_opts) do
+ {:ok, adapter_opts} ->
+ options = put_in(options[:adapter], adapter_opts)
+ params = options[:params] || []
+ request = build_request(method, headers, options, url, body, params)
- pid = Process.whereis(adapter_opts[:pool])
+ adapter = Application.get_env(:tesla, :adapter)
+ client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
- pool_alive? =
- if adapter == Tesla.Adapter.Gun && pid do
- Process.alive?(pid)
- else
- false
- end
+ response = request(client, request)
- request_opts =
- adapter_opts
- |> Enum.into(%{})
- |> Map.put(:env, Pleroma.Config.get([:env]))
- |> Map.put(:pool_alive?, pool_alive?)
+ AdapterHelper.after_request(adapter_opts)
- response = request(client, request, request_opts)
+ response
- Connection.after_request(adapter_opts)
-
- response
- end
-
- @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
- def request(%Client{} = client, request, %{env: :test}), do: request(client, request)
-
- def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request)
-
- def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request)
-
- def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
- :poolboy.transaction(
- pool,
- &Pleroma.Pool.Request.execute(&1, client, request, timeout),
- timeout
- )
+ err ->
+ err
+ end
end
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
deleted file mode 100644
index acafe1bea..000000000
--- a/lib/pleroma/pool/connections.ex
+++ /dev/null
@@ -1,283 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Connections do
- use GenServer
-
- alias Pleroma.Config
- alias Pleroma.Gun
-
- require Logger
-
- @type domain :: String.t()
- @type conn :: Pleroma.Gun.Conn.t()
-
- @type t :: %__MODULE__{
- conns: %{domain() => conn()},
- opts: keyword()
- }
-
- defstruct conns: %{}, opts: []
-
- @spec start_link({atom(), keyword()}) :: {:ok, pid()}
- def start_link({name, opts}) do
- GenServer.start_link(__MODULE__, opts, name: name)
- end
-
- @impl true
- def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
-
- @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
- def checkin(url, name)
- def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
-
- def checkin(%URI{} = uri, name) do
- timeout = Config.get([:connections_pool, :checkin_timeout], 250)
-
- GenServer.call(name, {:checkin, uri}, timeout)
- end
-
- @spec alive?(atom()) :: boolean()
- def alive?(name) do
- if pid = Process.whereis(name) do
- Process.alive?(pid)
- else
- false
- end
- end
-
- @spec get_state(atom()) :: t()
- def get_state(name) do
- GenServer.call(name, :state)
- end
-
- @spec count(atom()) :: pos_integer()
- def count(name) do
- GenServer.call(name, :count)
- end
-
- @spec get_unused_conns(atom()) :: [{domain(), conn()}]
- def get_unused_conns(name) do
- GenServer.call(name, :unused_conns)
- end
-
- @spec checkout(pid(), pid(), atom()) :: :ok
- def checkout(conn, pid, name) do
- GenServer.cast(name, {:checkout, conn, pid})
- end
-
- @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
- def add_conn(name, key, conn) do
- GenServer.cast(name, {:add_conn, key, conn})
- end
-
- @spec remove_conn(atom(), String.t()) :: :ok
- def remove_conn(name, key) do
- GenServer.cast(name, {:remove_conn, key})
- end
-
- @impl true
- def handle_cast({:add_conn, key, conn}, state) do
- state = put_in(state.conns[key], conn)
-
- Process.monitor(conn.conn)
- {:noreply, state}
- end
-
- @impl true
- def handle_cast({:checkout, conn_pid, pid}, state) do
- state =
- with true <- Process.alive?(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid),
- used_by <- List.keydelete(conn.used_by, pid, 0) do
- conn_state = if used_by == [], do: :idle, else: conn.conn_state
-
- put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
- else
- false ->
- Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
- state
-
- nil ->
- Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_cast({:remove_conn, key}, state) do
- state = put_in(state.conns, Map.delete(state.conns, key))
- {:noreply, state}
- end
-
- @impl true
- def handle_call({:checkin, uri}, from, state) do
- key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
- case state.conns[key] do
- %{conn: pid, gun_state: :up} = conn ->
- time = :os.system_time(:second)
- last_reference = time - conn.last_reference
- crf = crf(last_reference, 100, conn.crf)
-
- state =
- put_in(state.conns[key], %{
- conn
- | last_reference: time,
- crf: crf,
- conn_state: :active,
- used_by: [from | conn.used_by]
- })
-
- {:reply, pid, state}
-
- %{gun_state: :down} ->
- {:reply, nil, state}
-
- nil ->
- {:reply, nil, state}
- end
- end
-
- @impl true
- def handle_call(:state, _from, state), do: {:reply, state, state}
-
- @impl true
- def handle_call(:count, _from, state) do
- {:reply, Enum.count(state.conns), state}
- end
-
- @impl true
- def handle_call(:unused_conns, _from, state) do
- unused_conns =
- state.conns
- |> Enum.filter(&filter_conns/1)
- |> Enum.sort(&sort_conns/2)
-
- {:reply, unused_conns, state}
- end
-
- defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
- defp filter_conns(_), do: false
-
- defp sort_conns({_, c1}, {_, c2}) do
- c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
- end
-
- @impl true
- def handle_info({:gun_up, conn_pid, _protocol}, state) do
- %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
-
- host =
- case :inet.ntoa(host) do
- {:error, :einval} -> host
- ip -> ip
- end
-
- key = "#{scheme}:#{host}:#{port}"
-
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid, key),
- {true, key} <- {Process.alive?(conn_pid), key} do
- put_in(state.conns[key], %{
- conn
- | gun_state: :up,
- conn_state: :active,
- retries: 0
- })
- else
- {false, key} ->
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
-
- nil ->
- :ok = Gun.close(conn_pid)
-
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
- retries = Config.get([:connections_pool, :retry], 1)
- # we can't get info on this pid, because pid is dead
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid),
- {true, key} <- {Process.alive?(conn_pid), key} do
- if conn.retries == retries do
- :ok = Gun.close(conn.conn)
-
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
- else
- put_in(state.conns[key], %{
- conn
- | gun_state: :down,
- retries: conn.retries + 1
- })
- end
- else
- {false, key} ->
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
-
- nil ->
- Logger.debug(":gun_down for conn which isn't found in state")
-
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
- Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
-
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid) do
- Enum.each(conn.used_by, fn {pid, _ref} ->
- Process.exit(pid, reason)
- end)
-
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
- else
- nil ->
- Logger.debug(":DOWN for conn which isn't found in state")
-
- state
- end
-
- {:noreply, state}
- end
-
- defp find_conn(conns, conn_pid) do
- Enum.find(conns, fn {_key, conn} ->
- conn.conn == conn_pid
- end)
- end
-
- defp find_conn(conns, conn_pid, conn_key) do
- Enum.find(conns, fn {key, conn} ->
- key == conn_key and conn.conn == conn_pid
- end)
- end
-
- def crf(current, steps, crf) do
- 1 + :math.pow(0.5, current / steps) * crf
- end
-end
diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex
deleted file mode 100644
index 21a6fbbc5..000000000
--- a/lib/pleroma/pool/pool.ex
+++ /dev/null
@@ -1,22 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool do
- def child_spec(opts) do
- poolboy_opts =
- opts
- |> Keyword.put(:worker_module, Pleroma.Pool.Request)
- |> Keyword.put(:name, {:local, opts[:name]})
- |> Keyword.put(:size, opts[:size])
- |> Keyword.put(:max_overflow, opts[:max_overflow])
-
- %{
- id: opts[:id] || {__MODULE__, make_ref()},
- start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
- restart: :permanent,
- shutdown: 5000,
- type: :worker
- }
- end
-end
diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex
deleted file mode 100644
index 3fb930db7..000000000
--- a/lib/pleroma/pool/request.ex
+++ /dev/null
@@ -1,65 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Request do
- use GenServer
-
- require Logger
-
- def start_link(args) do
- GenServer.start_link(__MODULE__, args)
- end
-
- @impl true
- def init(_), do: {:ok, []}
-
- @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
- {:ok, Tesla.Env.t()} | {:error, any()}
- def execute(pid, client, request, timeout) do
- GenServer.call(pid, {:execute, client, request}, timeout)
- end
-
- @impl true
- def handle_call({:execute, client, request}, _from, state) do
- response = Pleroma.HTTP.request(client, request)
-
- {:reply, response, state}
- end
-
- @impl true
- def handle_info({:gun_data, _conn, _stream, _, _}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_up, _conn, _protocol}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_error, _conn, _stream, _error}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info(msg, state) do
- Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
- {:noreply, state}
- end
-end
diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex
deleted file mode 100644
index faf646cb2..000000000
--- a/lib/pleroma/pool/supervisor.ex
+++ /dev/null
@@ -1,42 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Supervisor do
- use Supervisor
-
- alias Pleroma.Config
- alias Pleroma.Pool
-
- def start_link(args) do
- Supervisor.start_link(__MODULE__, args, name: __MODULE__)
- end
-
- def init(_) do
- conns_child = %{
- id: Pool.Connections,
- start:
- {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]}
- }
-
- Supervisor.init([conns_child | pools()], strategy: :one_for_one)
- end
-
- defp pools do
- pools = Config.get(:pools)
-
- pools =
- if Config.get([Pleroma.Upload, :proxy_remote]) == false do
- Keyword.delete(pools, :upload)
- else
- pools
- end
-
- for {pool_name, pool_opts} <- pools do
- pool_opts
- |> Keyword.put(:id, {Pool, pool_name})
- |> Keyword.put(:name, pool_name)
- |> Pool.child_spec()
- end
- end
-end
diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex
index e81ea8bde..65785445d 100644
--- a/lib/pleroma/reverse_proxy/client/tesla.ex
+++ b/lib/pleroma/reverse_proxy/client/tesla.ex
@@ -48,7 +48,7 @@ def stream_body(%{pid: pid, opts: opts, fin: true}) do
# if there were redirects we need to checkout old conn
conn = opts[:old_conn] || opts[:conn]
- if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
+ if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn)
:done
end