Merge branch 'websocket-data-fed' into 'develop'
Federate data through persistent websocket connections See merge request pleroma/pleroma!2408
This commit is contained in:
commit
875b6654ec
|
@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Experimental websocket-based federation between Pleroma instances.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
- Renamed `:await_up_timeout` in `:connections_pool` namespace to `:connect_timeout`, old name is deprecated.
|
- Renamed `:await_up_timeout` in `:connections_pool` namespace to `:connect_timeout`, old name is deprecated.
|
||||||
|
|
|
@ -130,6 +130,7 @@
|
||||||
dispatch: [
|
dispatch: [
|
||||||
{:_,
|
{:_,
|
||||||
[
|
[
|
||||||
|
{"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},
|
||||||
{"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
|
{"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
|
||||||
{"/websocket", Phoenix.Endpoint.CowboyWebSocket,
|
{"/websocket", Phoenix.Endpoint.CowboyWebSocket,
|
||||||
{Phoenix.Transports.WebSocket,
|
{Phoenix.Transports.WebSocket,
|
||||||
|
@ -148,6 +149,16 @@
|
||||||
"SameSite=Lax"
|
"SameSite=Lax"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
config :pleroma, :fed_sockets,
|
||||||
|
enabled: false,
|
||||||
|
connection_duration: :timer.hours(8),
|
||||||
|
rejection_duration: :timer.minutes(15),
|
||||||
|
fed_socket_fetches: [
|
||||||
|
default: 12_000,
|
||||||
|
interval: 3_000,
|
||||||
|
lazy: false
|
||||||
|
]
|
||||||
|
|
||||||
# Configures Elixir's Logger
|
# Configures Elixir's Logger
|
||||||
config :logger, :console,
|
config :logger, :console,
|
||||||
level: :debug,
|
level: :debug,
|
||||||
|
@ -532,6 +543,7 @@
|
||||||
token_expiration: 5,
|
token_expiration: 5,
|
||||||
federator_incoming: 50,
|
federator_incoming: 50,
|
||||||
federator_outgoing: 50,
|
federator_outgoing: 50,
|
||||||
|
ingestion_queue: 50,
|
||||||
web_push: 50,
|
web_push: 50,
|
||||||
mailer: 10,
|
mailer: 10,
|
||||||
transmogrifier: 20,
|
transmogrifier: 20,
|
||||||
|
|
|
@ -270,6 +270,19 @@
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
%{
|
||||||
|
group: :pleroma,
|
||||||
|
key: :fed_sockets,
|
||||||
|
type: :group,
|
||||||
|
description: "Websocket based federation",
|
||||||
|
children: [
|
||||||
|
%{
|
||||||
|
key: :enabled,
|
||||||
|
type: :boolean,
|
||||||
|
description: "Enable FedSockets"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
%{
|
%{
|
||||||
group: :pleroma,
|
group: :pleroma,
|
||||||
key: Pleroma.Emails.Mailer,
|
key: Pleroma.Emails.Mailer,
|
||||||
|
|
|
@ -19,6 +19,11 @@
|
||||||
level: :warn,
|
level: :warn,
|
||||||
format: "\n[$level] $message\n"
|
format: "\n[$level] $message\n"
|
||||||
|
|
||||||
|
config :pleroma, :fed_sockets,
|
||||||
|
enabled: false,
|
||||||
|
connection_duration: 5,
|
||||||
|
rejection_duration: 5
|
||||||
|
|
||||||
config :pleroma, :auth, oauth_consumer_strategies: []
|
config :pleroma, :auth, oauth_consumer_strategies: []
|
||||||
|
|
||||||
config :pleroma, Pleroma.Upload,
|
config :pleroma, Pleroma.Upload,
|
||||||
|
|
|
@ -225,6 +225,16 @@ Enables the worker which processes posts scheduled for deletion. Pinned posts ar
|
||||||
|
|
||||||
* `enabled`: whether expired activities will be sent to the job queue to be deleted
|
* `enabled`: whether expired activities will be sent to the job queue to be deleted
|
||||||
|
|
||||||
|
## FedSockets
|
||||||
|
FedSockets is an experimental feature allowing for Pleroma backends to federate using a persistant websocket connection as opposed to making each federation a seperate http connection. This feature is currently off by default. It is configurable throught he following options.
|
||||||
|
|
||||||
|
### :fedsockets
|
||||||
|
* `enabled`: Enables FedSockets for this instance. `false` by default.
|
||||||
|
* `connection_duration`: Time an idle websocket is kept open.
|
||||||
|
* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time.
|
||||||
|
* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details.
|
||||||
|
|
||||||
|
|
||||||
## Frontends
|
## Frontends
|
||||||
|
|
||||||
### :frontend_configurations
|
### :frontend_configurations
|
||||||
|
|
|
@ -99,7 +99,7 @@ def start(_type, _args) do
|
||||||
{Oban, Config.get(Oban)}
|
{Oban, Config.get(Oban)}
|
||||||
] ++
|
] ++
|
||||||
task_children(@env) ++
|
task_children(@env) ++
|
||||||
streamer_child(@env) ++
|
dont_run_in_test(@env) ++
|
||||||
chat_child(@env, chat_enabled?()) ++
|
chat_child(@env, chat_enabled?()) ++
|
||||||
[
|
[
|
||||||
Pleroma.Web.Endpoint,
|
Pleroma.Web.Endpoint,
|
||||||
|
@ -188,16 +188,17 @@ def build_cachex(type, opts),
|
||||||
|
|
||||||
defp chat_enabled?, do: Config.get([:chat, :enabled])
|
defp chat_enabled?, do: Config.get([:chat, :enabled])
|
||||||
|
|
||||||
defp streamer_child(env) when env in [:test, :benchmark], do: []
|
defp dont_run_in_test(env) when env in [:test, :benchmark], do: []
|
||||||
|
|
||||||
defp streamer_child(_) do
|
defp dont_run_in_test(_) do
|
||||||
[
|
[
|
||||||
{Registry,
|
{Registry,
|
||||||
[
|
[
|
||||||
name: Pleroma.Web.Streamer.registry(),
|
name: Pleroma.Web.Streamer.registry(),
|
||||||
keys: :duplicate,
|
keys: :duplicate,
|
||||||
partitions: System.schedulers_online()
|
partitions: System.schedulers_online()
|
||||||
]}
|
]},
|
||||||
|
Pleroma.Web.FedSockets.Supervisor
|
||||||
]
|
]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ defmodule Pleroma.Object.Fetcher do
|
||||||
alias Pleroma.Web.ActivityPub.ObjectValidator
|
alias Pleroma.Web.ActivityPub.ObjectValidator
|
||||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||||
alias Pleroma.Web.Federator
|
alias Pleroma.Web.Federator
|
||||||
|
alias Pleroma.Web.FedSockets
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
require Pleroma.Constants
|
require Pleroma.Constants
|
||||||
|
@ -182,27 +183,20 @@ defp maybe_date_fetch(headers, date) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
|
def fetch_and_contain_remote_object_from_id(prm, opts \\ [])
|
||||||
|
|
||||||
|
def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts),
|
||||||
|
do: fetch_and_contain_remote_object_from_id(id, opts)
|
||||||
|
|
||||||
|
def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
|
||||||
Logger.debug("Fetching object #{id} via AP")
|
Logger.debug("Fetching object #{id} via AP")
|
||||||
|
|
||||||
date = Pleroma.Signature.signed_date()
|
|
||||||
|
|
||||||
headers =
|
|
||||||
[{"accept", "application/activity+json"}]
|
|
||||||
|> maybe_date_fetch(date)
|
|
||||||
|> sign_fetch(id, date)
|
|
||||||
|
|
||||||
Logger.debug("Fetch headers: #{inspect(headers)}")
|
|
||||||
|
|
||||||
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
|
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
|
||||||
{:ok, %{body: body, status: code}} when code in 200..299 <- HTTP.get(id, headers),
|
{:ok, body} <- get_object(id, opts),
|
||||||
{:ok, data} <- Jason.decode(body),
|
{:ok, data} <- safe_json_decode(body),
|
||||||
:ok <- Containment.contain_origin_from_id(id, data) do
|
:ok <- Containment.contain_origin_from_id(id, data) do
|
||||||
{:ok, data}
|
{:ok, data}
|
||||||
else
|
else
|
||||||
{:ok, %{status: code}} when code in [404, 410] ->
|
|
||||||
{:error, "Object has been deleted"}
|
|
||||||
|
|
||||||
{:scheme, _} ->
|
{:scheme, _} ->
|
||||||
{:error, "Unsupported URI scheme"}
|
{:error, "Unsupported URI scheme"}
|
||||||
|
|
||||||
|
@ -214,8 +208,44 @@ def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_and_contain_remote_object_from_id(%{"id" => id}),
|
def fetch_and_contain_remote_object_from_id(_id, _opts),
|
||||||
do: fetch_and_contain_remote_object_from_id(id)
|
do: {:error, "id must be a string"}
|
||||||
|
|
||||||
def fetch_and_contain_remote_object_from_id(_id), do: {:error, "id must be a string"}
|
defp get_object(id, opts) do
|
||||||
|
with false <- Keyword.get(opts, :force_http, false),
|
||||||
|
{:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do
|
||||||
|
Logger.debug("fetching via fedsocket - #{inspect(id)}")
|
||||||
|
FedSockets.fetch(fedsocket, id)
|
||||||
|
else
|
||||||
|
_other ->
|
||||||
|
Logger.debug("fetching via http - #{inspect(id)}")
|
||||||
|
get_object_http(id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_object_http(id) do
|
||||||
|
date = Pleroma.Signature.signed_date()
|
||||||
|
|
||||||
|
headers =
|
||||||
|
[{"accept", "application/activity+json"}]
|
||||||
|
|> maybe_date_fetch(date)
|
||||||
|
|> sign_fetch(id, date)
|
||||||
|
|
||||||
|
case HTTP.get(id, headers) do
|
||||||
|
{:ok, %{body: body, status: code}} when code in 200..299 ->
|
||||||
|
{:ok, body}
|
||||||
|
|
||||||
|
{:ok, %{status: code}} when code in [404, 410] ->
|
||||||
|
{:error, "Object has been deleted"}
|
||||||
|
|
||||||
|
{:error, e} ->
|
||||||
|
{:error, e}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
{:error, e}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp safe_json_decode(nil), do: {:ok, nil}
|
||||||
|
defp safe_json_decode(json), do: Jason.decode(json)
|
||||||
end
|
end
|
||||||
|
|
|
@ -39,7 +39,7 @@ def key_id_to_actor_id(key_id) do
|
||||||
def fetch_public_key(conn) do
|
def fetch_public_key(conn) do
|
||||||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
||||||
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
||||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
|
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
|
||||||
{:ok, public_key}
|
{:ok, public_key}
|
||||||
else
|
else
|
||||||
e ->
|
e ->
|
||||||
|
@ -50,8 +50,8 @@ def fetch_public_key(conn) do
|
||||||
def refetch_public_key(conn) do
|
def refetch_public_key(conn) do
|
||||||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
||||||
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
||||||
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id),
|
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true),
|
||||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
|
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
|
||||||
{:ok, public_key}
|
{:ok, public_key}
|
||||||
else
|
else
|
||||||
e ->
|
e ->
|
||||||
|
|
|
@ -1820,12 +1820,12 @@ def html_filter_policy(%User{no_rich_text: true}) do
|
||||||
|
|
||||||
def html_filter_policy(_), do: Config.get([:markup, :scrub_policy])
|
def html_filter_policy(_), do: Config.get([:markup, :scrub_policy])
|
||||||
|
|
||||||
def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id)
|
def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts)
|
||||||
|
|
||||||
def get_or_fetch_by_ap_id(ap_id) do
|
def get_or_fetch_by_ap_id(ap_id, opts \\ []) do
|
||||||
cached_user = get_cached_by_ap_id(ap_id)
|
cached_user = get_cached_by_ap_id(ap_id)
|
||||||
|
|
||||||
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id)
|
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts)
|
||||||
|
|
||||||
case {cached_user, maybe_fetched_user} do
|
case {cached_user, maybe_fetched_user} do
|
||||||
{_, {:ok, %User{} = user}} ->
|
{_, {:ok, %User{} = user}} ->
|
||||||
|
@ -1898,8 +1898,8 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do
|
||||||
|
|
||||||
def public_key(_), do: {:error, "key not found"}
|
def public_key(_), do: {:error, "key not found"}
|
||||||
|
|
||||||
def get_public_key_for_ap_id(ap_id) do
|
def get_public_key_for_ap_id(ap_id, opts \\ []) do
|
||||||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
|
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts),
|
||||||
{:ok, public_key} <- public_key(user) do
|
{:ok, public_key} <- public_key(user) do
|
||||||
{:ok, public_key}
|
{:ok, public_key}
|
||||||
else
|
else
|
||||||
|
|
|
@ -1270,10 +1270,12 @@ defp object_to_user_data(data) do
|
||||||
|
|
||||||
def fetch_follow_information_for_user(user) do
|
def fetch_follow_information_for_user(user) do
|
||||||
with {:ok, following_data} <-
|
with {:ok, following_data} <-
|
||||||
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
|
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
|
||||||
|
force_http: true
|
||||||
|
),
|
||||||
{:ok, hide_follows} <- collection_private(following_data),
|
{:ok, hide_follows} <- collection_private(following_data),
|
||||||
{:ok, followers_data} <-
|
{:ok, followers_data} <-
|
||||||
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
|
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
|
||||||
{:ok, hide_followers} <- collection_private(followers_data) do
|
{:ok, hide_followers} <- collection_private(followers_data) do
|
||||||
{:ok,
|
{:ok,
|
||||||
%{
|
%{
|
||||||
|
@ -1347,8 +1349,8 @@ def user_data_from_user_object(data) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_and_prepare_user_from_ap_id(ap_id) do
|
def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
|
||||||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
|
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
|
||||||
{:ok, data} <- user_data_from_user_object(data) do
|
{:ok, data} <- user_data_from_user_object(data) do
|
||||||
{:ok, maybe_update_follow_information(data)}
|
{:ok, maybe_update_follow_information(data)}
|
||||||
else
|
else
|
||||||
|
@ -1390,13 +1392,13 @@ def maybe_handle_clashing_nickname(data) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def make_user_from_ap_id(ap_id) do
|
def make_user_from_ap_id(ap_id, opts \\ []) do
|
||||||
user = User.get_cached_by_ap_id(ap_id)
|
user = User.get_cached_by_ap_id(ap_id)
|
||||||
|
|
||||||
if user && !User.ap_enabled?(user) do
|
if user && !User.ap_enabled?(user) do
|
||||||
Transmogrifier.upgrade_user_from_ap_id(ap_id)
|
Transmogrifier.upgrade_user_from_ap_id(ap_id)
|
||||||
else
|
else
|
||||||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
|
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
|
||||||
if user do
|
if user do
|
||||||
user
|
user
|
||||||
|> User.remote_user_changeset(data)
|
|> User.remote_user_changeset(data)
|
||||||
|
|
|
@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.ActivityPub.Relay
|
alias Pleroma.Web.ActivityPub.Relay
|
||||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||||
|
alias Pleroma.Web.FedSockets
|
||||||
|
|
||||||
require Pleroma.Constants
|
require Pleroma.Constants
|
||||||
|
|
||||||
|
@ -50,15 +51,35 @@ def is_representable?(%Activity{} = activity) do
|
||||||
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
|
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
|
||||||
Logger.debug("Federating #{id} to #{inbox}")
|
Logger.debug("Federating #{id} to #{inbox}")
|
||||||
|
|
||||||
uri = URI.parse(inbox)
|
case FedSockets.get_or_create_fed_socket(inbox) do
|
||||||
|
{:ok, fedsocket} ->
|
||||||
|
Logger.debug("publishing via fedsockets - #{inspect(inbox)}")
|
||||||
|
FedSockets.publish(fedsocket, json)
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
Logger.debug("publishing via http - #{inspect(inbox)}")
|
||||||
|
http_publish(inbox, actor, json, params)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def publish_one(%{actor_id: actor_id} = params) do
|
||||||
|
actor = User.get_cached_by_id(actor_id)
|
||||||
|
|
||||||
|
params
|
||||||
|
|> Map.delete(:actor_id)
|
||||||
|
|> Map.put(:actor, actor)
|
||||||
|
|> publish_one()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp http_publish(inbox, actor, json, params) do
|
||||||
|
uri = %{path: path} = URI.parse(inbox)
|
||||||
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
|
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
|
||||||
|
|
||||||
date = Pleroma.Signature.signed_date()
|
date = Pleroma.Signature.signed_date()
|
||||||
|
|
||||||
signature =
|
signature =
|
||||||
Pleroma.Signature.sign(actor, %{
|
Pleroma.Signature.sign(actor, %{
|
||||||
"(request-target)": "post #{uri.path}",
|
"(request-target)": "post #{path}",
|
||||||
host: signature_host(uri),
|
host: signature_host(uri),
|
||||||
"content-length": byte_size(json),
|
"content-length": byte_size(json),
|
||||||
digest: digest,
|
digest: digest,
|
||||||
|
@ -89,15 +110,6 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def publish_one(%{actor_id: actor_id} = params) do
|
|
||||||
actor = User.get_cached_by_id(actor_id)
|
|
||||||
|
|
||||||
params
|
|
||||||
|> Map.delete(:actor_id)
|
|
||||||
|> Map.put(:actor, actor)
|
|
||||||
|> publish_one()
|
|
||||||
end
|
|
||||||
|
|
||||||
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
||||||
if port == URI.default_port(scheme) do
|
if port == URI.default_port(scheme) do
|
||||||
host
|
host
|
||||||
|
|
|
@ -1000,7 +1000,7 @@ def perform(:user_upgrade, user) do
|
||||||
|
|
||||||
def upgrade_user_from_ap_id(ap_id) do
|
def upgrade_user_from_ap_id(ap_id) do
|
||||||
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
|
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
|
||||||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
|
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true),
|
||||||
{:ok, user} <- update_user(user, data) do
|
{:ok, user} <- update_user(user, data) do
|
||||||
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|
||||||
{:ok, user}
|
{:ok, user}
|
||||||
|
|
185
lib/pleroma/web/fed_sockets/fed_registry.ex
Normal file
185
lib/pleroma/web/fed_sockets/fed_registry.ex
Normal file
|
@ -0,0 +1,185 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.FedRegistry do
|
||||||
|
@moduledoc """
|
||||||
|
The FedRegistry stores the active FedSockets for quick retrieval.
|
||||||
|
|
||||||
|
The storage and retrieval portion of the FedRegistry is done in process through
|
||||||
|
elixir's `Registry` module for speed and its ability to monitor for terminated processes.
|
||||||
|
|
||||||
|
Dropped connections will be caught by `Registry` and deleted. Since the next
|
||||||
|
message will initiate a new connection there is no reason to try and reconnect at that point.
|
||||||
|
|
||||||
|
Normally outside modules should have no need to call or use the FedRegistry themselves.
|
||||||
|
"""
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets.FedSocket
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@default_rejection_duration 15 * 60 * 1000
|
||||||
|
@rejections :fed_socket_rejections
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Retrieves a FedSocket from the Registry given it's origin.
|
||||||
|
|
||||||
|
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
|
||||||
|
|
||||||
|
Will return:
|
||||||
|
* {:ok, fed_socket} for working FedSockets
|
||||||
|
* {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
|
||||||
|
* {:error, some_reason} usually :missing for unknown origins
|
||||||
|
"""
|
||||||
|
def get_fed_socket(origin) do
|
||||||
|
case get_registry_data(origin) do
|
||||||
|
{:error, reason} ->
|
||||||
|
{:error, reason}
|
||||||
|
|
||||||
|
{:ok, %{state: :connected} = socket_info} ->
|
||||||
|
{:ok, socket_info}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Adds a connected FedSocket to the Registry.
|
||||||
|
|
||||||
|
Always returns {:ok, fed_socket}
|
||||||
|
"""
|
||||||
|
def add_fed_socket(origin, pid \\ nil) do
|
||||||
|
origin
|
||||||
|
|> SocketInfo.build(pid)
|
||||||
|
|> SocketInfo.connect()
|
||||||
|
|> add_socket_info
|
||||||
|
end
|
||||||
|
|
||||||
|
defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
|
||||||
|
case Registry.register(FedSockets.Registry, origin, socket_info) do
|
||||||
|
{:ok, _owner} ->
|
||||||
|
clear_prior_rejection(origin)
|
||||||
|
Logger.debug("fedsocket added: #{inspect(origin)}")
|
||||||
|
|
||||||
|
{:ok, socket_info}
|
||||||
|
|
||||||
|
{:error, {:already_registered, _pid}} ->
|
||||||
|
FedSocket.close(socket_info)
|
||||||
|
existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
|
||||||
|
|
||||||
|
{:ok, existing_socket_info}
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
{:error, :error_adding_socket}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Mark this origin as having rejected a connection attempt.
|
||||||
|
This will keep it from getting additional connection attempts
|
||||||
|
for a period of time specified in the config.
|
||||||
|
|
||||||
|
Always returns {:ok, new_reg_data}
|
||||||
|
"""
|
||||||
|
def set_host_rejected(uri) do
|
||||||
|
new_reg_data =
|
||||||
|
uri
|
||||||
|
|> SocketInfo.origin()
|
||||||
|
|> get_or_create_registry_data()
|
||||||
|
|> set_to_rejected()
|
||||||
|
|> save_registry_data()
|
||||||
|
|
||||||
|
{:ok, new_reg_data}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Retrieves the FedRegistryData from the Registry given it's origin.
|
||||||
|
|
||||||
|
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
|
||||||
|
|
||||||
|
Will return:
|
||||||
|
* {:ok, fed_registry_data} for known origins
|
||||||
|
* {:error, :missing} for uniknown origins
|
||||||
|
* {:error, :cache_error} indicating some low level runtime issues
|
||||||
|
"""
|
||||||
|
def get_registry_data(origin) do
|
||||||
|
case Registry.lookup(FedSockets.Registry, origin) do
|
||||||
|
[] ->
|
||||||
|
if is_rejected?(origin) do
|
||||||
|
Logger.debug("previously rejected fedsocket requested")
|
||||||
|
{:error, :rejected}
|
||||||
|
else
|
||||||
|
{:error, :missing}
|
||||||
|
end
|
||||||
|
|
||||||
|
[{_pid, %{state: :connected} = socket_info}] ->
|
||||||
|
{:ok, socket_info}
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
{:error, :cache_error}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
|
||||||
|
"""
|
||||||
|
def list_all do
|
||||||
|
(list_all_connected() ++ list_all_rejected())
|
||||||
|
|> Enum.into(%{})
|
||||||
|
end
|
||||||
|
|
||||||
|
defp list_all_connected do
|
||||||
|
FedSockets.Registry
|
||||||
|
|> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp list_all_rejected do
|
||||||
|
{:ok, keys} = Cachex.keys(@rejections)
|
||||||
|
|
||||||
|
{:ok, registry_data} =
|
||||||
|
Cachex.execute(@rejections, fn worker ->
|
||||||
|
Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
registry_data
|
||||||
|
end
|
||||||
|
|
||||||
|
defp clear_prior_rejection(origin),
|
||||||
|
do: Cachex.del(@rejections, origin)
|
||||||
|
|
||||||
|
defp is_rejected?(origin) do
|
||||||
|
case Cachex.get(@rejections, origin) do
|
||||||
|
{:ok, nil} ->
|
||||||
|
false
|
||||||
|
|
||||||
|
{:ok, _} ->
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_or_create_registry_data(origin) do
|
||||||
|
case get_registry_data(origin) do
|
||||||
|
{:error, :missing} ->
|
||||||
|
%SocketInfo{origin: origin}
|
||||||
|
|
||||||
|
{:ok, socket_info} ->
|
||||||
|
socket_info
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
|
||||||
|
{:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
|
||||||
|
socket_info
|
||||||
|
end
|
||||||
|
|
||||||
|
defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
|
||||||
|
rejection_expiration =
|
||||||
|
Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
|
||||||
|
|
||||||
|
{:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
|
||||||
|
socket_info
|
||||||
|
end
|
||||||
|
|
||||||
|
defp set_to_rejected(%SocketInfo{} = socket_info),
|
||||||
|
do: %SocketInfo{socket_info | state: :rejected}
|
||||||
|
end
|
137
lib/pleroma/web/fed_sockets/fed_socket.ex
Normal file
137
lib/pleroma/web/fed_sockets/fed_socket.ex
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.FedSocket do
|
||||||
|
@moduledoc """
|
||||||
|
The FedSocket module abstracts the actions to be taken taken on connections regardless of
|
||||||
|
whether the connection started as inbound or outbound.
|
||||||
|
|
||||||
|
|
||||||
|
Normally outside modules will have no need to call the FedSocket module directly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Object.Containment
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.ActivityPub.ObjectView
|
||||||
|
alias Pleroma.Web.ActivityPub.UserView
|
||||||
|
alias Pleroma.Web.ActivityPub.Visibility
|
||||||
|
alias Pleroma.Web.FedSockets.FetchRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.IngesterWorker
|
||||||
|
alias Pleroma.Web.FedSockets.OutgoingHandler
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
|
||||||
|
|
||||||
|
def connect_to_host(uri) do
|
||||||
|
case OutgoingHandler.start_link(uri) do
|
||||||
|
{:ok, pid} ->
|
||||||
|
{:ok, pid}
|
||||||
|
|
||||||
|
error ->
|
||||||
|
{:error, error}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def close(%SocketInfo{pid: socket_pid}),
|
||||||
|
do: Process.send(socket_pid, :close, [])
|
||||||
|
|
||||||
|
def publish(%SocketInfo{pid: socket_pid}, json) do
|
||||||
|
%{action: :publish, data: json}
|
||||||
|
|> Jason.encode!()
|
||||||
|
|> send_packet(socket_pid)
|
||||||
|
end
|
||||||
|
|
||||||
|
def fetch(%SocketInfo{pid: socket_pid}, id) do
|
||||||
|
fetch_uuid = FetchRegistry.register_fetch(id)
|
||||||
|
|
||||||
|
%{action: :fetch, data: id, uuid: fetch_uuid}
|
||||||
|
|> Jason.encode!()
|
||||||
|
|> send_packet(socket_pid)
|
||||||
|
|
||||||
|
wait_for_fetch_to_return(fetch_uuid, 0)
|
||||||
|
end
|
||||||
|
|
||||||
|
def receive_package(%SocketInfo{} = fed_socket, json) do
|
||||||
|
json
|
||||||
|
|> Jason.decode!()
|
||||||
|
|> process_package(fed_socket)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp wait_for_fetch_to_return(uuid, cntr) do
|
||||||
|
case FetchRegistry.check_fetch(uuid) do
|
||||||
|
{:error, :waiting} ->
|
||||||
|
Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
|
||||||
|
wait_for_fetch_to_return(uuid, cntr + 1)
|
||||||
|
|
||||||
|
{:error, :missing} ->
|
||||||
|
Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
|
||||||
|
{:error, :timeout}
|
||||||
|
|
||||||
|
{:ok, _fr} ->
|
||||||
|
FetchRegistry.pop_fetch(uuid)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
|
||||||
|
if Containment.contain_origin(origin, data) do
|
||||||
|
IngesterWorker.enqueue("ingest", %{"object" => data})
|
||||||
|
end
|
||||||
|
|
||||||
|
{:reply, %{"action" => "publish_reply", "status" => "processed"}}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
|
||||||
|
FetchRegistry.register_fetch_received(uuid, data)
|
||||||
|
{:noreply, nil}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
|
||||||
|
{:ok, data} = render_fetched_data(ap_id, uuid)
|
||||||
|
{:reply, data}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
|
||||||
|
{:noreply, nil}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_package(other, _fed_socket) do
|
||||||
|
Logger.warn("unknown json packages received #{inspect(other)}")
|
||||||
|
{:noreply, nil}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp render_fetched_data(ap_id, uuid) do
|
||||||
|
{:ok,
|
||||||
|
%{
|
||||||
|
"action" => "fetch_reply",
|
||||||
|
"status" => "processed",
|
||||||
|
"uuid" => uuid,
|
||||||
|
"data" => represent_item(ap_id)
|
||||||
|
}}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp represent_item(ap_id) do
|
||||||
|
case User.get_by_ap_id(ap_id) do
|
||||||
|
nil ->
|
||||||
|
object = Object.get_cached_by_ap_id(ap_id)
|
||||||
|
|
||||||
|
if Visibility.is_public?(object) do
|
||||||
|
Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
|
||||||
|
else
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
user ->
|
||||||
|
Phoenix.View.render_to_string(UserView, "user.json", user: user)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp send_packet(data, socket_pid) do
|
||||||
|
Process.send(socket_pid, {:send, data}, [])
|
||||||
|
end
|
||||||
|
|
||||||
|
def shake, do: @shake
|
||||||
|
end
|
182
lib/pleroma/web/fed_sockets/fed_sockets.ex
Normal file
182
lib/pleroma/web/fed_sockets/fed_sockets.ex
Normal file
|
@ -0,0 +1,182 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets do
|
||||||
|
@moduledoc """
|
||||||
|
This documents the FedSockets framework. A framework for federating
|
||||||
|
ActivityPub objects between servers via persistant WebSocket connections.
|
||||||
|
|
||||||
|
FedSockets allow servers to authenticate on first contact and maintain that
|
||||||
|
connection, eliminating the need to authenticate every time data needs to be shared.
|
||||||
|
|
||||||
|
## Protocol
|
||||||
|
FedSockets currently support 2 types of data transfer:
|
||||||
|
* `publish` method which doesn't require a response
|
||||||
|
* `fetch` method requires a response be sent
|
||||||
|
|
||||||
|
### Publish
|
||||||
|
The publish operation sends a json encoded map of the shape:
|
||||||
|
%{action: :publish, data: json}
|
||||||
|
and accepts (but does not require) a reply of form:
|
||||||
|
%{"action" => "publish_reply"}
|
||||||
|
|
||||||
|
The outgoing params represent
|
||||||
|
* data: ActivityPub object encoded into json
|
||||||
|
|
||||||
|
|
||||||
|
### Fetch
|
||||||
|
The fetch operation sends a json encoded map of the shape:
|
||||||
|
%{action: :fetch, data: id, uuid: fetch_uuid}
|
||||||
|
and requires a reply of form:
|
||||||
|
%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}
|
||||||
|
|
||||||
|
The outgoing params represent
|
||||||
|
* id: an ActivityPub object URI
|
||||||
|
* uuid: a unique uuid generated by the sender
|
||||||
|
|
||||||
|
The reply params represent
|
||||||
|
* data: an ActivityPub object encoded into json
|
||||||
|
* uuid: the uuid sent along with the fetch request
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.
|
||||||
|
|
||||||
|
A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.
|
||||||
|
|
||||||
|
case FedSockets.get_or_create_fed_socket(inbox) do
|
||||||
|
{:ok, fedsocket} ->
|
||||||
|
FedSockets.publish(fedsocket, json)
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
alternative_publish(inbox, actor, json, params)
|
||||||
|
end
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
FedSockets have the following config settings
|
||||||
|
|
||||||
|
config :pleroma, :fed_sockets,
|
||||||
|
enabled: true,
|
||||||
|
ping_interval: :timer.seconds(15),
|
||||||
|
connection_duration: :timer.hours(1),
|
||||||
|
rejection_duration: :timer.hours(1),
|
||||||
|
fed_socket_fetches: [
|
||||||
|
default: 12_000,
|
||||||
|
interval: 3_000,
|
||||||
|
lazy: false
|
||||||
|
]
|
||||||
|
* enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
|
||||||
|
* connection_duration - How long a FedSocket can sit idle before it's culled.
|
||||||
|
* rejection_duration - After failing to make a FedSocket connection a host will be excluded
|
||||||
|
from further connections for this amount of time
|
||||||
|
* fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
|
||||||
|
* fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry
|
||||||
|
|
||||||
|
Cachex options are
|
||||||
|
* default: the minimum amount of time a fetch can wait before it times out.
|
||||||
|
* interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
|
||||||
|
* lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement
|
||||||
|
|
||||||
|
"""
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets.FedRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.FedSocket
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
|
||||||
|
|
||||||
|
address is expected to be a fully formed URL such as:
|
||||||
|
"http://www.example.com" or "http://www.example.com:8080"
|
||||||
|
|
||||||
|
It can and usually does include additional path parameters,
|
||||||
|
but these are ignored as the FedSockets are organized by host and port info alone.
|
||||||
|
"""
|
||||||
|
def get_or_create_fed_socket(address) do
|
||||||
|
with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
|
||||||
|
{:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
|
||||||
|
{:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
|
||||||
|
Logger.debug("fedsocket created for - #{inspect(address)}")
|
||||||
|
{:ok, fed_socket}
|
||||||
|
else
|
||||||
|
{:cache, {:ok, socket}} ->
|
||||||
|
Logger.debug("fedsocket found in cache - #{inspect(address)}")
|
||||||
|
{:ok, socket}
|
||||||
|
|
||||||
|
{:connect, {:error, _host}} ->
|
||||||
|
Logger.debug("set host rejected for - #{inspect(address)}")
|
||||||
|
FedRegistry.set_host_rejected(address)
|
||||||
|
{:error, :rejected}
|
||||||
|
|
||||||
|
{_, {:error, :disabled}} ->
|
||||||
|
{:error, :disabled}
|
||||||
|
|
||||||
|
{_, {:error, reason}} ->
|
||||||
|
Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
|
||||||
|
{:error, reason}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
|
||||||
|
|
||||||
|
address is expected to be a fully formed URL such as:
|
||||||
|
"http://www.example.com" or "http://www.example.com:8080"
|
||||||
|
"""
|
||||||
|
def get_fed_socket(address) do
|
||||||
|
origin = SocketInfo.origin(address)
|
||||||
|
|
||||||
|
with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
|
||||||
|
{:ok, socket} <- FedRegistry.get_fed_socket(origin) do
|
||||||
|
{:ok, socket}
|
||||||
|
else
|
||||||
|
{:config, _} ->
|
||||||
|
{:error, :disabled}
|
||||||
|
|
||||||
|
{:error, :rejected} ->
|
||||||
|
Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
|
||||||
|
{:error, :rejected}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
{:error, reason}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Sends the supplied data via the publish protocol.
|
||||||
|
It will not block waiting for a reply.
|
||||||
|
Returns :ok but this is not an indication of a successful transfer.
|
||||||
|
|
||||||
|
the data is expected to be JSON encoded binary data.
|
||||||
|
"""
|
||||||
|
def publish(%SocketInfo{} = fed_socket, json) do
|
||||||
|
FedSocket.publish(fed_socket, json)
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Sends the supplied data via the fetch protocol.
|
||||||
|
It will block waiting for a reply or timeout.
|
||||||
|
|
||||||
|
Returns {:ok, object} where object is the requested object (or nil)
|
||||||
|
{:error, :timeout} in the event the message was not responded to
|
||||||
|
|
||||||
|
the id is expected to be the URI of an ActivityPub object.
|
||||||
|
"""
|
||||||
|
def fetch(%SocketInfo{} = fed_socket, id) do
|
||||||
|
FedSocket.fetch(fed_socket, id)
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Disconnect all and restart FedSockets.
|
||||||
|
This is mainly used in development and testing but could be useful in production.
|
||||||
|
"""
|
||||||
|
def reset do
|
||||||
|
FedRegistry
|
||||||
|
|> Process.whereis()
|
||||||
|
|> Process.exit(:testing)
|
||||||
|
end
|
||||||
|
|
||||||
|
def uri_for_origin(origin),
|
||||||
|
do: "ws://#{origin}/api/fedsocket/v1"
|
||||||
|
end
|
151
lib/pleroma/web/fed_sockets/fetch_registry.ex
Normal file
151
lib/pleroma/web/fed_sockets/fetch_registry.ex
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.FetchRegistry do
|
||||||
|
@moduledoc """
|
||||||
|
The FetchRegistry acts as a broker for fetch requests and return values.
|
||||||
|
This allows calling processes to block while waiting for a reply.
|
||||||
|
It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing
|
||||||
|
multi threaded processes to avoid bottlenecking.
|
||||||
|
|
||||||
|
Normally outside modules will have no need to call or use the FetchRegistry themselves.
|
||||||
|
|
||||||
|
The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
|
||||||
|
aren't necessary the following settings are used by default:
|
||||||
|
|
||||||
|
config :pleroma, :fed_sockets,
|
||||||
|
fed_socket_fetches: [
|
||||||
|
default: 12_000,
|
||||||
|
interval: 3_000,
|
||||||
|
lazy: false
|
||||||
|
]
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
defmodule FetchRegistryData do
|
||||||
|
defstruct uuid: nil,
|
||||||
|
sent_json: nil,
|
||||||
|
received_json: nil,
|
||||||
|
sent_at: nil,
|
||||||
|
received_at: nil
|
||||||
|
end
|
||||||
|
|
||||||
|
alias Ecto.UUID
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@fetches :fed_socket_fetches
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Registers a json request wth the FetchRegistry and returns the identifying UUID.
|
||||||
|
"""
|
||||||
|
def register_fetch(json) do
|
||||||
|
%FetchRegistryData{uuid: uuid} =
|
||||||
|
json
|
||||||
|
|> new_registry_data
|
||||||
|
|> save_registry_data
|
||||||
|
|
||||||
|
uuid
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Reports on the status of a Fetch given the identifying UUID.
|
||||||
|
|
||||||
|
Will return
|
||||||
|
* {:ok, fetched_object} if a fetch has completed
|
||||||
|
* {:error, :waiting} if a fetch is still pending
|
||||||
|
* {:error, other_error} usually :missing to indicate a fetch that has timed out
|
||||||
|
"""
|
||||||
|
def check_fetch(uuid) do
|
||||||
|
case get_registry_data(uuid) do
|
||||||
|
{:ok, %FetchRegistryData{received_at: nil}} ->
|
||||||
|
{:error, :waiting}
|
||||||
|
|
||||||
|
{:ok, %FetchRegistryData{} = reg_data} ->
|
||||||
|
{:ok, reg_data}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
e
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Retrieves the response to a fetch given the identifying UUID.
|
||||||
|
The completed fetch will be deleted from the FetchRegistry
|
||||||
|
|
||||||
|
Will return
|
||||||
|
* {:ok, fetched_object} if a fetch has completed
|
||||||
|
* {:error, :waiting} if a fetch is still pending
|
||||||
|
* {:error, other_error} usually :missing to indicate a fetch that has timed out
|
||||||
|
"""
|
||||||
|
def pop_fetch(uuid) do
|
||||||
|
case check_fetch(uuid) do
|
||||||
|
{:ok, %FetchRegistryData{received_json: received_json}} ->
|
||||||
|
delete_registry_data(uuid)
|
||||||
|
{:ok, received_json}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
e
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
This is called to register a fetch has returned.
|
||||||
|
It expects the result data along with the UUID that was sent in the request
|
||||||
|
|
||||||
|
Will return the fetched object or :error
|
||||||
|
"""
|
||||||
|
def register_fetch_received(uuid, data) do
|
||||||
|
case get_registry_data(uuid) do
|
||||||
|
{:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
|
||||||
|
reg_data
|
||||||
|
|> set_fetch_received(data)
|
||||||
|
|> save_registry_data()
|
||||||
|
|
||||||
|
{:ok, %FetchRegistryData{} = reg_data} ->
|
||||||
|
Logger.warn("tried to add fetched data twice - #{uuid}")
|
||||||
|
reg_data
|
||||||
|
|
||||||
|
{:error, _} ->
|
||||||
|
Logger.warn("Error adding fetch to registry - #{uuid}")
|
||||||
|
:error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp new_registry_data(json) do
|
||||||
|
%FetchRegistryData{
|
||||||
|
uuid: UUID.generate(),
|
||||||
|
sent_json: json,
|
||||||
|
sent_at: :erlang.monotonic_time(:millisecond)
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_registry_data(origin) do
|
||||||
|
case Cachex.get(@fetches, origin) do
|
||||||
|
{:ok, nil} ->
|
||||||
|
{:error, :missing}
|
||||||
|
|
||||||
|
{:ok, reg_data} ->
|
||||||
|
{:ok, reg_data}
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
{:error, :cache_error}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
|
||||||
|
do: %FetchRegistryData{
|
||||||
|
reg_data
|
||||||
|
| received_at: :erlang.monotonic_time(:millisecond),
|
||||||
|
received_json: data
|
||||||
|
}
|
||||||
|
|
||||||
|
defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
|
||||||
|
{:ok, true} = Cachex.put(@fetches, uuid, reg_data)
|
||||||
|
reg_data
|
||||||
|
end
|
||||||
|
|
||||||
|
defp delete_registry_data(origin),
|
||||||
|
do: {:ok, true} = Cachex.del(@fetches, origin)
|
||||||
|
end
|
88
lib/pleroma/web/fed_sockets/incoming_handler.ex
Normal file
88
lib/pleroma/web/fed_sockets/incoming_handler.ex
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.IncomingHandler do
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets.FedRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.FedSocket
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
import HTTPSignatures, only: [validate_conn: 1, split_signature: 1]
|
||||||
|
|
||||||
|
@behaviour :cowboy_websocket
|
||||||
|
|
||||||
|
def init(req, state) do
|
||||||
|
shake = FedSocket.shake()
|
||||||
|
|
||||||
|
with true <- Pleroma.Config.get([:fed_sockets, :enabled]),
|
||||||
|
sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil),
|
||||||
|
headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req),
|
||||||
|
true <- validate_conn(%{req_headers: headers}),
|
||||||
|
%{"keyId" => origin} <- split_signature(headers["signature"]) do
|
||||||
|
req =
|
||||||
|
if is_nil(sec_protocol) do
|
||||||
|
req
|
||||||
|
else
|
||||||
|
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req)
|
||||||
|
end
|
||||||
|
|
||||||
|
{:cowboy_websocket, req, %{origin: origin}, %{}}
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
{:ok, req, state}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def websocket_init(%{origin: origin}) do
|
||||||
|
case FedRegistry.add_fed_socket(origin) do
|
||||||
|
{:ok, socket_info} ->
|
||||||
|
{:ok, socket_info}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
Logger.error("FedSocket websocket_init failed - #{inspect(e)}")
|
||||||
|
{:error, inspect(e)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Use the ping to check if the connection should be expired
|
||||||
|
def websocket_handle(:ping, socket_info) do
|
||||||
|
if SocketInfo.expired?(socket_info) do
|
||||||
|
{:stop, socket_info}
|
||||||
|
else
|
||||||
|
{:ok, socket_info, :hibernate}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def websocket_handle({:text, data}, socket_info) do
|
||||||
|
socket_info = SocketInfo.touch(socket_info)
|
||||||
|
|
||||||
|
case FedSocket.receive_package(socket_info, data) do
|
||||||
|
{:noreply, _} ->
|
||||||
|
{:ok, socket_info}
|
||||||
|
|
||||||
|
{:reply, reply} ->
|
||||||
|
{:reply, {:text, Jason.encode!(reply)}, socket_info}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("incoming error - receive_package: #{inspect(reason)}")
|
||||||
|
{:ok, socket_info}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def websocket_info({:send, message}, socket_info) do
|
||||||
|
socket_info = SocketInfo.touch(socket_info)
|
||||||
|
|
||||||
|
{:reply, {:text, message}, socket_info}
|
||||||
|
end
|
||||||
|
|
||||||
|
def websocket_info(:close, state) do
|
||||||
|
{:stop, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def websocket_info(message, state) do
|
||||||
|
Logger.debug("#{__MODULE__} unknown message #{inspect(message)}")
|
||||||
|
{:ok, state}
|
||||||
|
end
|
||||||
|
end
|
33
lib/pleroma/web/fed_sockets/ingester_worker.ex
Normal file
33
lib/pleroma/web/fed_sockets/ingester_worker.ex
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.IngesterWorker do
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue"
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.Federator
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do
|
||||||
|
try do
|
||||||
|
ingestee
|
||||||
|
|> Jason.decode!()
|
||||||
|
|> do_ingestion()
|
||||||
|
rescue
|
||||||
|
e ->
|
||||||
|
Logger.error("IngesterWorker error - #{inspect(e)}")
|
||||||
|
e
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_ingestion(params) do
|
||||||
|
case Federator.incoming_ap_doc(params) do
|
||||||
|
{:error, reason} ->
|
||||||
|
{:error, reason}
|
||||||
|
|
||||||
|
{:ok, object} ->
|
||||||
|
{:ok, object}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
146
lib/pleroma/web/fed_sockets/outgoing_handler.ex
Normal file
146
lib/pleroma/web/fed_sockets/outgoing_handler.ex
Normal file
|
@ -0,0 +1,146 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.OutgoingHandler do
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.ActivityPub.InternalFetchActor
|
||||||
|
alias Pleroma.Web.FedSockets
|
||||||
|
alias Pleroma.Web.FedSockets.FedRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.FedSocket
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
def start_link(uri) do
|
||||||
|
GenServer.start_link(__MODULE__, %{uri: uri})
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(%{uri: uri}) do
|
||||||
|
case initiate_connection(uri) do
|
||||||
|
{:ok, ws_origin, conn_pid} ->
|
||||||
|
FedRegistry.add_fed_socket(ws_origin, conn_pid)
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.debug("Outgoing connection failed - #{inspect(reason)}")
|
||||||
|
:ignore
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
|
||||||
|
socket_info = SocketInfo.touch(socket_info)
|
||||||
|
|
||||||
|
case FedSocket.receive_package(socket_info, data) do
|
||||||
|
{:noreply, _} ->
|
||||||
|
{:noreply, socket_info}
|
||||||
|
|
||||||
|
{:reply, reply} ->
|
||||||
|
:gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
|
||||||
|
{:noreply, socket_info}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("incoming error - receive_package: #{inspect(reason)}")
|
||||||
|
{:noreply, socket_info}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info(:close, state) do
|
||||||
|
Logger.debug("Sending close frame !!!!!!!")
|
||||||
|
{:close, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
|
||||||
|
{:stop, :normal, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
|
||||||
|
socket_info = SocketInfo.touch(socket_info)
|
||||||
|
:gun.ws_send(conn_pid, {:text, data})
|
||||||
|
{:noreply, socket_info}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:gun_ws, _, _, :pong}, state) do
|
||||||
|
{:noreply, state, :hibernate}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info(msg, state) do
|
||||||
|
Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def terminate(reason, state) do
|
||||||
|
Logger.debug(
|
||||||
|
"#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def initiate_connection(uri) do
|
||||||
|
ws_uri =
|
||||||
|
uri
|
||||||
|
|> SocketInfo.origin()
|
||||||
|
|> FedSockets.uri_for_origin()
|
||||||
|
|
||||||
|
%{host: host, port: port, path: path} = URI.parse(ws_uri)
|
||||||
|
|
||||||
|
with {:ok, conn_pid} <- :gun.open(to_charlist(host), port),
|
||||||
|
{:ok, _} <- :gun.await_up(conn_pid),
|
||||||
|
reference <- :gun.get(conn_pid, to_charlist(path)),
|
||||||
|
{:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
|
||||||
|
headers <- build_headers(uri),
|
||||||
|
ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
|
||||||
|
receive do
|
||||||
|
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
|
||||||
|
{:ok, ws_uri, conn_pid}
|
||||||
|
after
|
||||||
|
15_000 ->
|
||||||
|
Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
|
||||||
|
{:error, :timeout}
|
||||||
|
end
|
||||||
|
else
|
||||||
|
{:response, :nofin, 404, _} ->
|
||||||
|
{:error, :fedsockets_not_supported}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
|
||||||
|
{:error, e}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp build_headers(uri) do
|
||||||
|
host_for_sig = uri |> URI.parse() |> host_signature()
|
||||||
|
|
||||||
|
shake = FedSocket.shake()
|
||||||
|
digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
|
||||||
|
date = Pleroma.Signature.signed_date()
|
||||||
|
shake_size = byte_size(shake)
|
||||||
|
|
||||||
|
signature_opts = %{
|
||||||
|
"(request-target)": shake,
|
||||||
|
"content-length": to_charlist("#{shake_size}"),
|
||||||
|
date: date,
|
||||||
|
digest: digest,
|
||||||
|
host: host_for_sig
|
||||||
|
}
|
||||||
|
|
||||||
|
signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
|
||||||
|
|
||||||
|
[
|
||||||
|
{'signature', to_charlist(signature)},
|
||||||
|
{'date', date},
|
||||||
|
{'digest', to_charlist(digest)},
|
||||||
|
{'content-length', to_charlist("#{shake_size}")},
|
||||||
|
{to_charlist("(request-target)"), to_charlist(shake)}
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp host_signature(%{host: host, scheme: scheme, port: port}) do
|
||||||
|
if port == URI.default_port(scheme) do
|
||||||
|
host
|
||||||
|
else
|
||||||
|
"#{host}:#{port}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
52
lib/pleroma/web/fed_sockets/socket_info.ex
Normal file
52
lib/pleroma/web/fed_sockets/socket_info.ex
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.SocketInfo do
|
||||||
|
defstruct origin: nil,
|
||||||
|
pid: nil,
|
||||||
|
conn_pid: nil,
|
||||||
|
state: :default,
|
||||||
|
connected_until: nil
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
@default_connection_duration 15 * 60 * 1000
|
||||||
|
|
||||||
|
def build(uri, conn_pid \\ nil) do
|
||||||
|
uri
|
||||||
|
|> build_origin()
|
||||||
|
|> build_pids(conn_pid)
|
||||||
|
|> touch()
|
||||||
|
end
|
||||||
|
|
||||||
|
def touch(%SocketInfo{} = socket_info),
|
||||||
|
do: %{socket_info | connected_until: new_ttl()}
|
||||||
|
|
||||||
|
def connect(%SocketInfo{} = socket_info),
|
||||||
|
do: %{socket_info | state: :connected}
|
||||||
|
|
||||||
|
def expired?(%{connected_until: connected_until}),
|
||||||
|
do: connected_until < :erlang.monotonic_time(:millisecond)
|
||||||
|
|
||||||
|
def origin(uri),
|
||||||
|
do: build_origin(uri).origin
|
||||||
|
|
||||||
|
defp build_pids(socket_info, conn_pid),
|
||||||
|
do: struct(socket_info, pid: self(), conn_pid: conn_pid)
|
||||||
|
|
||||||
|
defp build_origin(uri) when is_binary(uri),
|
||||||
|
do: uri |> URI.parse() |> build_origin
|
||||||
|
|
||||||
|
defp build_origin(%{host: host, port: nil, scheme: scheme}),
|
||||||
|
do: build_origin(%{host: host, port: URI.default_port(scheme)})
|
||||||
|
|
||||||
|
defp build_origin(%{host: host, port: port}),
|
||||||
|
do: %SocketInfo{origin: "#{host}:#{port}"}
|
||||||
|
|
||||||
|
defp new_ttl do
|
||||||
|
connection_duration =
|
||||||
|
Pleroma.Config.get([:fed_sockets, :connection_duration], @default_connection_duration)
|
||||||
|
|
||||||
|
:erlang.monotonic_time(:millisecond) + connection_duration
|
||||||
|
end
|
||||||
|
end
|
59
lib/pleroma/web/fed_sockets/supervisor.ex
Normal file
59
lib/pleroma/web/fed_sockets/supervisor.ex
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.Supervisor do
|
||||||
|
use Supervisor
|
||||||
|
import Cachex.Spec
|
||||||
|
|
||||||
|
def start_link(opts) do
|
||||||
|
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(args) do
|
||||||
|
children = [
|
||||||
|
build_cache(:fed_socket_fetches, args),
|
||||||
|
build_cache(:fed_socket_rejections, args),
|
||||||
|
{Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]}
|
||||||
|
]
|
||||||
|
|
||||||
|
opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor]
|
||||||
|
Supervisor.init(children, opts)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp build_cache(name, args) do
|
||||||
|
opts = get_opts(name, args)
|
||||||
|
|
||||||
|
%{
|
||||||
|
id: String.to_atom("#{name}_cache"),
|
||||||
|
start: {Cachex, :start_link, [name, opts]},
|
||||||
|
type: :worker
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_opts(cache_name, args)
|
||||||
|
when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do
|
||||||
|
default = get_opts_or_config(args, cache_name, :default, 15_000)
|
||||||
|
interval = get_opts_or_config(args, cache_name, :interval, 3_000)
|
||||||
|
lazy = get_opts_or_config(args, cache_name, :lazy, false)
|
||||||
|
|
||||||
|
[expiration: expiration(default: default, interval: interval, lazy: lazy)]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_opts(name, args) do
|
||||||
|
Keyword.get(args, name, [])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_opts_or_config(args, name, key, default) do
|
||||||
|
args
|
||||||
|
|> Keyword.get(name, [])
|
||||||
|
|> Keyword.get(key)
|
||||||
|
|> case do
|
||||||
|
nil ->
|
||||||
|
Pleroma.Config.get([:fed_sockets, name, key], default)
|
||||||
|
|
||||||
|
value ->
|
||||||
|
value
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
2
mix.lock
2
mix.lock
|
@ -118,5 +118,5 @@
|
||||||
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
|
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
|
||||||
"unsafe": {:hex, :unsafe, "1.0.1", "a27e1874f72ee49312e0a9ec2e0b27924214a05e3ddac90e91727bc76f8613d8", [:mix], [], "hexpm", "6c7729a2d214806450d29766abc2afaa7a2cbecf415be64f36a6691afebb50e5"},
|
"unsafe": {:hex, :unsafe, "1.0.1", "a27e1874f72ee49312e0a9ec2e0b27924214a05e3ddac90e91727bc76f8613d8", [:mix], [], "hexpm", "6c7729a2d214806450d29766abc2afaa7a2cbecf415be64f36a6691afebb50e5"},
|
||||||
"web_push_encryption": {:hex, :web_push_encryption, "0.3.0", "598b5135e696fd1404dc8d0d7c0fa2c027244a4e5d5e5a98ba267f14fdeaabc8", [:mix], [{:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "f10bdd1afe527ede694749fb77a2f22f146a51b054c7fa541c9fd920fba7c875"},
|
"web_push_encryption": {:hex, :web_push_encryption, "0.3.0", "598b5135e696fd1404dc8d0d7c0fa2c027244a4e5d5e5a98ba267f14fdeaabc8", [:mix], [{:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "f10bdd1afe527ede694749fb77a2f22f146a51b054c7fa541c9fd920fba7c875"},
|
||||||
"websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []},
|
"websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []}
|
||||||
}
|
}
|
||||||
|
|
124
test/web/fed_sockets/fed_registry_test.exs
Normal file
124
test/web/fed_sockets/fed_registry_test.exs
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.FedRegistryTest do
|
||||||
|
use ExUnit.Case
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets
|
||||||
|
alias Pleroma.Web.FedSockets.FedRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
@good_domain "http://good.domain"
|
||||||
|
@good_domain_origin "good.domain:80"
|
||||||
|
|
||||||
|
setup do
|
||||||
|
start_supervised({Pleroma.Web.FedSockets.Supervisor, []})
|
||||||
|
build_test_socket(@good_domain)
|
||||||
|
Process.sleep(10)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "add_fed_socket/1 without conflicting sockets" do
|
||||||
|
test "can be added" do
|
||||||
|
Process.sleep(10)
|
||||||
|
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
assert origin == "good.domain:80"
|
||||||
|
end
|
||||||
|
|
||||||
|
test "multiple origins can be added" do
|
||||||
|
build_test_socket("http://anothergood.domain")
|
||||||
|
Process.sleep(10)
|
||||||
|
|
||||||
|
assert {:ok, %SocketInfo{origin: origin_1}} =
|
||||||
|
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
|
||||||
|
assert {:ok, %SocketInfo{origin: origin_2}} =
|
||||||
|
FedRegistry.get_fed_socket("anothergood.domain:80")
|
||||||
|
|
||||||
|
assert origin_1 == "good.domain:80"
|
||||||
|
assert origin_2 == "anothergood.domain:80"
|
||||||
|
assert FedRegistry.list_all() |> Enum.count() == 2
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "add_fed_socket/1 when duplicate sockets conflict" do
|
||||||
|
setup do
|
||||||
|
build_test_socket(@good_domain)
|
||||||
|
build_test_socket(@good_domain)
|
||||||
|
Process.sleep(10)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will be ignored" do
|
||||||
|
assert {:ok, %SocketInfo{origin: origin, pid: pid_one}} =
|
||||||
|
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
|
||||||
|
assert origin == "good.domain:80"
|
||||||
|
|
||||||
|
assert FedRegistry.list_all() |> Enum.count() == 1
|
||||||
|
end
|
||||||
|
|
||||||
|
test "the newer process will be closed" do
|
||||||
|
pid_two = build_test_socket(@good_domain)
|
||||||
|
|
||||||
|
assert {:ok, %SocketInfo{origin: origin, pid: pid_one}} =
|
||||||
|
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
|
||||||
|
assert origin == "good.domain:80"
|
||||||
|
Process.sleep(10)
|
||||||
|
|
||||||
|
refute Process.alive?(pid_two)
|
||||||
|
|
||||||
|
assert FedRegistry.list_all() |> Enum.count() == 1
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "get_fed_socket/1" do
|
||||||
|
test "returns missing for unknown hosts" do
|
||||||
|
assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns rejected for hosts previously rejected" do
|
||||||
|
"rejected.domain:80"
|
||||||
|
|> FedSockets.uri_for_origin()
|
||||||
|
|> FedRegistry.set_host_rejected()
|
||||||
|
|
||||||
|
assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "can retrieve a previously added SocketInfo" do
|
||||||
|
build_test_socket(@good_domain)
|
||||||
|
Process.sleep(10)
|
||||||
|
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
assert origin == "good.domain:80"
|
||||||
|
end
|
||||||
|
|
||||||
|
test "removes references to SocketInfos when the process crashes" do
|
||||||
|
assert {:ok, %SocketInfo{origin: origin, pid: pid}} =
|
||||||
|
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
|
||||||
|
assert origin == "good.domain:80"
|
||||||
|
|
||||||
|
Process.exit(pid, :testing)
|
||||||
|
Process.sleep(100)
|
||||||
|
assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def build_test_socket(uri) do
|
||||||
|
Kernel.spawn(fn -> fed_socket_almost(uri) end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def fed_socket_almost(origin) do
|
||||||
|
FedRegistry.add_fed_socket(origin)
|
||||||
|
|
||||||
|
receive do
|
||||||
|
:close ->
|
||||||
|
:ok
|
||||||
|
after
|
||||||
|
5_000 -> :timeout
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
67
test/web/fed_sockets/fetch_registry_test.exs
Normal file
67
test/web/fed_sockets/fetch_registry_test.exs
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.FetchRegistryTest do
|
||||||
|
use ExUnit.Case
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets.FetchRegistry
|
||||||
|
alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData
|
||||||
|
|
||||||
|
@json_message "hello"
|
||||||
|
@json_reply "hello back"
|
||||||
|
|
||||||
|
setup do
|
||||||
|
start_supervised(
|
||||||
|
{Pleroma.Web.FedSockets.Supervisor,
|
||||||
|
[
|
||||||
|
ping_interval: 8,
|
||||||
|
connection_duration: 15,
|
||||||
|
rejection_duration: 5,
|
||||||
|
fed_socket_fetches: [default: 10, interval: 10]
|
||||||
|
]}
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetches can be stored" do
|
||||||
|
uuid = FetchRegistry.register_fetch(@json_message)
|
||||||
|
|
||||||
|
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetches can return" do
|
||||||
|
uuid = FetchRegistry.register_fetch(@json_message)
|
||||||
|
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
|
||||||
|
|
||||||
|
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||||
|
Task.await(task)
|
||||||
|
|
||||||
|
assert {:ok, %FetchRegistryData{received_json: received_json}} =
|
||||||
|
FetchRegistry.check_fetch(uuid)
|
||||||
|
|
||||||
|
assert received_json == @json_reply
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetches are deleted once popped from stack" do
|
||||||
|
uuid = FetchRegistry.register_fetch(@json_message)
|
||||||
|
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
|
||||||
|
Task.await(task)
|
||||||
|
|
||||||
|
assert {:ok, %FetchRegistryData{received_json: received_json}} =
|
||||||
|
FetchRegistry.check_fetch(uuid)
|
||||||
|
|
||||||
|
assert received_json == @json_reply
|
||||||
|
assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid)
|
||||||
|
|
||||||
|
assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetches can time out" do
|
||||||
|
uuid = FetchRegistry.register_fetch(@json_message)
|
||||||
|
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||||
|
Process.sleep(500)
|
||||||
|
assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
|
||||||
|
end
|
||||||
|
end
|
118
test/web/fed_sockets/socket_info_test.exs
Normal file
118
test/web/fed_sockets/socket_info_test.exs
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.FedSockets.SocketInfoTest do
|
||||||
|
use ExUnit.Case
|
||||||
|
|
||||||
|
alias Pleroma.Web.FedSockets
|
||||||
|
alias Pleroma.Web.FedSockets.SocketInfo
|
||||||
|
|
||||||
|
describe "uri_for_origin" do
|
||||||
|
test "provides the fed_socket URL given the origin information" do
|
||||||
|
endpoint = "example.com:4000"
|
||||||
|
assert FedSockets.uri_for_origin(endpoint) =~ "ws://"
|
||||||
|
assert FedSockets.uri_for_origin(endpoint) =~ endpoint
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "origin" do
|
||||||
|
test "will provide the origin field given a url" do
|
||||||
|
endpoint = "example.com:4000"
|
||||||
|
assert SocketInfo.origin("ws://#{endpoint}") == endpoint
|
||||||
|
assert SocketInfo.origin("http://#{endpoint}") == endpoint
|
||||||
|
assert SocketInfo.origin("https://#{endpoint}") == endpoint
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will proide the origin field given a uri" do
|
||||||
|
endpoint = "example.com:4000"
|
||||||
|
uri = URI.parse("http://#{endpoint}")
|
||||||
|
|
||||||
|
assert SocketInfo.origin(uri) == endpoint
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "touch" do
|
||||||
|
test "will update the TTL" do
|
||||||
|
endpoint = "example.com:4000"
|
||||||
|
socket = SocketInfo.build("ws://#{endpoint}")
|
||||||
|
Process.sleep(2)
|
||||||
|
touched_socket = SocketInfo.touch(socket)
|
||||||
|
|
||||||
|
assert socket.connected_until < touched_socket.connected_until
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "expired?" do
|
||||||
|
setup do
|
||||||
|
start_supervised(
|
||||||
|
{Pleroma.Web.FedSockets.Supervisor,
|
||||||
|
[
|
||||||
|
ping_interval: 8,
|
||||||
|
connection_duration: 5,
|
||||||
|
rejection_duration: 5,
|
||||||
|
fed_socket_rejections: [lazy: true]
|
||||||
|
]}
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
test "tests if the TTL is exceeded" do
|
||||||
|
endpoint = "example.com:4000"
|
||||||
|
socket = SocketInfo.build("ws://#{endpoint}")
|
||||||
|
refute SocketInfo.expired?(socket)
|
||||||
|
Process.sleep(10)
|
||||||
|
|
||||||
|
assert SocketInfo.expired?(socket)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "creating outgoing connection records" do
|
||||||
|
test "can be passed a string" do
|
||||||
|
assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "can be passed a URI" do
|
||||||
|
uri = URI.parse("http://example.com:4000")
|
||||||
|
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid)
|
||||||
|
assert origin =~ "example.com:4000"
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will include the port number" do
|
||||||
|
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid)
|
||||||
|
|
||||||
|
assert origin =~ ":4000"
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will provide the port if missing" do
|
||||||
|
assert %{conn_pid: :pid, origin: "example.com:80"} =
|
||||||
|
SocketInfo.build("http://example.com", :pid)
|
||||||
|
|
||||||
|
assert %{conn_pid: :pid, origin: "example.com:443"} =
|
||||||
|
SocketInfo.build("https://example.com", :pid)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "creating incoming connection records" do
|
||||||
|
test "can be passed a string" do
|
||||||
|
assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "can be passed a URI" do
|
||||||
|
uri = URI.parse("example.com:4000")
|
||||||
|
assert %{pid: _, origin: _origin} = SocketInfo.build(uri)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will include the port number" do
|
||||||
|
assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000")
|
||||||
|
|
||||||
|
assert origin =~ ":4000"
|
||||||
|
end
|
||||||
|
|
||||||
|
test "will provide the port if missing" do
|
||||||
|
assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com")
|
||||||
|
assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in a new issue