Implement suggestions from the Meilisearch MR
- Index unlisted posts
- Move version check outside of the streaming and only do it once
- Use a PUT request instead of checking manually if there is need to insert
- Add error handling, sort of
parent c128798418
commit bac70a2bc1
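The last two bullets amount to one pattern: the HTTP helpers in Pleroma.Search.Meilisearch now return {:ok, body} (or an error tuple) instead of raising, and documents are written with a PUT to /indexes/objects/documents, which adds or updates them, so callers no longer probe whether a document already exists. A minimal sketch of the caller-side pattern, using the meili_put/2 helper introduced in the diff below; the `documents` value is a hypothetical example, not data from the commit:

# Illustrative sketch only, not part of the commit.
# `documents` is a made-up list of prepared search documents.
documents = [%{"id" => 1, "content" => "hello world", "ap" => "https://example.com/objects/1"}]

result = meili_put("/indexes/objects/documents", documents)

with {:ok, res} <- result do
  # Meilisearch acknowledges an enqueued update with an "updateId" field;
  # anything else is reported as an indexing failure.
  if not Map.has_key?(res, "updateId") do
    IO.puts("Failed to index: #{inspect(res)}")
  end
else
  # Network errors no longer crash the caller; they are only reported.
  e -> IO.puts("Failed to index due to network error: #{inspect(e)}")
end

The same with/else shape is used in the mix task (reporting with IO.puts) and in add_to_index (reporting with Logger.error).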
lib/mix/tasks/pleroma/search/meilisearch.ex
@@ -3,38 +3,40 @@
 # SPDX-License-Identifier: AGPL-3.0-only

 defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
-  require Logger
   require Pleroma.Constants

   import Mix.Pleroma
   import Ecto.Query

-  import Pleroma.Search.Meilisearch, only: [meili_post!: 2, meili_delete!: 1, meili_get!: 1]
+  import Pleroma.Search.Meilisearch,
+    only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]

-  def run(["index" | args]) do
+  def run(["index"]) do
     start_pleroma()

-    is_reindex = "--reindex" in args
-
-    meili_post!(
-      "/indexes/objects/settings/ranking-rules",
-      [
-        "desc(published)",
-        "words",
-        "exactness",
-        "proximity",
-        "wordsPosition",
-        "typo",
-        "attribute"
-      ]
-    )
-
-    meili_post!(
-      "/indexes/objects/settings/searchable-attributes",
-      [
-        "content"
-      ]
-    )
+    {:ok, _} =
+      meili_post(
+        "/indexes/objects/settings/ranking-rules",
+        [
+          "desc(published)",
+          "words",
+          "exactness",
+          "proximity",
+          "wordsPosition",
+          "typo",
+          "attribute"
+        ]
+      )
+
+    {:ok, _} =
+      meili_post(
+        "/indexes/objects/settings/searchable-attributes",
+        [
+          "content"
+        ]
+      )
+
+    IO.puts("Created indices. Starting to insert posts.")

     chunk_size = 10_000

@@ -42,11 +44,11 @@ def run(["index" | args]) do
       fn ->
         query =
           from(Pleroma.Object,
-            # Only index public posts which are notes and have some text
+            # Only index public and unlisted posts which are notes and have some text
             where:
               fragment("data->>'type' = 'Note'") and
-                fragment("LENGTH(data->>'content') > 0") and
-                fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()),
+                (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
+                   fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
             order_by: [desc: fragment("data->'published'")]
           )

@@ -70,34 +72,18 @@ def run(["index" | args]) do
           {[objects], new_acc}
         end)
         |> Stream.each(fn objects ->
-          objects =
-            objects
-            |> Enum.filter(fn o ->
-              if is_reindex do
-                result = meili_get!("/indexes/objects/documents/#{o.id}")
-
-                # With >= 0.24.0 the name for "errorCode" is just "code"
-                error_code_key =
-                  if meili_get!("/version")["pkgVersion"] |> Version.match?(">= 0.24.0"),
-                    do: "code",
-                    else: "errorCode"
-
-                # Filter out the already indexed documents.
-                # This is true when the document does not exist
-                result[error_code_key] == "document_not_found"
-              else
-                true
-              end
-            end)
-
           result =
-            meili_post!(
+            meili_put(
               "/indexes/objects/documents",
               objects
             )

-          if not Map.has_key?(result, "updateId") do
-            IO.puts("Failed to index: #{inspect(result)}")
+          with {:ok, res} <- result do
+            if not Map.has_key?(res, "updateId") do
+              IO.puts("\nFailed to index: #{inspect(result)}")
+            end
+          else
+            e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
           end
         end)
         |> Stream.run()

@@ -137,7 +123,7 @@ def run(["show-private-key", master_key]) do
   def run(["stats"]) do
     start_pleroma()

-    result = meili_get!("/indexes/objects/stats")
+    {:ok, result} = meili_get("/indexes/objects/stats")
     IO.puts("Number of entries: #{result["numberOfDocuments"]}")
     IO.puts("Indexing? #{result["isIndexing"]}")
   end

lib/pleroma/search/meilisearch.ex
@@ -14,29 +14,50 @@ defp meili_headers do
     if is_nil(private_key), do: [], else: [{"X-Meili-API-Key", private_key}]
   end

-  def meili_get!(path) do
+  def meili_get(path) do
     endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])

-    {:ok, result} =
+    result =
       Pleroma.HTTP.get(
         Path.join(endpoint, path),
         meili_headers()
       )

-    Jason.decode!(result.body)
+    with {:ok, res} <- result do
+      {:ok, Jason.decode!(res.body)}
+    end
   end

-  def meili_post!(path, params) do
+  def meili_post(path, params) do
     endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])

-    {:ok, result} =
+    result =
       Pleroma.HTTP.post(
         Path.join(endpoint, path),
         Jason.encode!(params),
         meili_headers()
       )

-    Jason.decode!(result.body)
+    with {:ok, res} <- result do
+      {:ok, Jason.decode!(res.body)}
+    end
+  end
+
+  def meili_put(path, params) do
+    endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
+
+    result =
+      Pleroma.HTTP.request(
+        :put,
+        Path.join(endpoint, path),
+        Jason.encode!(params),
+        meili_headers(),
+        []
+      )
+
+    with {:ok, res} <- result do
+      {:ok, Jason.decode!(res.body)}
+    end
   end

   def meili_delete!(path) do

@@ -57,34 +78,40 @@ def search(user, query, options \\ []) do
     offset = Keyword.get(options, :offset, 0)
     author = Keyword.get(options, :author)

-    result =
-      meili_post!(
+    res =
+      meili_post(
         "/indexes/objects/search",
         %{q: query, offset: offset, limit: limit}
       )

-    hits = result["hits"] |> Enum.map(& &1["ap"])
+    with {:ok, result} <- res do
+      hits = result["hits"] |> Enum.map(& &1["ap"])

       try do
         hits
         |> Activity.create_by_object_ap_id()
         |> Activity.with_preloaded_object()
         |> Activity.with_preloaded_object()
         |> Activity.restrict_deactivated_users()
         |> maybe_restrict_local(user)
         |> maybe_restrict_author(author)
         |> maybe_restrict_blocked(user)
         |> maybe_fetch(user, query)
         |> order_by([object: obj], desc: obj.data["published"])
         |> Pleroma.Repo.all()
       rescue
         _ -> maybe_fetch([], user, query)
       end
+    end
   end

   def object_to_search_data(object) do
+    # Only index public or unlisted Notes
     if not is_nil(object) and object.data["type"] == "Note" and
-         Pleroma.Constants.as_public() in object.data["to"] do
+         not is_nil(object.data["content"]) and
+         (Pleroma.Constants.as_public() in object.data["to"] or
+            Pleroma.Constants.as_public() in object.data["cc"]) and
+         String.length(object.data["content"]) > 1 do
       data = object.data

       content_str =

@@ -117,13 +144,17 @@ def add_to_index(activity) do

     if activity.data["type"] == "Create" and maybe_search_data do
       result =
-        meili_post!(
+        meili_put(
           "/indexes/objects/documents",
           [maybe_search_data]
         )

-      if not Map.has_key?(result, "updateId") do
-        Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}")
+      with {:ok, res} <- result,
+           true <- Map.has_key?(res, "updateId") do
+        # Do nothing
+      else
+        _ ->
+          Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}")
       end
     end
   end