107 lines
3.0 KiB
Elixir
107 lines
3.0 KiB
Elixir
defmodule MessageServer.MessageQueue do
  @moduledoc """
  In-memory retry queue for messages addressed to servers that could not be
  reached.

  Messages are accepted with `queue_message/2` and delivered in batches, oldest
  first, through `MessageServer.RemoteClient.send_message_to_server/2`. A batch
  is started on every timer tick (see the `:retry_interval` option of
  `start_link/1`) or on demand with `process_queue/0`. Only one batch runs at a
  time; messages queued while a batch is running wait for the next one.

  A message whose delivery fails is kept for a later batch until it has failed
  #{5} times, after which it is dropped and an error is logged.

  Delivery runs in a separate, monitored task so the server stays responsive.
  If that task dies before reporting back, the whole batch is put back on the
  queue (each message is charged one retry), which means a message that had
  already been delivered in that batch may be delivered again.

  Note: `queue_message/2` and `process_queue/0` always address the process
  registered as `#{inspect(__MODULE__)}`. A queue started under a different
  `:name` is not reachable through them.
  """

  use GenServer

  require Logger

  alias MessageServer.RemoteClient

  # A message is dropped once its retry_count reaches this value.
  @max_retries 5

  @type queued_message :: %{
          target_server: String.t(),
          payload: map(),
          timestamp: DateTime.t(),
          retry_count: non_neg_integer()
        }

  # queue:      pending messages, newest first (new messages are prepended)
  # in_flight:  the batch handed to the running task, oldest first
  # processing: true while a delivery task is running
  # task_ref:   monitor reference of the running task, nil when idle
  @typep state :: %{
           queue: [queued_message()],
           in_flight: [queued_message()],
           processing: boolean(),
           task_ref: reference() | nil
         }

  ## Client API

  @doc """
  Starts the queue process.

  ## Options

    * `:retry_interval` - milliseconds between automatic processing runs
      (default: `30_000`).
    * `:name` - name to register the process under (default: `#{inspect(__MODULE__)}`).
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    {retry_interval, opts} = Keyword.pop(opts, :retry_interval, 30_000)
    {queue_name, _opts} = Keyword.pop(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, retry_interval, name: queue_name)
  end

  @doc """
  Queues `payload` for later delivery to `target_server`.

  Asynchronous; always returns `:ok`.
  """
  @spec queue_message(String.t(), map()) :: :ok
  def queue_message(target_server, payload) do
    GenServer.cast(__MODULE__, {:queue_message, target_server, payload})
  end

  @doc """
  Requests an immediate processing run.

  Asynchronous; always returns `:ok`. The request is ignored when the queue is
  empty or a run is already in progress.
  """
  @spec process_queue() :: :ok
  def process_queue do
    GenServer.cast(__MODULE__, :process_queue)
  end

  ## Server callbacks

  @impl true
  def init(retry_interval) do
    # Match assertively: an invalid interval should fail at startup instead of
    # leaving a queue that never retries on its own.
    {:ok, _timer_ref} = :timer.send_interval(retry_interval, :process_queue)
    {:ok, %{queue: [], in_flight: [], processing: false, task_ref: nil}}
  end

  @impl true
  def handle_cast({:queue_message, target_server, payload}, %{queue: queue} = state) do
    message = %{
      target_server: target_server,
      payload: payload,
      timestamp: DateTime.utc_now(),
      retry_count: 0
    }

    Logger.info("Queueing message for offline server #{target_server}")
    {:noreply, %{state | queue: [message | queue]}}
  end

  def handle_cast(:process_queue, state) do
    {:noreply, maybe_start_processing(state)}
  end

  def handle_cast({:processing_complete, failed_messages}, %{queue: queue} = state) do
    # The task reported back, so its pending :DOWN message is of no interest.
    demonitor_task(state.task_ref)

    # `queue` now only holds messages that arrived while the batch was running.
    # They are newer than anything in the batch, so they stay in front of the
    # failed messages to keep the newest-first ordering of the queue.
    {:noreply,
     %{state | queue: queue ++ failed_messages, in_flight: [], processing: false, task_ref: nil}}
  end

  @impl true
  def handle_info(:process_queue, state) do
    {:noreply, maybe_start_processing(state)}
  end

  # The delivery task died without sending {:processing_complete, _}. Put its
  # batch back (charging one retry each) so the messages are not lost and the
  # queue does not stay stuck in the processing state.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
    Logger.error("Queue processing task exited before completing: #{inspect(reason)}")
    requeued = Enum.reduce(state.in_flight, [], &retry_or_drop/2)

    {:noreply,
     %{state | queue: state.queue ++ requeued, in_flight: [], processing: false, task_ref: nil}}
  end

  ## Internals

  # Starts a delivery task for the current queue unless one is already running
  # or there is nothing to deliver. Returns the updated state.
  @spec maybe_start_processing(state()) :: state()
  defp maybe_start_processing(%{processing: true} = state), do: state
  defp maybe_start_processing(%{queue: []} = state), do: state

  defp maybe_start_processing(%{queue: queue} = state) do
    # Reply to this very process; it may be registered under a custom name.
    server = self()
    # The queue is stored newest first; deliver in arrival order.
    batch = Enum.reverse(queue)

    {:ok, task_pid} =
      Task.start(fn ->
        failed_messages = process_queued_messages(batch)
        GenServer.cast(server, {:processing_complete, failed_messages})
      end)

    %{
      state
      | queue: [],
        in_flight: batch,
        processing: true,
        task_ref: Process.monitor(task_pid)
    }
  end

  @spec demonitor_task(reference() | nil) :: :ok
  defp demonitor_task(nil), do: :ok

  defp demonitor_task(ref) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    :ok
  end

  # Attempts delivery of every message in `messages` (expected oldest first)
  # and returns the ones that should be retried, newest first.
  @spec process_queued_messages([queued_message()]) :: [queued_message()]
  defp process_queued_messages(messages) do
    Enum.reduce(messages, [], fn message, failed_acc ->
      case attempt_delivery(message) do
        :ok ->
          Logger.info("Successfully delivered queued message to #{message.target_server}")
          failed_acc

        {:error, _reason} ->
          retry_or_drop(message, failed_acc)
      end
    end)
  end

  # Charges one retry to `message`. Prepends it to `acc` while it is still
  # below the retry limit; otherwise logs the drop and returns `acc` unchanged.
  @spec retry_or_drop(queued_message(), [queued_message()]) :: [queued_message()]
  defp retry_or_drop(message, acc) do
    updated_message = %{message | retry_count: message.retry_count + 1}

    if updated_message.retry_count < @max_retries do
      [updated_message | acc]
    else
      Logger.error("Dropping message to #{message.target_server} after #{@max_retries} retries")
      acc
    end
  end

  @spec attempt_delivery(queued_message()) :: :ok | {:error, String.t()}
  defp attempt_delivery(%{target_server: target_server, payload: payload}) do
    RemoteClient.send_message_to_server(target_server, payload)
  end
end
|