Initial implementation details

This commit is contained in:
Broks Randolfs Gailītis 2025-08-20 10:10:19 +03:00
parent c2f0fd6fcd
commit f31bd0b3b4
12 changed files with 496 additions and 79 deletions

View File

@ -1,21 +1,30 @@
# MessageServer
# Message server
**TODO: Add description**
## Running
### Starting servers
Server ID (`SERVER_ID`) and remote servers (`SERVERS`) are provided as environment variables.
## Installation
If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `message_server` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:message_server, "~> 0.1.0"}
]
end
#### Server 1
```bash
SERVER_ID=1 SERVERS="2:localhost:4001" PORT=4000 mix run --no-halt
```
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at <https://hexdocs.pm/message_server>.
#### Server 2
```bash
SERVER_ID=2 SERVERS="1:localhost:4000" PORT=4001 mix run --no-halt
```
### Sending messages
#### From Server 1
```curl
curl -X POST http://localhost:4000/api/messages -H "Content-Type: application/json" -d '{"from": "1-bender", "to": "1-zoidberg", "message": "Dreams are where elves and gnomes live!"}'
```
```curl
curl -X POST http://localhost:4000/api/messages -H "Content-Type: application/json" -d '{"from": "1-bender", "to": "2-nibbler", "message": "Bite my shiny metal ass!"}'
```
#### From Server 2
```curl
curl -X POST http://localhost:4001/api/messages -H "Content-Type: application/json" -d '{"from": "2-nibbler", "to": "1-bender", "message": "I am Nibbler, agent of the Nibblonian fleet."}'
```

View File

@ -12,7 +12,9 @@ defmodule MessageServer.Application do
Logger.info("Known servers: #{inspect(servers)}")
children = [
{MessageServer.Storage, server_id}
{MessageServer.ServerRegistry, {server_id, servers}},
{MessageServer.Storage, {server_id}},
{Bandit, plug: MessageServer.Router, port: port}
]
opts = [strategy: :one_for_one, name: MessageServer.Supervisor]
@ -21,8 +23,7 @@ defmodule MessageServer.Application do
@spec get_server_id() :: String.t()
def get_server_id() do
System.get_env("SERVER_ID") ||
raise "SERVER_ID is required"
System.get_env("SERVER_ID", "1")
end
@spec get_port() :: integer()

View File

