From c1127e321b151a98709072c1789a04c98bcf8c91 Mon Sep 17 00:00:00 2001 From: floatingghost Date: Sun, 13 Nov 2022 23:55:51 +0000 Subject: [PATCH] Add configurable timeline per oban job (#273) Heavily inspired by https://git.pleroma.social/pleroma/pleroma/-/merge_requests/3777 Co-authored-by: FloatingGhost Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/273 --- config/config.exs | 21 +++++++++++++++ config/description.exs | 26 +++++++++++++++++++ lib/pleroma/workers/backup_worker.ex | 6 +++++ lib/pleroma/workers/purge_expired_activity.ex | 5 ++++ lib/pleroma/workers/purge_expired_filter.ex | 5 ++++ lib/pleroma/workers/purge_expired_token.ex | 5 ++++ lib/pleroma/workers/worker_helper.ex | 6 +++++ .../workers/purge_expired_activity_test.exs | 5 ++++ .../scheduled_activity_worker_test.exs | 5 ++++ 9 files changed, 84 insertions(+) diff --git a/config/config.exs b/config/config.exs index fd470ed93..ba77d8b02 100644 --- a/config/config.exs +++ b/config/config.exs @@ -584,6 +584,27 @@ federator_incoming: 5, federator_outgoing: 5, search_indexing: 2 + ], + timeout: [ + activity_expiration: :timer.seconds(5), + token_expiration: :timer.seconds(5), + filter_expiration: :timer.seconds(5), + backup: :timer.seconds(900), + federator_incoming: :timer.seconds(10), + federator_outgoing: :timer.seconds(10), + ingestion_queue: :timer.seconds(5), + web_push: :timer.seconds(5), + mailer: :timer.seconds(5), + transmogrifier: :timer.seconds(5), + scheduled_activities: :timer.seconds(5), + poll_notifications: :timer.seconds(5), + background: :timer.seconds(5), + remote_fetcher: :timer.seconds(10), + attachments_cleanup: :timer.seconds(900), + new_users_digest: :timer.seconds(10), + mute_expire: :timer.seconds(5), + search_indexing: :timer.seconds(5), + nodeinfo_fetcher: :timer.seconds(10) ] config :pleroma, Pleroma.Formatter, diff --git a/config/description.exs b/config/description.exs index 4843c0aae..287abb747 100644 --- a/config/description.exs +++ b/config/description.exs @@ -1979,6 +1979,32 @@ federator_incoming: 5, federator_outgoing: 5 ] + }, + %{ + key: :timeout, + type: {:keyword, :integer}, + description: "Timeout for jobs, per `Oban` queue, in ms", + suggestions: [ + activity_expiration: :timer.seconds(5), + token_expiration: :timer.seconds(5), + filter_expiration: :timer.seconds(5), + backup: :timer.seconds(900), + federator_incoming: :timer.seconds(10), + federator_outgoing: :timer.seconds(10), + ingestion_queue: :timer.seconds(5), + web_push: :timer.seconds(5), + mailer: :timer.seconds(5), + transmogrifier: :timer.seconds(5), + scheduled_activities: :timer.seconds(5), + poll_notifications: :timer.seconds(5), + background: :timer.seconds(5), + remote_fetcher: :timer.seconds(10), + attachments_cleanup: :timer.seconds(900), + new_users_digest: :timer.seconds(10), + mute_expire: :timer.seconds(5), + search_indexing: :timer.seconds(5), + nodeinfo_fetcher: :timer.seconds(10) + ] } ] }, diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 66c5c3591..4ab08706e 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -14,6 +14,11 @@ def process(backup, admin_user_id \\ nil) do |> Oban.insert() end + @impl Oban.Worker + def timeout(_job) do + Pleroma.Config.get([:workers, :timeout, :backup]) || :timer.minutes(1) + end + def schedule_deletion(backup) do days = Pleroma.Config.get([Backup, :purge_after_days]) time = 60 * 60 * 24 * days @@ -30,6 +35,7 @@ def delete(backup) do |> Oban.insert() end + @impl true def perform(%Job{ args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id} }) do diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index 027171c1e..ece84df37 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -27,6 +27,11 @@ def enqueue(args) do end end + @impl Oban.Worker + def timeout(_job) do + Pleroma.Config.get([:workers, :timeout, :activity_expiration]) || :timer.minutes(1) + end + @impl true def perform(%Oban.Job{args: %{"activity_id" => id}}) do with %Activity{} = activity <- find_activity(id), diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex index 4740d52e9..dd6a22a68 100644 --- a/lib/pleroma/workers/purge_expired_filter.ex +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -24,6 +24,11 @@ def enqueue(args) do |> Oban.insert() end + @impl Oban.Worker + def timeout(_job) do + Pleroma.Config.get([:workers, :timeout, :filter_expiration]) || :timer.minutes(1) + end + @impl true def perform(%Job{args: %{"filter_id" => id}}) do Pleroma.Filter diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex index cfdf5c6dc..1773aeff9 100644 --- a/lib/pleroma/workers/purge_expired_token.ex +++ b/lib/pleroma/workers/purge_expired_token.ex @@ -19,6 +19,11 @@ def enqueue(args) do |> Oban.insert() end + @impl Oban.Worker + def timeout(_job) do + Pleroma.Config.get([:workers, :timeout, :token_expiration]) || :timer.minutes(1) + end + @impl true def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do module diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex index 4befbeb3b..97c0e4e5c 100644 --- a/lib/pleroma/workers/worker_helper.ex +++ b/lib/pleroma/workers/worker_helper.ex @@ -43,6 +43,12 @@ def enqueue(op, params, worker_args \\ []) do |> apply(:new, [params, worker_args]) |> Oban.insert() end + + @impl Oban.Worker + def timeout(_job) do + queue_atom = String.to_atom(unquote(queue)) + Config.get([:workers, :timeout, queue_atom]) || :timer.minutes(1) + end end end end diff --git a/test/pleroma/workers/purge_expired_activity_test.exs b/test/pleroma/workers/purge_expired_activity_test.exs index 98f30f61f..6285dca3e 100644 --- a/test/pleroma/workers/purge_expired_activity_test.exs +++ b/test/pleroma/workers/purge_expired_activity_test.exs @@ -56,4 +56,9 @@ test "error if actiivity was not found" do assert {:error, :activity_not_found} = perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"}) end + + test "has a timeout" do + clear_config([:workers, :timeout, :activity_expiration], 50) + assert Pleroma.Workers.PurgeExpiredActivity.timeout(%Oban.Job{}) == 50 + end end diff --git a/test/pleroma/workers/scheduled_activity_worker_test.exs b/test/pleroma/workers/scheduled_activity_worker_test.exs index 5558d5b5f..9f5f1b687 100644 --- a/test/pleroma/workers/scheduled_activity_worker_test.exs +++ b/test/pleroma/workers/scheduled_activity_worker_test.exs @@ -49,4 +49,9 @@ test "error message for non-existent scheduled activity" do ScheduledActivityWorker.perform(%Oban.Job{args: %{"activity_id" => 42}}) end) =~ "Couldn't find scheduled activity: 42" end + + test "has a timeout" do + clear_config([:workers, :timeout, :scheduled_activities], :timer.minutes(5)) + assert ScheduledActivityWorker.timeout(nil) == :timer.minutes(5) + end end