defmodule MessageServer.MessageQueue do use GenServer require Logger alias MessageServer.RemoteClient @type queued_message :: %{ target_server: String.t(), payload: map(), timestamp: DateTime.t(), retry_count: non_neg_integer() } @spec start_link(keyword()) :: GenServer.on_start() def start_link(opts \\ []) do {retry_interval, opts} = Keyword.pop(opts, :retry_interval, 30_000) {queue_name, _opts} = Keyword.pop(opts, :name, __MODULE__) GenServer.start_link(__MODULE__, retry_interval, name: queue_name) end @spec queue_message(String.t(), map()) :: :ok def queue_message(target_server, payload) do GenServer.cast(__MODULE__, {:queue_message, target_server, payload}) end @spec process_queue() :: :ok def process_queue do GenServer.cast(__MODULE__, :process_queue) end @impl true def init(retry_interval) do :timer.send_interval(retry_interval, :process_queue) {:ok, %{queue: [], processing: false}} end @impl true def handle_cast({:queue_message, target_server, payload}, %{queue: queue} = state) do message = %{ target_server: target_server, payload: payload, timestamp: DateTime.utc_now(), retry_count: 0 } Logger.info("Queueing message for offline server #{target_server}") {:noreply, %{state | queue: [message | queue]}} end @impl true def handle_cast(:process_queue, %{processing: true} = state) do {:noreply, state} end @impl true def handle_cast(:process_queue, %{queue: []} = state) do {:noreply, state} end @impl true def handle_cast(:process_queue, %{queue: queue} = state) do {:noreply, %{state | processing: true}} Task.start(fn -> process_queued_messages(queue) GenServer.cast(__MODULE__, {:processing_complete, []}) end) {:noreply, state} end @impl true def handle_cast({:processing_complete, failed_messages}, state) do {:noreply, %{state | queue: failed_messages, processing: false}} end @impl true def handle_info(:process_queue, state) do handle_cast(:process_queue, state) end @spec process_queued_messages([queued_message()]) :: [queued_message()] defp process_queued_messages(messages) do Enum.reduce(messages, [], fn message, failed_acc -> case attempt_delivery(message) do :ok -> Logger.info("Successfully delivered queued message to #{message.target_server}") failed_acc {:error, _reason} -> updated_message = %{message | retry_count: message.retry_count + 1} if updated_message.retry_count < 5 do [updated_message | failed_acc] else Logger.error("Dropping message to #{message.target_server} after 5 retries") failed_acc end end end) end @spec attempt_delivery(queued_message()) :: :ok | {:error, String.t()} defp attempt_delivery(%{target_server: target_server, payload: payload}) do RemoteClient.send_message_to_server(target_server, payload) end end