@ -0,0 +1,68 @@
defmodule MessageServer.MessageHandler do
require Logger
alias MessageServer.{
MessageRequest,
RemoteClient,
ServerRegistry,
Storage
}
@spec handle_message(MessageRequest.t()) :: :ok | {:error, String.t()}
def handle_message(%MessageRequest{from: from, to: to, message: message}) do
with {:ok, from_server} <- extract_server_id(from),
{:ok, to_server} <- extract_server_id(to),
:ok <- validate_sender_ownership(from_server),
:ok <- route_message(to_server, from, to, message) do
Logger.info("Message routed successfully from #{from} to #{to}")
:ok
else
{:error, reason} = error ->
Logger.warning("Message routing failed: #{reason}")
error
end
end
@spec extract_server_id(String.t()) :: {:ok, String.t()} | {:error, String.t()}
defp extract_server_id(user_id) do
case String.split(user_id, "-", parts: 2) do
[server_id, _user_part] -> {:ok, server_id}
_ -> {:error, "Invalid user ID format: #{user_id}"}
end
end
@spec validate_sender_ownership(String.t()) :: :ok | {:error, String.t()}
defp validate_sender_ownership(sender_server) do
local_server = ServerRegistry.get_server_id()
if sender_server == local_server do
:ok
else
{:error, "Cannot send messages on behalf of users from other servers"}
end
end
@spec route_message(String.t(), String.t(), String.t(), String.t()) ::
:ok | {:error, String.t()}
defp route_message(to_server, from, to, message) do
local_server = ServerRegistry.get_server_id()
if to_server == local_server do
handle_local_message(from, to, message)
else
handle_remote_message(to_server, from, to, message)
end
end
@spec handle_local_message(String.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
defp handle_local_message(from, to, message) do
Storage.append_message(to, from, message)
end
@spec handle_remote_message(String.t(), String.t(), String.t(), String.t()) ::
:ok | {:error, String.t()}
defp handle_remote_message(to_server, from, to, message) do
payload = %MessageRequest{from: from, to: to, message: message}
RemoteClient.send_message_to_server(to_server, payload)
end
end

View File

@ -0,0 +1,22 @@
defmodule MessageServer.MessageRequest do
@type t :: %__MODULE__{
from: String.t(),
to: String.t(),
message: String.t()
}
defstruct [:from, :to, :message]
@spec new(String.t(), String.t(), String.t()) :: t()
def new(from, to, message) do
%__MODULE__{from: from, to: to, message: message}
end
@spec valid?(t()) :: boolean()
def valid?(%__MODULE__{from: from, to: to, message: message})
when is_binary(from) and is_binary(to) and is_binary(message) do
String.trim(from) != "" and String.trim(to) != "" and String.trim(message) != ""
end
def valid?(_), do: false
end

View File

@ -0,0 +1,59 @@
defmodule MessageServer.RemoteClient do
require Logger
alias MessageServer.{MessageRequest, ServerRegistry}
@spec send_message_to_server(String.t(), MessageRequest.t()) :: :ok | {:error, String.t()}
def send_message_to_server(target_server_id, payload) do
with {:ok, server_info} <- ServerRegistry.get_server_info(target_server_id),
{:ok, serialized_payload} <- serialize_payload(payload),
{:ok, response} <- make_request(server_info, serialized_payload) do
handle_response(response)
else
{:error, :not_found} ->
{:error, "Unknown server: #{target_server_id}"}
{:error, reason} = error ->
Logger.error("Failed to send message to server #{target_server_id}: #{reason}")
error
end
end
@spec serialize_payload(MessageRequest.t()) :: {:ok, binary()} | {:error, String.t()}
defp serialize_payload(payload) do
try do
serialized = :erlang.term_to_binary(payload)
{:ok, serialized}
rescue
error -> {:error, "Serialization failed: #{inspect(error)}"}
end
end
@spec make_request(map(), binary()) :: {:ok, Req.Response.t()} | {:error, String.t()}
defp make_request(%{host: host, port: port}, serialized_payload) do
url = "http://#{host}:#{port}/api/remote/messages"
case Req.post(url,
body: serialized_payload,
headers: [{"content-type", "application/octet-stream"}]
) do
{:ok, %Req.Response{status: status} = response} when status in 200..299 ->
{:ok, response}
{:ok, %Req.Response{status: status}} ->
{:error, "HTTP #{status}"}
{:error, reason} ->
{:error, "Request failed: #{inspect(reason)}"}
end
end
@spec handle_response(Req.Response.t()) :: :ok | {:error, String.t()}
defp handle_response(%Req.Response{body: body}) do
case body do
%{"status" => "success"} -> :ok
%{"error" => error} -> {:error, error}
_ -> {:error, "Invalid response format"}
end
end
end

View File

@ -0,0 +1,30 @@
defmodule MessageServer.RemoteHandler do
require Logger
@spec handle_remote_message(map()) :: :ok | {:error, String.t()}
def handle_remote_message(%{from: from, to: to, message: message}) do
with {:ok, to_server} <- extract_server_id(to),
:ok <- validate_recipient_ownership(to_server) do
MessageServer.Storage.append_message(to, from, message)
end
end
@spec extract_server_id(String.t()) :: {:ok, String.t()} | {:error, String.t()}
defp extract_server_id(user_id) do
case String.split(user_id, "-", parts: 2) do
[server_id, _user_part] -> {:ok, server_id}
_ -> {:error, "Invalid user ID format: #{user_id}"}
end
end
@spec validate_recipient_ownership(String.t()) :: :ok | {:error, String.t()}
defp validate_recipient_ownership(recipient_server) do
local_server = MessageServer.ServerRegistry.get_server_id()
if recipient_server == local_server do
:ok
else
{:error, "Cannot deliver messages to users from other servers"}
end
end
end

View File

@ -0,0 +1,118 @@
defmodule MessageServer.Router do
use Plug.Router
require Logger
alias MessageServer.{
MessageRequest,
RemoteHandler,
MessageHandler
}
plug(:match)
plug(:debug_content_type)
plug(Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
json_decoder: Jason,
pass: ["application/octet-stream"]
)
plug(:dispatch)
def json_request?(conn) do
conn.request_path != "/api/remote/messages"
end
post "/api/messages" do
with {:ok, message} <- validate_message_request(conn.body_params),
:ok <- MessageHandler.handle_message(message) do
conn
|> put_resp_content_type("application/json")
|> send_resp(200, Jason.encode!(%{status: "success"}))
else
{:error, reason} ->
Logger.warning("Message handling failed: #{inspect(reason)}")
conn
|> put_resp_content_type("application/json")
|> send_resp(400, Jason.encode!(%{error: reason}))
end
end
post "/api/remote/messages" do
case handle_etf_body(conn) do
{:ok, payload} ->
payload
|> RemoteHandler.handle_remote_message()
|> case do
:ok ->
conn
|> put_resp_content_type("application/json")
|> send_resp(200, Jason.encode!(%{status: "success"}))
{:error, reason} ->
Logger.warning("Remote message handling failed: #{inspect(reason)}")
conn
|> put_resp_content_type("application/json")
|> send_resp(400, Jason.encode!(%{error: reason}))
end
{:error, reason} ->
conn
|> put_resp_content_type("application/json")
|> send_resp(400, Jason.encode!(%{error: reason, message: "Invalid request body"}))
end
end
match _ do
send_resp(conn, 404, "Not found")
end
defp debug_content_type(conn, _opts) do
conn
|> Plug.Conn.get_req_header("content-type")
|> IO.inspect(label: "Received Content-Type")
conn
end
defp handle_etf_body(conn) do
with {:ok, body, _conn} <- Plug.Conn.read_body(conn),
{:ok, decoded} <- decode_etf(body) do
{:ok, decoded}
else
{:error, reason} -> {:error, "Failed to read body: #{reason}"}
{:more, _partial, _conn} -> {:error, "Body too large"}
end
end
@spec decode_etf(binary()) :: {:ok, map()} | {:error, String.t()}
defp decode_etf(binary_body) do
try do
payload = :erlang.binary_to_term(binary_body, [:safe])
{:ok, payload}
rescue
error -> {:error, "Deserialization failed: #{inspect(error)}"}
end
end
@spec validate_message_request(map()) :: {:ok, map()} | {:error, String.t()}
defp validate_message_request(params) do
required_fields = ["from", "to", "message"]
case Enum.all?(required_fields, &Map.has_key?(params, &1)) do
true ->
{:ok,
%MessageRequest{
from: params["from"],
to: params["to"],
message: params["message"]
}}
false ->
{:error, "Missing required fields: from, to, message"}
end
end
end

View File

@ -0,0 +1,49 @@
defmodule MessageServer.ServerRegistry do
use Agent
@moduledoc """
Simple registry for server information using Agent for lightweight state management.
"""
@type server_info :: %{host: String.t(), port: integer()}
@type servers_map :: %{String.t() => server_info()}
@type state :: %{server_id: String.t(), servers: servers_map()}
def start_link({server_id, servers, agent_name}) do
Agent.start_link(
fn -> %{server_id: server_id, servers: servers} end,
name: agent_name
)
end
def start_link({server_id, servers}) do
start_link({server_id, servers, __MODULE__})
end
@spec get_server_id() :: String.t()
def get_server_id(agent \\ __MODULE__) do
Agent.get(agent, & &1.server_id)
end
@spec get_server_info(String.t()) :: {:ok, server_info()} | {:error, :not_found}
def get_server_info(server_id, agent \\ __MODULE__) do
Agent.get(agent, fn %{servers: servers} ->
case Map.get(servers, server_id) do
nil -> {:error, :not_found}
info -> {:ok, info}
end
end)
end
@spec list_servers() :: servers_map()
def list_servers(agent \\ __MODULE__) do
Agent.get(agent, & &1.servers)
end
@spec get_local_server_info() :: %{id: String.t(), servers: servers_map()}
def get_local_server_info(agent \\ __MODULE__) do
Agent.get(agent, fn state ->
%{id: state.server_id, servers: state.servers}
end)
end
end

View File

@ -2,8 +2,12 @@ defmodule MessageServer.Storage do
use GenServer
require Logger
def start_link(server_id) do
GenServer.start_link(__MODULE__, server_id, name: __MODULE__)
def start_link({server_id, server_name}) do
GenServer.start_link(__MODULE__, server_id, name: server_name)
end
def start_link({server_id}) do
start_link({server_id, __MODULE__})
end
@spec append_message(String.t(), String.t(), String.t()) :: :ok | {:error, String.t()}

View File

@ -6,6 +6,10 @@ defmodule MessageServer.MixProject do
app: :message_server,
version: "0.1.0",
elixir: "~> 1.18",
dialyzer: [
plt_add_apps: [:mix, :ex_unit],
flags: [:error_handling, :race_conditions, :underspecs]
],
start_permanent: Mix.env() == :prod,
deps: deps()
]

