From 20753b8a49baa010bcf9bfaf7d57cbf791b1ed46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Broks=20Randolfs=20Gail=C4=ABtis?= Date: Wed, 20 Aug 2025 14:43:26 +0300 Subject: [PATCH] Initial queue implementation --- lib/message_server/application.ex | 1 + lib/message_server/message_queue.ex | 106 ++++++++++++++++++++++++++++ lib/message_server/remote_client.ex | 3 +- lib/message_server/router.ex | 5 -- 4 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 lib/message_server/message_queue.ex diff --git a/lib/message_server/application.ex b/lib/message_server/application.ex index 4d3c1d7..7583f2f 100644 --- a/lib/message_server/application.ex +++ b/lib/message_server/application.ex @@ -14,6 +14,7 @@ defmodule MessageServer.Application do children = [ {MessageServer.ServerRegistry, {server_id, servers}}, {MessageServer.Storage, {server_id}}, + {MessageServer.MessageQueue, [retry_interval: 30_000]}, {Bandit, plug: MessageServer.Router, port: port} ] diff --git a/lib/message_server/message_queue.ex b/lib/message_server/message_queue.ex new file mode 100644 index 0000000..f77b822 --- /dev/null +++ b/lib/message_server/message_queue.ex @@ -0,0 +1,106 @@ +defmodule MessageServer.MessageQueue do + use GenServer + require Logger + alias MessageServer.RemoteClient + + @type queued_message :: %{ + target_server: String.t(), + payload: map(), + timestamp: DateTime.t(), + retry_count: non_neg_integer() + } + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + {retry_interval, opts} = Keyword.pop(opts, :retry_interval, 30_000) + {queue_name, _opts} = Keyword.pop(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, retry_interval, name: queue_name) + end + + @spec queue_message(String.t(), map()) :: :ok + def queue_message(target_server, payload) do + GenServer.cast(__MODULE__, {:queue_message, target_server, payload}) + end + + @spec process_queue() :: :ok + def process_queue do + GenServer.cast(__MODULE__, :process_queue) + end + + @impl true + def init(retry_interval) do + :timer.send_interval(retry_interval, :process_queue) + {:ok, %{queue: [], processing: false}} + end + + @impl true + def handle_cast({:queue_message, target_server, payload}, %{queue: queue} = state) do + message = %{ + target_server: target_server, + payload: payload, + timestamp: DateTime.utc_now(), + retry_count: 0 + } + + Logger.info("Queueing message for offline server #{target_server}") + {:noreply, %{state | queue: [message | queue]}} + end + + @impl true + def handle_cast(:process_queue, %{processing: true} = state) do + {:noreply, state} + end + + @impl true + def handle_cast(:process_queue, %{queue: []} = state) do + {:noreply, state} + end + + @impl true + def handle_cast(:process_queue, %{queue: queue} = state) do + {:noreply, %{state | processing: true}} + + Task.start(fn -> + process_queued_messages(queue) + GenServer.cast(__MODULE__, {:processing_complete, []}) + end) + + {:noreply, state} + end + + @impl true + def handle_cast({:processing_complete, failed_messages}, state) do + {:noreply, %{state | queue: failed_messages, processing: false}} + end + + @impl true + def handle_info(:process_queue, state) do + handle_cast(:process_queue, state) + end + + @spec process_queued_messages([queued_message()]) :: [queued_message()] + defp process_queued_messages(messages) do + Enum.reduce(messages, [], fn message, failed_acc -> + case attempt_delivery(message) do + :ok -> + Logger.info("Successfully delivered queued message to #{message.target_server}") + failed_acc + + {:error, _reason} -> + updated_message = %{message | retry_count: message.retry_count + 1} + + if updated_message.retry_count < 5 do + [updated_message | failed_acc] + else + Logger.error("Dropping message to #{message.target_server} after 5 retries") + failed_acc + end + end + end) + end + + @spec attempt_delivery(queued_message()) :: :ok | {:error, String.t()} + defp attempt_delivery(%{target_server: target_server, payload: payload}) do + RemoteClient.send_message_to_server(target_server, payload) + end +end diff --git a/lib/message_server/remote_client.ex b/lib/message_server/remote_client.ex index 5d864b8..d43569d 100644 --- a/lib/message_server/remote_client.ex +++ b/lib/message_server/remote_client.ex @@ -1,7 +1,7 @@ defmodule MessageServer.RemoteClient do require Logger - alias MessageServer.{MessageRequest, ServerRegistry, Auth} + alias MessageServer.{MessageRequest, ServerRegistry, Auth, MessageQueue} @spec send_message_to_server(String.t(), MessageRequest.t()) :: :ok | {:error, String.t()} def send_message_to_server(target_server_id, payload) do @@ -15,6 +15,7 @@ defmodule MessageServer.RemoteClient do {:error, reason} = error -> Logger.error("Failed to send message to server #{target_server_id}: #{reason}") + MessageQueue.queue_message(target_server_id, payload) error end end diff --git a/lib/message_server/router.ex b/lib/message_server/router.ex index 40f7949..8b01fcd 100644 --- a/lib/message_server/router.ex +++ b/lib/message_server/router.ex @@ -60,11 +60,6 @@ defmodule MessageServer.Router do |> send_resp(400, Jason.encode!(%{error: reason})) end else - {:error, :auth_failure, reason} -> - conn - |> put_resp_content_type("application/json") - |> send_resp(401, Jason.encode!(%{error: "Unauthorized", message: reason})) - {:error, reason} -> conn |> put_resp_content_type("application/json")