add a job queue
This commit is contained in:
parent
6383fa3a5d
commit
55affbca7f
|
@ -271,14 +271,16 @@
|
||||||
"web"
|
"web"
|
||||||
]
|
]
|
||||||
|
|
||||||
config :pleroma, Pleroma.Web.Federator, max_jobs: 50
|
|
||||||
|
|
||||||
config :pleroma, Pleroma.Web.Federator.RetryQueue,
|
config :pleroma, Pleroma.Web.Federator.RetryQueue,
|
||||||
enabled: false,
|
enabled: false,
|
||||||
max_jobs: 20,
|
max_jobs: 20,
|
||||||
initial_timeout: 30,
|
initial_timeout: 30,
|
||||||
max_retries: 5
|
max_retries: 5
|
||||||
|
|
||||||
|
config :pleroma, Pleroma.Jobs,
|
||||||
|
federator_incoming: [max_jobs: 50],
|
||||||
|
federator_outgoing: [max_jobs: 50]
|
||||||
|
|
||||||
# Import environment specific config. This must remain at the bottom
|
# Import environment specific config. This must remain at the bottom
|
||||||
# of this file so it overrides the configuration defined above.
|
# of this file so it overrides the configuration defined above.
|
||||||
import_config "#{Mix.env()}.exs"
|
import_config "#{Mix.env()}.exs"
|
||||||
|
|
|
@ -43,6 +43,8 @@
|
||||||
"BLH1qVhJItRGCfxgTtONfsOKDc9VRAraXw-3NsmjMngWSh7NxOizN6bkuRA7iLTMPS82PjwJAr3UoK9EC1IFrz4",
|
"BLH1qVhJItRGCfxgTtONfsOKDc9VRAraXw-3NsmjMngWSh7NxOizN6bkuRA7iLTMPS82PjwJAr3UoK9EC1IFrz4",
|
||||||
private_key: "_-XZ0iebPrRfZ_o0-IatTdszYa8VCH1yLN-JauK7HHA"
|
private_key: "_-XZ0iebPrRfZ_o0-IatTdszYa8VCH1yLN-JauK7HHA"
|
||||||
|
|
||||||
|
config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2]
|
||||||
|
|
||||||
try do
|
try do
|
||||||
import_config "test.secret.exs"
|
import_config "test.secret.exs"
|
||||||
rescue
|
rescue
|
||||||
|
|
|
@ -36,14 +36,15 @@ This filter replaces the filename (not the path) of an upload. For complete obfu
|
||||||
|
|
||||||
An example for Sendgrid adapter:
|
An example for Sendgrid adapter:
|
||||||
|
|
||||||
```
|
```exs
|
||||||
config :pleroma, Pleroma.Mailer,
|
config :pleroma, Pleroma.Mailer,
|
||||||
adapter: Swoosh.Adapters.Sendgrid,
|
adapter: Swoosh.Adapters.Sendgrid,
|
||||||
api_key: "YOUR_API_KEY"
|
api_key: "YOUR_API_KEY"
|
||||||
```
|
```
|
||||||
|
|
||||||
An example for SMTP adapter:
|
An example for SMTP adapter:
|
||||||
```
|
|
||||||
|
```exs
|
||||||
config :pleroma, Pleroma.Mailer,
|
config :pleroma, Pleroma.Mailer,
|
||||||
adapter: Swoosh.Adapters.SMTP,
|
adapter: Swoosh.Adapters.SMTP,
|
||||||
relay: "smtp.gmail.com",
|
relay: "smtp.gmail.com",
|
||||||
|
@ -163,7 +164,7 @@ their ActivityPub ID.
|
||||||
|
|
||||||
An example:
|
An example:
|
||||||
|
|
||||||
```
|
```exs
|
||||||
config :pleroma, :mrf_user_allowlist,
|
config :pleroma, :mrf_user_allowlist,
|
||||||
"example.org": ["https://example.org/users/admin"]
|
"example.org": ["https://example.org/users/admin"]
|
||||||
```
|
```
|
||||||
|
@ -192,18 +193,34 @@ the source code is here: https://github.com/koto-bank/kocaptcha. The default end
|
||||||
|
|
||||||
Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example:
|
Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example:
|
||||||
|
|
||||||
```
|
```exs
|
||||||
config :pleroma, :admin_token, "somerandomtoken"
|
config :pleroma, :admin_token, "somerandomtoken"
|
||||||
```
|
```
|
||||||
|
|
||||||
You can then do
|
You can then do
|
||||||
```
|
|
||||||
|
```sh
|
||||||
curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
|
curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
|
||||||
```
|
```
|
||||||
|
|
||||||
## Pleroma.Web.Federator
|
## Pleroma.Jobs
|
||||||
|
|
||||||
|
A list of job queues and their settings.
|
||||||
|
|
||||||
|
Job queue settings:
|
||||||
|
|
||||||
|
* `max_jobs`: The maximum amount of parallel jobs running at the same time.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
```exs
|
||||||
|
config :pleroma, Pleroma.Jobs,
|
||||||
|
federator_incoming: [max_jobs: 50],
|
||||||
|
federator_outgoing: [max_jobs: 50]
|
||||||
|
```
|
||||||
|
|
||||||
|
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
|
||||||
|
|
||||||
* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
|
|
||||||
|
|
||||||
## Pleroma.Web.Federator.RetryQueue
|
## Pleroma.Web.Federator.RetryQueue
|
||||||
|
|
||||||
|
|
|
@ -101,9 +101,10 @@ def start(_type, _args) do
|
||||||
),
|
),
|
||||||
worker(Pleroma.FlakeId, []),
|
worker(Pleroma.FlakeId, []),
|
||||||
worker(Pleroma.Web.Federator.RetryQueue, []),
|
worker(Pleroma.Web.Federator.RetryQueue, []),
|
||||||
worker(Pleroma.Web.Federator, []),
|
|
||||||
worker(Pleroma.Stats, []),
|
worker(Pleroma.Stats, []),
|
||||||
worker(Pleroma.Web.Push, [])
|
worker(Pleroma.Web.Push, []),
|
||||||
|
worker(Pleroma.Jobs, []),
|
||||||
|
worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)
|
||||||
] ++
|
] ++
|
||||||
streamer_child() ++
|
streamer_child() ++
|
||||||
chat_child() ++
|
chat_child() ++
|
||||||
|
|
153
lib/pleroma/jobs.ex
Normal file
153
lib/pleroma/jobs.ex
Normal file
|
@ -0,0 +1,153 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Jobs do
|
||||||
|
@moduledoc """
|
||||||
|
A basic job queue
|
||||||
|
"""
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
def init(args) do
|
||||||
|
{:ok, args}
|
||||||
|
end
|
||||||
|
|
||||||
|
def start_link do
|
||||||
|
queues =
|
||||||
|
Pleroma.Config.get(Pleroma.Jobs)
|
||||||
|
|> Enum.map(fn {name, _} -> create_queue(name) end)
|
||||||
|
|> Enum.into(%{})
|
||||||
|
|
||||||
|
state = %{
|
||||||
|
queues: queues,
|
||||||
|
refs: %{}
|
||||||
|
}
|
||||||
|
|
||||||
|
GenServer.start_link(__MODULE__, state, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def create_queue(name) do
|
||||||
|
{name, {:sets.new(), []}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Enqueues a job.
|
||||||
|
|
||||||
|
Returns `:ok`.
|
||||||
|
|
||||||
|
## Arguments
|
||||||
|
|
||||||
|
- `queue_name` - a queue name(must be specified in the config).
|
||||||
|
- `mod` - a worker module, must have `perform` function.
|
||||||
|
- `args` - a list of arguments for the `perform` function of the worker module.
|
||||||
|
- `priority` - a job priority (`0` by default).
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
Enqueue `Module.perform/0` with `priority=1`:
|
||||||
|
|
||||||
|
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
|
||||||
|
:ok
|
||||||
|
|
||||||
|
Enqueue `Module.perform(:job_name)` with `priority=5`:
|
||||||
|
|
||||||
|
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
|
||||||
|
:ok
|
||||||
|
|
||||||
|
Enqueue `Module.perform(:another_job, data)` with `priority=1`:
|
||||||
|
|
||||||
|
iex> data = "foobar"
|
||||||
|
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
|
||||||
|
:ok
|
||||||
|
|
||||||
|
Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
|
||||||
|
|
||||||
|
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
|
||||||
|
:ok
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def enqueue(queue_name, mod, args, priority \\ 1)
|
||||||
|
|
||||||
|
if Mix.env() == :test do
|
||||||
|
def enqueue(_queue_name, mod, args, _priority) do
|
||||||
|
apply(mod, :perform, args)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@spec enqueue(atom(), atom(), [any()], integer()) :: :ok
|
||||||
|
def enqueue(queue_name, mod, args, priority \\ 1) do
|
||||||
|
GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
|
||||||
|
{running_jobs, queue} = state[:queues][queue_name]
|
||||||
|
|
||||||
|
queue = enqueue_sorted(queue, {mod, args}, priority)
|
||||||
|
|
||||||
|
state =
|
||||||
|
state
|
||||||
|
|> update_queue(queue_name, {running_jobs, queue})
|
||||||
|
|> maybe_start_job(queue_name, running_jobs, queue)
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_cast(m, state) do
|
||||||
|
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
||||||
|
queue_name = state.refs[ref]
|
||||||
|
|
||||||
|
{running_jobs, queue} = state[:queues][queue_name]
|
||||||
|
|
||||||
|
running_jobs = :sets.del_element(ref, running_jobs)
|
||||||
|
|
||||||
|
state = state |> remove_ref(ref) |> maybe_start_job(queue_name, running_jobs, queue)
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def maybe_start_job(state, queue_name, running_jobs, queue) do
|
||||||
|
if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
|
||||||
|
queue != [] do
|
||||||
|
{{mod, args}, queue} = queue_pop(queue)
|
||||||
|
{:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
|
||||||
|
mref = Process.monitor(pid)
|
||||||
|
|
||||||
|
state
|
||||||
|
|> add_ref(queue_name, mref)
|
||||||
|
|> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
|
||||||
|
else
|
||||||
|
update_queue(state, queue_name, {running_jobs, queue})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue_sorted(queue, element, priority) do
|
||||||
|
[%{item: element, priority: priority} | queue]
|
||||||
|
|> Enum.sort_by(fn %{priority: priority} -> priority end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def queue_pop([%{item: element} | queue]) do
|
||||||
|
{element, queue}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp add_ref(state, queue_name, ref) do
|
||||||
|
refs = Map.put(state[:refs], ref, queue_name)
|
||||||
|
Map.put(state, :refs, refs)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp remove_ref(state, ref) do
|
||||||
|
refs = Map.delete(state[:refs], ref)
|
||||||
|
Map.put(state, :refs, refs)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp update_queue(state, queue_name, data) do
|
||||||
|
queues = Map.put(state[:queues], queue_name, data)
|
||||||
|
Map.put(state, :queues, queues)
|
||||||
|
end
|
||||||
|
end
|
|
@ -711,20 +711,18 @@ def publish(actor, activity) do
|
||||||
|
|
||||||
public = is_public?(activity)
|
public = is_public?(activity)
|
||||||
|
|
||||||
remote_inboxes =
|
|
||||||
(Pleroma.Web.Salmon.remote_users(activity) ++ followers)
|
|
||||||
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|
|
||||||
|> Enum.map(fn %{info: %{source_data: data}} ->
|
|
||||||
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
|
|
||||||
end)
|
|
||||||
|> Enum.uniq()
|
|
||||||
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
|
||||||
|
|
||||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||||
json = Jason.encode!(data)
|
json = Jason.encode!(data)
|
||||||
|
|
||||||
Enum.each(remote_inboxes, fn inbox ->
|
(Pleroma.Web.Salmon.remote_users(activity) ++ followers)
|
||||||
Federator.enqueue(:publish_single_ap, %{
|
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|
||||||
|
|> Enum.map(fn %{info: %{source_data: data}} ->
|
||||||
|
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
|
||||||
|
end)
|
||||||
|
|> Enum.uniq()
|
||||||
|
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
||||||
|
|> Enum.each(fn inbox ->
|
||||||
|
Federator.publish_single_ap(%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
json: json,
|
json: json,
|
||||||
actor: actor,
|
actor: actor,
|
||||||
|
|
|
@ -150,13 +150,13 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, %{"nickname" => nickname}
|
||||||
with %User{} = user <- User.get_cached_by_nickname(nickname),
|
with %User{} = user <- User.get_cached_by_nickname(nickname),
|
||||||
true <- Utils.recipient_in_message(user.ap_id, params),
|
true <- Utils.recipient_in_message(user.ap_id, params),
|
||||||
params <- Utils.maybe_splice_recipient(user.ap_id, params) do
|
params <- Utils.maybe_splice_recipient(user.ap_id, params) do
|
||||||
Federator.enqueue(:incoming_ap_doc, params)
|
Federator.incoming_ap_doc(params)
|
||||||
json(conn, "ok")
|
json(conn, "ok")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
|
def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
|
||||||
Federator.enqueue(:incoming_ap_doc, params)
|
Federator.incoming_ap_doc(params)
|
||||||
json(conn, "ok")
|
json(conn, "ok")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -156,7 +156,7 @@ def maybe_federate(%Activity{local: true} = activity) do
|
||||||
_ -> 5
|
_ -> 5
|
||||||
end
|
end
|
||||||
|
|
||||||
Pleroma.Web.Federator.enqueue(:publish, activity, priority)
|
Pleroma.Web.Federator.publish(activity, priority)
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,9 @@
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
defmodule Pleroma.Web.Federator do
|
defmodule Pleroma.Web.Federator do
|
||||||
use GenServer
|
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Jobs
|
||||||
alias Pleroma.Web.{WebFinger, Websub}
|
alias Pleroma.Web.{WebFinger, Websub}
|
||||||
alias Pleroma.Web.Federator.RetryQueue
|
alias Pleroma.Web.Federator.RetryQueue
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
|
@ -18,39 +18,60 @@ defmodule Pleroma.Web.Federator do
|
||||||
@websub Application.get_env(:pleroma, :websub)
|
@websub Application.get_env(:pleroma, :websub)
|
||||||
@ostatus Application.get_env(:pleroma, :ostatus)
|
@ostatus Application.get_env(:pleroma, :ostatus)
|
||||||
|
|
||||||
def init(args) do
|
def init() do
|
||||||
{:ok, args}
|
# 1 minute
|
||||||
|
Process.sleep(1000 * 60 * 1)
|
||||||
|
refresh_subscriptions()
|
||||||
end
|
end
|
||||||
|
|
||||||
def start_link do
|
# Client API
|
||||||
spawn(fn ->
|
|
||||||
# 1 minute
|
|
||||||
Process.sleep(1000 * 60 * 1)
|
|
||||||
enqueue(:refresh_subscriptions, nil)
|
|
||||||
end)
|
|
||||||
|
|
||||||
GenServer.start_link(
|
def incoming_doc(doc) do
|
||||||
__MODULE__,
|
Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
|
||||||
%{
|
|
||||||
in: {:sets.new(), []},
|
|
||||||
out: {:sets.new(), []}
|
|
||||||
},
|
|
||||||
name: __MODULE__
|
|
||||||
)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:refresh_subscriptions, _) do
|
def incoming_ap_doc(params) do
|
||||||
|
Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
|
||||||
|
end
|
||||||
|
|
||||||
|
def publish(activity, priority \\ 1) do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority)
|
||||||
|
end
|
||||||
|
|
||||||
|
def publish_single_ap(params) do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params])
|
||||||
|
end
|
||||||
|
|
||||||
|
def publish_single_websub(websub) do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub])
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_websub(websub) do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub])
|
||||||
|
end
|
||||||
|
|
||||||
|
def request_subscription(sub) do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub])
|
||||||
|
end
|
||||||
|
|
||||||
|
def refresh_subscriptions() do
|
||||||
|
Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions])
|
||||||
|
end
|
||||||
|
|
||||||
|
# Job Worker Callbacks
|
||||||
|
|
||||||
|
def perform(:refresh_subscriptions) do
|
||||||
Logger.debug("Federator running refresh subscriptions")
|
Logger.debug("Federator running refresh subscriptions")
|
||||||
Websub.refresh_subscriptions()
|
Websub.refresh_subscriptions()
|
||||||
|
|
||||||
spawn(fn ->
|
spawn(fn ->
|
||||||
# 6 hours
|
# 6 hours
|
||||||
Process.sleep(1000 * 60 * 60 * 6)
|
Process.sleep(1000 * 60 * 60 * 6)
|
||||||
enqueue(:refresh_subscriptions, nil)
|
refresh_subscriptions()
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:request_subscription, websub) do
|
def perform(:request_subscription, websub) do
|
||||||
Logger.debug("Refreshing #{websub.topic}")
|
Logger.debug("Refreshing #{websub.topic}")
|
||||||
|
|
||||||
with {:ok, websub} <- Websub.request_subscription(websub) do
|
with {:ok, websub} <- Websub.request_subscription(websub) do
|
||||||
|
@ -60,7 +81,7 @@ def handle(:request_subscription, websub) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:publish, activity) do
|
def perform(:publish, activity) do
|
||||||
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
||||||
|
|
||||||
with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
|
with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
|
||||||
|
@ -86,7 +107,7 @@ def handle(:publish, activity) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:verify_websub, websub) do
|
def perform(:verify_websub, websub) do
|
||||||
Logger.debug(fn ->
|
Logger.debug(fn ->
|
||||||
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
|
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
|
||||||
end)
|
end)
|
||||||
|
@ -94,12 +115,12 @@ def handle(:verify_websub, websub) do
|
||||||
@websub.verify(websub)
|
@websub.verify(websub)
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:incoming_doc, doc) do
|
def perform(:incoming_doc, doc) do
|
||||||
Logger.info("Got document, trying to parse")
|
Logger.info("Got document, trying to parse")
|
||||||
@ostatus.handle_incoming(doc)
|
@ostatus.handle_incoming(doc)
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:incoming_ap_doc, params) do
|
def perform(:incoming_ap_doc, params) do
|
||||||
Logger.info("Handling incoming AP activity")
|
Logger.info("Handling incoming AP activity")
|
||||||
|
|
||||||
params = Utils.normalize_params(params)
|
params = Utils.normalize_params(params)
|
||||||
|
@ -124,7 +145,7 @@ def handle(:incoming_ap_doc, params) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:publish_single_ap, params) do
|
def perform(:publish_single_ap, params) do
|
||||||
case ActivityPub.publish_one(params) do
|
case ActivityPub.publish_one(params) do
|
||||||
{:ok, _} ->
|
{:ok, _} ->
|
||||||
:ok
|
:ok
|
||||||
|
@ -134,7 +155,7 @@ def handle(:publish_single_ap, params) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(
|
def perform(
|
||||||
:publish_single_websub,
|
:publish_single_websub,
|
||||||
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
|
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
|
||||||
) do
|
) do
|
||||||
|
@ -147,75 +168,11 @@ def handle(
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(type, _) do
|
def perform(type, _) do
|
||||||
Logger.debug(fn -> "Unknown task: #{type}" end)
|
Logger.debug(fn -> "Unknown task: #{type}" end)
|
||||||
{:error, "Don't know what to do with this"}
|
{:error, "Don't know what to do with this"}
|
||||||
end
|
end
|
||||||
|
|
||||||
if Mix.env() == :test do
|
|
||||||
def enqueue(type, payload, _priority \\ 1) do
|
|
||||||
if Pleroma.Config.get([:instance, :federating]) do
|
|
||||||
handle(type, payload)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
else
|
|
||||||
def enqueue(type, payload, priority \\ 1) do
|
|
||||||
if Pleroma.Config.get([:instance, :federating]) do
|
|
||||||
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def maybe_start_job(running_jobs, queue) do
|
|
||||||
if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
|
|
||||||
{{type, payload}, queue} = queue_pop(queue)
|
|
||||||
{:ok, pid} = Task.start(fn -> handle(type, payload) end)
|
|
||||||
mref = Process.monitor(pid)
|
|
||||||
{:sets.add_element(mref, running_jobs), queue}
|
|
||||||
else
|
|
||||||
{running_jobs, queue}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast({:enqueue, type, payload, _priority}, state)
|
|
||||||
when type in [:incoming_doc, :incoming_ap_doc] do
|
|
||||||
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
|
|
||||||
i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
|
|
||||||
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
|
|
||||||
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast({:enqueue, type, payload, _priority}, state) do
|
|
||||||
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
|
|
||||||
o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
|
|
||||||
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
|
|
||||||
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(m, state) do
|
|
||||||
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
|
||||||
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
|
|
||||||
i_running_jobs = :sets.del_element(ref, i_running_jobs)
|
|
||||||
o_running_jobs = :sets.del_element(ref, o_running_jobs)
|
|
||||||
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
|
|
||||||
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
|
|
||||||
|
|
||||||
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue_sorted(queue, element, priority) do
|
|
||||||
[%{item: element, priority: priority} | queue]
|
|
||||||
|> Enum.sort_by(fn %{priority: priority} -> priority end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def queue_pop([%{item: element} | queue]) do
|
|
||||||
{element, queue}
|
|
||||||
end
|
|
||||||
|
|
||||||
def ap_enabled_actor(id) do
|
def ap_enabled_actor(id) do
|
||||||
user = User.get_by_ap_id(id)
|
user = User.get_by_ap_id(id)
|
||||||
|
|
||||||
|
|
|
@ -82,7 +82,7 @@ def salmon_incoming(conn, _) do
|
||||||
{:ok, body, _conn} = read_body(conn)
|
{:ok, body, _conn} = read_body(conn)
|
||||||
{:ok, doc} = decode_or_retry(body)
|
{:ok, doc} = decode_or_retry(body)
|
||||||
|
|
||||||
Federator.enqueue(:incoming_doc, doc)
|
Federator.incoming_doc(doc)
|
||||||
|
|
||||||
conn
|
conn
|
||||||
|> send_resp(200, "")
|
|> send_resp(200, "")
|
||||||
|
|
|
@ -7,7 +7,7 @@ defmodule Pleroma.Web.Websub do
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
|
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
|
||||||
alias Pleroma.Web.OStatus.FeedRepresenter
|
alias Pleroma.Web.OStatus.FeedRepresenter
|
||||||
alias Pleroma.Web.{XML, Endpoint, OStatus}
|
alias Pleroma.Web.{XML, Endpoint, OStatus, Federator}
|
||||||
alias Pleroma.Web.Router.Helpers
|
alias Pleroma.Web.Router.Helpers
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
|
||||||
secret: sub.secret
|
secret: sub.secret
|
||||||
}
|
}
|
||||||
|
|
||||||
Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
|
Federator.publish_single_websub(data)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) d
|
||||||
|
|
||||||
websub = Repo.update!(change)
|
websub = Repo.update!(change)
|
||||||
|
|
||||||
Pleroma.Web.Federator.enqueue(:verify_websub, websub)
|
Federator.verify_websub(websub)
|
||||||
|
|
||||||
{:ok, websub}
|
{:ok, websub}
|
||||||
else
|
else
|
||||||
|
@ -259,7 +259,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do
|
||||||
subs = Repo.all(query)
|
subs = Repo.all(query)
|
||||||
|
|
||||||
Enum.each(subs, fn sub ->
|
Enum.each(subs, fn sub ->
|
||||||
Pleroma.Web.Federator.enqueue(:request_subscription, sub)
|
Federator.request_subscription(sub)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ def websub_incoming(conn, %{"id" => id}) do
|
||||||
%WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id),
|
%WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id),
|
||||||
{:ok, body, _conn} = read_body(conn),
|
{:ok, body, _conn} = read_body(conn),
|
||||||
^signature <- Websub.sign(websub.secret, body) do
|
^signature <- Websub.sign(websub.secret, body) do
|
||||||
Federator.enqueue(:incoming_doc, body)
|
Federator.incoming_doc(body)
|
||||||
|
|
||||||
conn
|
conn
|
||||||
|> send_resp(200, "OK")
|
|> send_resp(200, "OK")
|
||||||
|
|
83
test/jobs_test.exs
Normal file
83
test/jobs_test.exs
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.JobsTest do
|
||||||
|
use ExUnit.Case, async: true
|
||||||
|
|
||||||
|
alias Pleroma.Jobs
|
||||||
|
alias Jobs.WorkerMock
|
||||||
|
|
||||||
|
setup do
|
||||||
|
state = %{
|
||||||
|
queues: Enum.into([Jobs.create_queue(:testing)], %{}),
|
||||||
|
refs: %{}
|
||||||
|
}
|
||||||
|
|
||||||
|
[state: state]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "creates queue" do
|
||||||
|
queue = Jobs.create_queue(:foobar)
|
||||||
|
|
||||||
|
assert {:foobar, set} = queue
|
||||||
|
assert :set == elem(set, 0) |> elem(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "enqueues an element according to priority" do
|
||||||
|
queue = [%{item: 1, priority: 2}]
|
||||||
|
|
||||||
|
new_queue = Jobs.enqueue_sorted(queue, 2, 1)
|
||||||
|
assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
|
||||||
|
|
||||||
|
new_queue = Jobs.enqueue_sorted(queue, 2, 3)
|
||||||
|
assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "pop first item" do
|
||||||
|
queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
|
||||||
|
|
||||||
|
assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "enqueue a job", %{state: state} do
|
||||||
|
assert {:noreply, new_state} =
|
||||||
|
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
|
||||||
|
|
||||||
|
assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state
|
||||||
|
assert :sets.size(running_jobs) == 1
|
||||||
|
assert [ref] = :sets.to_list(running_jobs)
|
||||||
|
assert %{refs: %{^ref => :testing}} = new_state
|
||||||
|
end
|
||||||
|
|
||||||
|
test "max jobs setting", %{state: state} do
|
||||||
|
max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs])
|
||||||
|
|
||||||
|
{:noreply, state} =
|
||||||
|
Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} ->
|
||||||
|
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert %{
|
||||||
|
queues: %{
|
||||||
|
testing:
|
||||||
|
{running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]}
|
||||||
|
}
|
||||||
|
} = state
|
||||||
|
|
||||||
|
assert :sets.size(running_jobs) == max_jobs
|
||||||
|
end
|
||||||
|
|
||||||
|
test "remove job after it finished", %{state: state} do
|
||||||
|
{:noreply, new_state} =
|
||||||
|
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
|
||||||
|
|
||||||
|
%{queues: %{testing: {running_jobs, []}}} = new_state
|
||||||
|
[ref] = :sets.to_list(running_jobs)
|
||||||
|
|
||||||
|
assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} =
|
||||||
|
Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state)
|
||||||
|
|
||||||
|
assert :sets.size(running_jobs) == 0
|
||||||
|
end
|
||||||
|
end
|
19
test/support/jobs_worker_mock.ex
Normal file
19
test/support/jobs_worker_mock.ex
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Jobs.WorkerMock do
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
def perform(:test_job, arg, arg2) do
|
||||||
|
Logger.debug({:perform, :test_job, arg, arg2})
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(:test_job, payload) do
|
||||||
|
Logger.debug({:perform, :test_job, payload})
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_job(payload) do
|
||||||
|
Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload])
|
||||||
|
end
|
||||||
|
end
|
|
@ -14,22 +14,6 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
test "enqueues an element according to priority" do
|
|
||||||
queue = [%{item: 1, priority: 2}]
|
|
||||||
|
|
||||||
new_queue = Federator.enqueue_sorted(queue, 2, 1)
|
|
||||||
assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
|
|
||||||
|
|
||||||
new_queue = Federator.enqueue_sorted(queue, 2, 3)
|
|
||||||
assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
|
|
||||||
end
|
|
||||||
|
|
||||||
test "pop first item" do
|
|
||||||
queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
|
|
||||||
|
|
||||||
assert {2, [%{item: 1, priority: 2}]} = Federator.queue_pop(queue)
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Publish an activity" do
|
describe "Publish an activity" do
|
||||||
setup do
|
setup do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
@ -49,7 +33,7 @@ test "with relays active, it publishes to the relay", %{
|
||||||
relay_mock: relay_mock
|
relay_mock: relay_mock
|
||||||
} do
|
} do
|
||||||
with_mocks([relay_mock]) do
|
with_mocks([relay_mock]) do
|
||||||
Federator.handle(:publish, activity)
|
Federator.publish(activity)
|
||||||
end
|
end
|
||||||
|
|
||||||
assert_received :relay_publish
|
assert_received :relay_publish
|
||||||
|
@ -62,7 +46,7 @@ test "with relays deactivated, it does not publish to the relay", %{
|
||||||
Pleroma.Config.put([:instance, :allow_relay], false)
|
Pleroma.Config.put([:instance, :allow_relay], false)
|
||||||
|
|
||||||
with_mocks([relay_mock]) do
|
with_mocks([relay_mock]) do
|
||||||
Federator.handle(:publish, activity)
|
Federator.publish(activity)
|
||||||
end
|
end
|
||||||
|
|
||||||
refute_received :relay_publish
|
refute_received :relay_publish
|
||||||
|
@ -87,7 +71,7 @@ test "successfully processes incoming AP docs with correct origin" do
|
||||||
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
|
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
|
||||||
}
|
}
|
||||||
|
|
||||||
{:ok, _activity} = Federator.handle(:incoming_ap_doc, params)
|
{:ok, _activity} = Federator.incoming_ap_doc(params)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "rejects incoming AP docs with incorrect origin" do
|
test "rejects incoming AP docs with incorrect origin" do
|
||||||
|
@ -105,7 +89,7 @@ test "rejects incoming AP docs with incorrect origin" do
|
||||||
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
|
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
|
||||||
}
|
}
|
||||||
|
|
||||||
:error = Federator.handle(:incoming_ap_doc, params)
|
:error = Federator.incoming_ap_doc(params)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue