Compare commits
No commits in common. "f2ff156816d98f27c4522505c5af3c0456088840" and "753053d10a4036deb19b015f512fa678c98e2c7f" have entirely different histories.
f2ff156816
...
753053d10a
12
README.md
12
README.md
@ -29,18 +29,8 @@ curl -X POST http://localhost:4000/api/messages -H "Content-Type: application/js
|
|||||||
curl -X POST http://localhost:4001/api/messages -H "Content-Type: application/json" -d '{"from": "2-nibbler", "to": "1-bender", "message": "I am Nibbler, agent of the Nibblonian fleet."}'
|
curl -X POST http://localhost:4001/api/messages -H "Content-Type: application/json" -d '{"from": "2-nibbler", "to": "1-bender", "message": "I am Nibbler, agent of the Nibblonian fleet."}'
|
||||||
```
|
```
|
||||||
|
|
||||||
### Notes on queueing failed messages
|
|
||||||
When messages fail to be delivered they are queued for retry at 30 second interval with a maximum of 5 retries by default (can be configured)
|
|
||||||
|
|
||||||
## Security
|
## Security
|
||||||
This application uses a shared auth key (`AUTH_KEY`) to authenticate requests between servers. The key is provided as an environment variable and must be the same on all servers.
|
This application uses a shared auth key (`AUTH_KEY`) to authenticate requests between servers. The key is provided as an environment variable and must be the same on all servers.
|
||||||
|
|
||||||
## Next steps
|
### Next steps
|
||||||
|
|
||||||
### Security - HTTPS with client certificates
|
|
||||||
HTTPS with client certificates should be implemented to ensure secure communication between servers and prevent unauthorized access and possible man-in-the-middle attacks.
|
HTTPS with client certificates should be implemented to ensure secure communication between servers and prevent unauthorized access and possible man-in-the-middle attacks.
|
||||||
|
|
||||||
### Testing
|
|
||||||
Full suite of unit tests should be implemented to ensure the correctness of the application's logic and behavior.
|
|
||||||
|
|
||||||
### Finish implementation of Message Queue to queue failed messages for retry
|
|
||||||
|
|||||||
@ -14,7 +14,6 @@ defmodule MessageServer.Application do
|
|||||||
children = [
|
children = [
|
||||||
{MessageServer.ServerRegistry, {server_id, servers}},
|
{MessageServer.ServerRegistry, {server_id, servers}},
|
||||||
{MessageServer.Storage, {server_id}},
|
{MessageServer.Storage, {server_id}},
|
||||||
{MessageServer.MessageQueue, [retry_interval: 30_000]},
|
|
||||||
{Bandit, plug: MessageServer.Router, port: port}
|
{Bandit, plug: MessageServer.Router, port: port}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@ -1,106 +0,0 @@
|
|||||||
defmodule MessageServer.MessageQueue do
|
|
||||||
use GenServer
|
|
||||||
require Logger
|
|
||||||
alias MessageServer.RemoteClient
|
|
||||||
|
|
||||||
@type queued_message :: %{
|
|
||||||
target_server: String.t(),
|
|
||||||
payload: map(),
|
|
||||||
timestamp: DateTime.t(),
|
|
||||||
retry_count: non_neg_integer()
|
|
||||||
}
|
|
||||||
|
|
||||||
@spec start_link(keyword()) :: GenServer.on_start()
|
|
||||||
def start_link(opts \\ []) do
|
|
||||||
{retry_interval, opts} = Keyword.pop(opts, :retry_interval, 30_000)
|
|
||||||
{queue_name, _opts} = Keyword.pop(opts, :name, __MODULE__)
|
|
||||||
GenServer.start_link(__MODULE__, retry_interval, name: queue_name)
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec queue_message(String.t(), map()) :: :ok
|
|
||||||
def queue_message(target_server, payload) do
|
|
||||||
GenServer.cast(__MODULE__, {:queue_message, target_server, payload})
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec process_queue() :: :ok
|
|
||||||
def process_queue do
|
|
||||||
GenServer.cast(__MODULE__, :process_queue)
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def init(retry_interval) do
|
|
||||||
:timer.send_interval(retry_interval, :process_queue)
|
|
||||||
{:ok, %{queue: [], processing: false}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_cast({:queue_message, target_server, payload}, %{queue: queue} = state) do
|
|
||||||
message = %{
|
|
||||||
target_server: target_server,
|
|
||||||
payload: payload,
|
|
||||||
timestamp: DateTime.utc_now(),
|
|
||||||
retry_count: 0
|
|
||||||
}
|
|
||||||
|
|
||||||
Logger.info("Queueing message for offline server #{target_server}")
|
|
||||||
{:noreply, %{state | queue: [message | queue]}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_cast(:process_queue, %{processing: true} = state) do
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_cast(:process_queue, %{queue: []} = state) do
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_cast(:process_queue, %{queue: queue} = state) do
|
|
||||||
{:noreply, %{state | processing: true}}
|
|
||||||
|
|
||||||
Task.start(fn ->
|
|
||||||
process_queued_messages(queue)
|
|
||||||
GenServer.cast(__MODULE__, {:processing_complete, []})
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_cast({:processing_complete, failed_messages}, state) do
|
|
||||||
{:noreply, %{state | queue: failed_messages, processing: false}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_info(:process_queue, state) do
|
|
||||||
handle_cast(:process_queue, state)
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec process_queued_messages([queued_message()]) :: [queued_message()]
|
|
||||||
defp process_queued_messages(messages) do
|
|
||||||
Enum.reduce(messages, [], fn message, failed_acc ->
|
|
||||||
case attempt_delivery(message) do
|
|
||||||
:ok ->
|
|
||||||
Logger.info("Successfully delivered queued message to #{message.target_server}")
|
|
||||||
failed_acc
|
|
||||||
|
|
||||||
{:error, _reason} ->
|
|
||||||
updated_message = %{message | retry_count: message.retry_count + 1}
|
|
||||||
|
|
||||||
if updated_message.retry_count < 5 do
|
|
||||||
[updated_message | failed_acc]
|
|
||||||
else
|
|
||||||
Logger.error("Dropping message to #{message.target_server} after 5 retries")
|
|
||||||
failed_acc
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec attempt_delivery(queued_message()) :: :ok | {:error, String.t()}
|
|
||||||
defp attempt_delivery(%{target_server: target_server, payload: payload}) do
|
|
||||||
RemoteClient.send_message_to_server(target_server, payload)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@ -1,7 +1,7 @@
|
|||||||
defmodule MessageServer.RemoteClient do
|
defmodule MessageServer.RemoteClient do
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
alias MessageServer.{MessageRequest, ServerRegistry, Auth, MessageQueue}
|
alias MessageServer.{MessageRequest, ServerRegistry, Auth}
|
||||||
|
|
||||||
@spec send_message_to_server(String.t(), MessageRequest.t()) :: :ok | {:error, String.t()}
|
@spec send_message_to_server(String.t(), MessageRequest.t()) :: :ok | {:error, String.t()}
|
||||||
def send_message_to_server(target_server_id, payload) do
|
def send_message_to_server(target_server_id, payload) do
|
||||||
@ -15,7 +15,6 @@ defmodule MessageServer.RemoteClient do
|
|||||||
|
|
||||||
{:error, reason} = error ->
|
{:error, reason} = error ->
|
||||||
Logger.error("Failed to send message to server #{target_server_id}: #{reason}")
|
Logger.error("Failed to send message to server #{target_server_id}: #{reason}")
|
||||||
MessageQueue.queue_message(target_server_id, payload)
|
|
||||||
error
|
error
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@ -60,6 +60,11 @@ defmodule MessageServer.Router do
|
|||||||
|> send_resp(400, Jason.encode!(%{error: reason}))
|
|> send_resp(400, Jason.encode!(%{error: reason}))
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
|
{:error, :auth_failure, reason} ->
|
||||||
|
conn
|
||||||
|
|> put_resp_content_type("application/json")
|
||||||
|
|> send_resp(401, Jason.encode!(%{error: "Unauthorized", message: reason}))
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
conn
|
conn
|
||||||
|> put_resp_content_type("application/json")
|
|> put_resp_content_type("application/json")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user