[#1149] Upgraded oban from 0.6.0 to 0.7.1.

This commit is contained in:
Ivan Tashkinov 2019-08-23 09:23:10 +03:00
parent 7101ba1a21
commit c29686309e
13 changed files with 51 additions and 49 deletions

View file

@ -469,7 +469,6 @@
config :pleroma, :workers,
retries: [
compile_time_default: 1,
federator_incoming: 5,
federator_outgoing: 5
]

View file

@ -41,10 +41,7 @@ def start(_type, _args) do
hackney_pool_children() ++
[
Pleroma.Stats,
%{
id: Oban,
start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
},
{Oban, Application.get_env(:pleroma, Oban)},
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},

View file

@ -11,55 +11,61 @@ defmodule Pleroma.Workers.BackgroundWorker do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "background",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}) do
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id)
User.perform(:fetch_initial_posts, user)
end
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}) do
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
user = User.get_by_id(user_id)
User.perform(:deactivate_async, user, status)
end
def perform(%{"op" => "delete_user", "user_id" => user_id}) do
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id)
User.perform(:delete, user)
end
def perform(%{
def perform(
%{
"op" => "blocks_import",
"blocker_id" => blocker_id,
"blocked_identifiers" => blocked_identifiers
}) do
},
_job
) do
blocker = User.get_by_id(blocker_id)
User.perform(:blocks_import, blocker, blocked_identifiers)
end
def perform(%{
def perform(
%{
"op" => "follow_import",
"follower_id" => follower_id,
"followed_identifiers" => followed_identifiers
}) do
},
_job
) do
follower = User.get_by_id(follower_id)
User.perform(:follow_import, follower, followed_identifiers)
end
def perform(%{"op" => "clean_expired_tokens"}) do
def perform(%{"op" => "clean_expired_tokens"}, _job) do
CleanWorker.perform(:clean)
end
def perform(%{"op" => "media_proxy_preload", "message" => message}) do
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
def perform(%{"op" => "media_proxy_prefetch", "url" => url}) do
def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
MediaProxyWarmingPolicy.perform(:prefetch, url)
end
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}) do
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end

View file

@ -8,10 +8,10 @@ defmodule Pleroma.Workers.Mailer do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "mailer",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}) do
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
email =
encoded_email
|> Base.decode64!()
@ -20,7 +20,7 @@ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => con
Pleroma.Emails.Mailer.deliver(email, config)
end
def perform(%{"op" => "digest_email", "user_id" => user_id}) do
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id)
Pleroma.DigestEmailWorker.perform(user)
end

View file

@ -9,15 +9,15 @@ defmodule Pleroma.Workers.Publisher do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}) do
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id)
Federator.perform(:publish, activity)
end
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
Federator.perform(:publish_one, String.to_atom(module_name), params)
end
end

View file

@ -8,14 +8,14 @@ defmodule Pleroma.Workers.Receiver do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_incoming",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}) do
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc)
end
def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
Federator.perform(:incoming_ap_doc, params)
end
end

View file

@ -6,10 +6,10 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "scheduled_activities",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}) do
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
end
end

View file

@ -10,19 +10,19 @@ defmodule Pleroma.Workers.Subscriber do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}) do
def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions)
end
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
websub = Repo.get(WebsubClientSubscription, websub_id)
Federator.perform(:request_subscription, websub)
end
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
websub = Repo.get(WebsubClientSubscription, websub_id)
Federator.perform(:verify_websub, websub)
end

View file

@ -8,10 +8,10 @@ defmodule Pleroma.Workers.Transmogrifier do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "transmogrifier",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}) do
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end

View file

@ -9,10 +9,10 @@ defmodule Pleroma.Workers.WebPusher do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "web_push",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}) do
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id)
Pleroma.Web.Push.Impl.perform(notification)
end

View file

@ -101,7 +101,7 @@ defp deps do
{:phoenix_ecto, "~> 4.0"},
{:ecto_sql, "~> 3.1"},
{:postgrex, ">= 0.13.5"},
{:oban, "~> 0.6"},
{:oban, "~> 0.7"},
{:gettext, "~> 0.15"},
{:comeonin, "~> 4.1.1"},
{:pbkdf2_elixir, "~> 0.12.3"},

View file

@ -17,12 +17,12 @@
"credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"},
"crypt": {:git, "https://github.com/msantos/crypt", "1f2b58927ab57e72910191a7ebaeff984382a1d3", [ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"]},
"db_connection": {:hex, :db_connection, "2.0.6", "bde2f85d047969c5b5800cb8f4b3ed6316c8cb11487afedac4aa5f93fd39abfa", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"db_connection": {:hex, :db_connection, "2.1.1", "a51e8a2ee54ef2ae6ec41a668c85787ed40cb8944928c191280fe34c15b76ae5", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "3.1.4", "69d852da7a9f04ede725855a35ede48d158ca11a404fe94f8b2fb3b2162cd3c9", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.1.3", "2c536139190492d9de33c5fefac7323c5eaaa82e1b9bf93482a14649042f7cd9", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"ecto": {:hex, :ecto, "3.1.7", "fa21d06ef56cdc2fdaa62574e8c3ba34a2751d44ea34c30bc65f0728421043e5", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.1.6", "1e80e30d16138a729c717f73dcb938590bcdb3a4502f3012414d0cbb261045d8", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0 or ~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"esshd": {:hex, :esshd, "0.1.0", "6f93a2062adb43637edad0ea7357db2702a4b80dd9683482fe00f5134e97f4c1", [:mix], [], "hexpm"},
"eternal": {:hex, :eternal, "1.2.0", "e2a6b6ce3b8c248f7dc31451aefca57e3bdf0e48d73ae5043229380a67614c41", [:mix], [], "hexpm"},
"ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"},
@ -57,7 +57,7 @@
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"oban": {:hex, :oban, "0.7.1", "171bdd1b69c1a4a839f8c768f5e962fc22d1de1513d459fb6b8e0cbd34817a9a", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
@ -71,7 +71,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"postgrex": {:hex, :postgrex, "0.15.0", "dd5349161019caeea93efa42f9b22f9d79995c3a86bdffb796427b4c9863b0f0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},
"prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm"},

View file

@ -16,7 +16,7 @@ def perform_all do
end
def perform(%Oban.Job{} = job) do
res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job.args, job])
Repo.delete(job)
res
end