View File

@ -0,0 +1,70 @@
defmodule MessageServer.ServerRegistryTest do
use ExUnit.Case
alias MessageServer.ServerRegistry
@test_server_id "99"
@test_servers %{
"10" => %{host: "amy.planetexpress.com", port: 4001},
"11" => %{host: "omicron-persei-8.galaxy", port: 8080}
}
setup do
pid = start_supervised!({ServerRegistry, {@test_server_id, @test_servers, :test_server}})
%{registry: pid}
end
test "returns correct server id" do
assert ServerRegistry.get_server_id(:test_server) == @test_server_id
end
test "returns server info for known server" do
@test_servers
|> Map.keys()
|> Enum.each(fn key ->
config = @test_servers[key]
assert {:ok, config} == ServerRegistry.get_server_info(key, :test_server)
end)
end
test "returns error for unknown server" do
assert {:error, :not_found} = ServerRegistry.get_server_info("1234567", :test_server)
end
test "lists all servers" do
servers = ServerRegistry.list_servers(:test_server)
assert servers == @test_servers
# local server not in list
refute Map.has_key?(servers, @test_server_id)
end
test "returns local server info" do
info = ServerRegistry.get_local_server_info(:test_server)
assert info.id == @test_server_id
assert info.servers == @test_servers
end
test "server info contains required fields" do
server = @test_servers |> Map.keys() |> Enum.at(0)
{:ok, server_info} = ServerRegistry.get_server_info(server, :test_server)
assert Map.has_key?(server_info, :host)
assert Map.has_key?(server_info, :port)
assert is_binary(server_info.host)
assert is_integer(server_info.port)
end
test "multiple calls return consistent results" do
# Call multiple times to ensure Agent state is stable
assert ServerRegistry.get_server_id(:test_server) == @test_server_id
assert ServerRegistry.get_server_id(:test_server) == @test_server_id
server = @test_servers |> Map.keys() |> Enum.at(1)
{:ok, info1} = ServerRegistry.get_server_info(server, :test_server)
{:ok, info2} = ServerRegistry.get_server_info(server, :test_server)
assert info1 == info2
end
end

