Merge branch 'fix/attachments-cleanup' into 'develop'
Delete attachments in queue with infinite timeout and ensure object.data.url is an array See merge request pleroma/pleroma!2103
This commit is contained in:
commit
71bffbf0b7
|
@ -502,7 +502,8 @@
|
||||||
mailer: 10,
|
mailer: 10,
|
||||||
transmogrifier: 20,
|
transmogrifier: 20,
|
||||||
scheduled_activities: 10,
|
scheduled_activities: 10,
|
||||||
background: 5
|
background: 5,
|
||||||
|
attachments_cleanup: 5
|
||||||
]
|
]
|
||||||
|
|
||||||
config :pleroma, :workers,
|
config :pleroma, :workers,
|
||||||
|
|
|
@ -19,6 +19,8 @@ defmodule Pleroma.Object do
|
||||||
|
|
||||||
@type t() :: %__MODULE__{}
|
@type t() :: %__MODULE__{}
|
||||||
|
|
||||||
|
@derive {Jason.Encoder, only: [:data]}
|
||||||
|
|
||||||
schema "objects" do
|
schema "objects" do
|
||||||
field(:data, :map)
|
field(:data, :map)
|
||||||
|
|
||||||
|
@ -180,85 +182,17 @@ def swap_object_with_tombstone(object) do
|
||||||
|
|
||||||
def delete(%Object{data: %{"id" => id}} = object) do
|
def delete(%Object{data: %{"id" => id}} = object) do
|
||||||
with {:ok, _obj} = swap_object_with_tombstone(object),
|
with {:ok, _obj} = swap_object_with_tombstone(object),
|
||||||
:ok <- delete_attachments(object),
|
|
||||||
deleted_activity = Activity.delete_all_by_object_ap_id(id),
|
deleted_activity = Activity.delete_all_by_object_ap_id(id),
|
||||||
{:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
|
{:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
|
||||||
{:ok, _} <- Cachex.del(:web_resp_cache, URI.parse(id).path) do
|
{:ok, _} <- Cachex.del(:web_resp_cache, URI.parse(id).path),
|
||||||
|
{:ok, _} <-
|
||||||
|
Pleroma.Workers.AttachmentsCleanupWorker.enqueue("cleanup_attachments", %{
|
||||||
|
"object" => object
|
||||||
|
}) do
|
||||||
{:ok, object, deleted_activity}
|
{:ok, object, deleted_activity}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp delete_attachments(%{data: %{"attachment" => [_ | _] = attachments, "actor" => actor}}) do
|
|
||||||
hrefs =
|
|
||||||
Enum.flat_map(attachments, fn attachment ->
|
|
||||||
Enum.map(attachment["url"], & &1["href"])
|
|
||||||
end)
|
|
||||||
|
|
||||||
names = Enum.map(attachments, & &1["name"])
|
|
||||||
|
|
||||||
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
|
|
||||||
|
|
||||||
# find all objects for copies of the attachments, name and actor doesn't matter here
|
|
||||||
delete_ids =
|
|
||||||
from(o in Object,
|
|
||||||
where:
|
|
||||||
fragment(
|
|
||||||
"to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href'))::jsonb \\?| (?)",
|
|
||||||
o.data,
|
|
||||||
^hrefs
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|> Repo.all()
|
|
||||||
# we should delete 1 object for any given attachment, but don't delete files if
|
|
||||||
# there are more than 1 object for it
|
|
||||||
|> Enum.reduce(%{}, fn %{
|
|
||||||
id: id,
|
|
||||||
data: %{
|
|
||||||
"url" => [%{"href" => href}],
|
|
||||||
"actor" => obj_actor,
|
|
||||||
"name" => name
|
|
||||||
}
|
|
||||||
},
|
|
||||||
acc ->
|
|
||||||
Map.update(acc, href, %{id: id, count: 1}, fn val ->
|
|
||||||
case obj_actor == actor and name in names do
|
|
||||||
true ->
|
|
||||||
# set id of the actor's object that will be deleted
|
|
||||||
%{val | id: id, count: val.count + 1}
|
|
||||||
|
|
||||||
false ->
|
|
||||||
# another actor's object, just increase count to not delete file
|
|
||||||
%{val | count: val.count + 1}
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end)
|
|
||||||
|> Enum.map(fn {href, %{id: id, count: count}} ->
|
|
||||||
# only delete files that have single instance
|
|
||||||
with 1 <- count do
|
|
||||||
prefix =
|
|
||||||
case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
|
|
||||||
nil -> "media"
|
|
||||||
_ -> ""
|
|
||||||
end
|
|
||||||
|
|
||||||
base_url = Pleroma.Config.get([__MODULE__, :base_url], Pleroma.Web.base_url())
|
|
||||||
|
|
||||||
file_path = String.trim_leading(href, "#{base_url}/#{prefix}")
|
|
||||||
|
|
||||||
uploader.delete_file(file_path)
|
|
||||||
end
|
|
||||||
|
|
||||||
id
|
|
||||||
end)
|
|
||||||
|
|
||||||
from(o in Object, where: o.id in ^delete_ids)
|
|
||||||
|> Repo.delete_all()
|
|
||||||
|
|
||||||
:ok
|
|
||||||
end
|
|
||||||
|
|
||||||
defp delete_attachments(%{data: _data}), do: :ok
|
|
||||||
|
|
||||||
def prune(%Object{data: %{"id" => id}} = object) do
|
def prune(%Object{data: %{"id" => id}} = object) do
|
||||||
with {:ok, object} <- Repo.delete(object),
|
with {:ok, object} <- Repo.delete(object),
|
||||||
{:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
|
{:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
|
||||||
|
|
88
lib/pleroma/workers/attachments_cleanup_worker.ex
Normal file
88
lib/pleroma/workers/attachments_cleanup_worker.ex
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.AttachmentsCleanupWorker do
|
||||||
|
import Ecto.Query
|
||||||
|
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Repo
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(
|
||||||
|
%{"object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}},
|
||||||
|
_job
|
||||||
|
) do
|
||||||
|
hrefs =
|
||||||
|
Enum.flat_map(attachments, fn attachment ->
|
||||||
|
Enum.map(attachment["url"], & &1["href"])
|
||||||
|
end)
|
||||||
|
|
||||||
|
names = Enum.map(attachments, & &1["name"])
|
||||||
|
|
||||||
|
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
|
||||||
|
|
||||||
|
# find all objects for copies of the attachments, name and actor doesn't matter here
|
||||||
|
delete_ids =
|
||||||
|
from(o in Object,
|
||||||
|
where:
|
||||||
|
fragment(
|
||||||
|
"to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
|
||||||
|
o.data,
|
||||||
|
o.data,
|
||||||
|
^hrefs
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# The query above can be time consumptive on large instances until we
|
||||||
|
# refactor how uploads are stored
|
||||||
|
|> Repo.all(timout: :infinity)
|
||||||
|
# we should delete 1 object for any given attachment, but don't delete
|
||||||
|
# files if there are more than 1 object for it
|
||||||
|
|> Enum.reduce(%{}, fn %{
|
||||||
|
id: id,
|
||||||
|
data: %{
|
||||||
|
"url" => [%{"href" => href}],
|
||||||
|
"actor" => obj_actor,
|
||||||
|
"name" => name
|
||||||
|
}
|
||||||
|
},
|
||||||
|
acc ->
|
||||||
|
Map.update(acc, href, %{id: id, count: 1}, fn val ->
|
||||||
|
case obj_actor == actor and name in names do
|
||||||
|
true ->
|
||||||
|
# set id of the actor's object that will be deleted
|
||||||
|
%{val | id: id, count: val.count + 1}
|
||||||
|
|
||||||
|
false ->
|
||||||
|
# another actor's object, just increase count to not delete file
|
||||||
|
%{val | count: val.count + 1}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|> Enum.map(fn {href, %{id: id, count: count}} ->
|
||||||
|
# only delete files that have single instance
|
||||||
|
with 1 <- count do
|
||||||
|
prefix =
|
||||||
|
case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
|
||||||
|
nil -> "media"
|
||||||
|
_ -> ""
|
||||||
|
end
|
||||||
|
|
||||||
|
base_url = Pleroma.Config.get([__MODULE__, :base_url], Pleroma.Web.base_url())
|
||||||
|
|
||||||
|
file_path = String.trim_leading(href, "#{base_url}/#{prefix}")
|
||||||
|
|
||||||
|
uploader.delete_file(file_path)
|
||||||
|
end
|
||||||
|
|
||||||
|
id
|
||||||
|
end)
|
||||||
|
|
||||||
|
from(o in Object, where: o.id in ^delete_ids)
|
||||||
|
|> Repo.delete_all()
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"object" => _object}, _job), do: :ok
|
||||||
|
end
|
|
@ -4,12 +4,14 @@
|
||||||
|
|
||||||
defmodule Pleroma.ObjectTest do
|
defmodule Pleroma.ObjectTest do
|
||||||
use Pleroma.DataCase
|
use Pleroma.DataCase
|
||||||
|
use Oban.Testing, repo: Pleroma.Repo
|
||||||
import ExUnit.CaptureLog
|
import ExUnit.CaptureLog
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
import Tesla.Mock
|
import Tesla.Mock
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
|
alias Pleroma.Tests.ObanHelpers
|
||||||
alias Pleroma.Web.CommonAPI
|
alias Pleroma.Web.CommonAPI
|
||||||
|
|
||||||
setup do
|
setup do
|
||||||
|
@ -99,6 +101,8 @@ test "in subdirectories" do
|
||||||
|
|
||||||
Object.delete(note)
|
Object.delete(note)
|
||||||
|
|
||||||
|
ObanHelpers.perform(all_enqueued(worker: Pleroma.Workers.AttachmentsCleanupWorker))
|
||||||
|
|
||||||
assert Object.get_by_id(attachment.id) == nil
|
assert Object.get_by_id(attachment.id) == nil
|
||||||
|
|
||||||
assert {:ok, []} == File.ls("#{uploads_dir}/#{path}")
|
assert {:ok, []} == File.ls("#{uploads_dir}/#{path}")
|
||||||
|
@ -133,10 +137,46 @@ test "with dedupe enabled" do
|
||||||
|
|
||||||
Object.delete(note)
|
Object.delete(note)
|
||||||
|
|
||||||
|
ObanHelpers.perform(all_enqueued(worker: Pleroma.Workers.AttachmentsCleanupWorker))
|
||||||
|
|
||||||
assert Object.get_by_id(attachment.id) == nil
|
assert Object.get_by_id(attachment.id) == nil
|
||||||
assert {:ok, files} = File.ls(uploads_dir)
|
assert {:ok, files} = File.ls(uploads_dir)
|
||||||
refute filename in files
|
refute filename in files
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "with objects that have legacy data.url attribute" do
|
||||||
|
Pleroma.Config.put([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local)
|
||||||
|
|
||||||
|
file = %Plug.Upload{
|
||||||
|
content_type: "image/jpg",
|
||||||
|
path: Path.absname("test/fixtures/image.jpg"),
|
||||||
|
filename: "an_image.jpg"
|
||||||
|
}
|
||||||
|
|
||||||
|
user = insert(:user)
|
||||||
|
|
||||||
|
{:ok, %Object{} = attachment} =
|
||||||
|
Pleroma.Web.ActivityPub.ActivityPub.upload(file, actor: user.ap_id)
|
||||||
|
|
||||||
|
{:ok, %Object{}} = Object.create(%{url: "https://google.com", actor: user.ap_id})
|
||||||
|
|
||||||
|
%{data: %{"attachment" => [%{"url" => [%{"href" => href}]}]}} =
|
||||||
|
note = insert(:note, %{user: user, data: %{"attachment" => [attachment.data]}})
|
||||||
|
|
||||||
|
uploads_dir = Pleroma.Config.get!([Pleroma.Uploaders.Local, :uploads])
|
||||||
|
|
||||||
|
path = href |> Path.dirname() |> Path.basename()
|
||||||
|
|
||||||
|
assert {:ok, ["an_image.jpg"]} == File.ls("#{uploads_dir}/#{path}")
|
||||||
|
|
||||||
|
Object.delete(note)
|
||||||
|
|
||||||
|
ObanHelpers.perform(all_enqueued(worker: Pleroma.Workers.AttachmentsCleanupWorker))
|
||||||
|
|
||||||
|
assert Object.get_by_id(attachment.id) == nil
|
||||||
|
|
||||||
|
assert {:ok, []} == File.ls("#{uploads_dir}/#{path}")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "normalizer" do
|
describe "normalizer" do
|
||||||
|
|
Loading…
Reference in a new issue