View File

@ -1,116 +1,99 @@
defmodule MessageServer.StorageTest do
use ExUnit.Case
import ExUnit.CaptureLog
@test_server_id "test_server"
alias MessageServer.Storage
@test_server_id "1"
@test_storage_dir "storage/server_#{@test_server_id}"
setup do
# clean up any existing test storage
File.rm_rf!(@test_storage_dir)
# start the Storage GenServer for testing
{:ok, pid} = MessageServer.Storage.start_link(@test_server_id)
pid = start_supervised!({Storage, {@test_server_id, :test_storage}})
on_exit(fn ->
# clean up after test
if Process.alive?(pid), do: GenServer.stop(pid)
File.rm_rf!(@test_storage_dir)
end)
%{storage_pid: pid}
%{storage: pid, storage_dir: @test_storage_dir}
end
test "creates storage directory on startup" do
assert File.exists?(@test_storage_dir)
assert File.dir?(@test_storage_dir)
test "creates storage directory on startup", %{storage_dir: storage_dir} do
IO.puts(storage_dir)
assert File.exists?(storage_dir)
assert File.dir?(storage_dir)
end
test "appends message to user file" do
user_id = "test_server-alice"
from_user = "test_server-bob"
message = "Hello Alice!"
test "appends message to user file", %{storage_dir: storage_dir} do
user_id = "zoidberg"
from_user = "bender"
message = "let's go steal some stuff"
assert :ok = MessageServer.Storage.append_message(user_id, from_user, message)
assert :ok = Storage.append_message(user_id, from_user, message)
file_path = Path.join(@test_storage_dir, "#{user_id}.txt")
file_path = Path.join(storage_dir, "#{user_id}.txt")
assert File.exists?(file_path)
content = File.read!(file_path)
assert content == "#{from_user}: #{message}\n"
end
test "appends multiple messages to same user file" do
user_id = "test_server-alice"
test "appends multiple messages to same user file", %{storage_dir: storage_dir} do
user_id = "leela"
assert :ok = MessageServer.Storage.append_message(user_id, "test_server-bob", "First message")
assert :ok = MessageServer.Storage.append_message(user_id, "fry", "i love you leela")
assert :ok =
MessageServer.Storage.append_message(
user_id,
"test_server-charlie",
"Second message"
"bender",
"bite my shiny metal ass!"
)
file_path = Path.join(@test_storage_dir, "#{user_id}.txt")
file_path = Path.join(storage_dir, "#{user_id}.txt")
content = File.read!(file_path)
expected_content = """
test_server-bob: First message
test_server-charlie: Second message
fry: i love you leela
bender: bite my shiny metal ass!
"""
assert content == expected_content
end
test "creates separate files for different users" do
test "creates separate files for different users", %{storage_dir: storage_dir} do
assert :ok =
MessageServer.Storage.append_message(
"test_server-alice",
"test_server-bob",
"Hi Alice"
"fry",
"professor",
"good news everyone!"
)
assert :ok =
MessageServer.Storage.append_message(
"test_server-charlie",
"test_server-bob",
"Hi Charlie"
"nibbler",
"zoidberg",
"hooray! people are paying attention to me!"
)
alice_file = Path.join(@test_storage_dir, "test_server-alice.txt")
charlie_file = Path.join(@test_storage_dir, "test_server-charlie.txt")
fry_file = Path.join(storage_dir, "fry.txt")
nibbler_file = Path.join(storage_dir, "nibbler.txt")
assert File.exists?(alice_file)
assert File.exists?(charlie_file)
assert File.exists?(fry_file)
assert File.exists?(nibbler_file)
assert File.read!(alice_file) == "test_server-bob: Hi Alice\n"
assert File.read!(charlie_file) == "test_server-bob: Hi Charlie\n"
assert File.read!(fry_file) == "professor: good news everyone!\n"
assert File.read!(nibbler_file) == "zoidberg: hooray! people are paying attention to me!\n"
end
test "handles messages with special characters" do
user_id = "test_server-alice"
from_user = "test_server-bob"
message = "Hello! 🎉 Special chars: @#$%^&*()"
test "handles messages with special characters", %{storage_dir: storage_dir} do
user_id = "zapp"
from_user = "zoidberg"
message = "why not zoidberg? 🦀 (╯°□°)╯︵ ┻━┻"
assert :ok = MessageServer.Storage.append_message(user_id, from_user, message)
file_path = Path.join(@test_storage_dir, "#{user_id}.txt")
file_path = Path.join(storage_dir, "#{user_id}.txt")
content = File.read!(file_path)
assert content == "#{from_user}: #{message}\n"
end
test "logs successful message storage" do
user_id = "test_server-alice"
from_user = "test_server-bob"
message = "Test message"
log_output =
capture_log(fn ->
assert :ok = MessageServer.Storage.append_message(user_id, from_user, message)
end)
assert log_output =~ "Message stored for user #{user_id}"
end
end