From f31bd0b3b42a0276b2223b1f6dcfd32accdd3945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Broks=20Randolfs=20Gail=C4=ABtis?= Date: Wed, 20 Aug 2025 10:10:19 +0300 Subject: [PATCH] Initial implementation details --- README.md | 41 ++++--- lib/message_server/application.ex | 7 +- lib/message_server/message_handler.ex | 68 +++++++++++ lib/message_server/message_request.ex | 22 ++++ lib/message_server/remote_client.ex | 59 ++++++++++ lib/message_server/remote_handler.ex | 30 +++++ lib/message_server/router.ex | 118 +++++++++++++++++++ lib/message_server/server_registry.ex | 49 ++++++++ lib/message_server/storage.ex | 8 +- mix.exs | 4 + test/message_server/server_registry_test.exs | 70 +++++++++++ test/message_server/storage_test.exs | 99 +++++++--------- 12 files changed, 496 insertions(+), 79 deletions(-) create mode 100644 lib/message_server/message_handler.ex create mode 100644 lib/message_server/message_request.ex create mode 100644 lib/message_server/remote_client.ex create mode 100644 lib/message_server/remote_handler.ex create mode 100644 lib/message_server/router.ex create mode 100644 lib/message_server/server_registry.ex create mode 100644 test/message_server/server_registry_test.exs diff --git a/README.md b/README.md index 456f2e8..ac0ed82 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,30 @@ -# MessageServer +# Message server -**TODO: Add description** +## Running +### Starting servers +Server ID (`SERVER_ID`) and remote servers (`SERVERS`) are provided as environment variables. -## Installation - -If [available in Hex](https://hex.pm/docs/publish), the package can be installed -by adding `message_server` to your list of dependencies in `mix.exs`: - -```elixir -def deps do - [ - {:message_server, "~> 0.1.0"} - ] -end +#### Server 1 +```bash +SERVER_ID=1 SERVERS="2:localhost:4001" PORT=4000 mix run --no-halt ``` -Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc) -and published on [HexDocs](https://hexdocs.pm). Once published, the docs can -be found at . +#### Server 2 +```bash +SERVER_ID=2 SERVERS="1:localhost:4000" PORT=4001 mix run --no-halt +``` +### Sending messages +#### From Server 1 +```curl +curl -X POST http://localhost:4000/api/messages -H "Content-Type: application/json" -d '{"from": "1-bender", "to": "1-zoidberg", "message": "Dreams are where elves and gnomes live!"}' +``` + +```curl +curl -X POST http://localhost:4000/api/messages -H "Content-Type: application/json" -d '{"from": "1-bender", "to": "2-nibbler", "message": "Bite my shiny metal ass!"}' +``` + +#### From Server 2 +```curl +curl -X POST http://localhost:4001/api/messages -H "Content-Type: application/json" -d '{"from": "2-nibbler", "to": "1-bender", "message": "I am Nibbler, agent of the Nibblonian fleet."}' +``` diff --git a/lib/message_server/application.ex b/lib/message_server/application.ex index 9511253..4d3c1d7 100644 --- a/lib/message_server/application.ex +++ b/lib/message_server/application.ex @@ -12,7 +12,9 @@ defmodule MessageServer.Application do Logger.info("Known servers: #{inspect(servers)}") children = [ - {MessageServer.Storage, server_id} + {MessageServer.ServerRegistry, {server_id, servers}}, + {MessageServer.Storage, {server_id}}, + {Bandit, plug: MessageServer.Router, port: port} ] opts = [strategy: :one_for_one, name: MessageServer.Supervisor] @@ -21,8 +23,7 @@ defmodule MessageServer.Application do @spec get_server_id() :: String.t() def get_server_id() do - System.get_env("SERVER_ID") || - raise "SERVER_ID is required" + System.get_env("SERVER_ID", "1") end @spec get_port() :: integer() diff --git a/lib/message_server/message_handler.ex b/lib/message_server/message_handler.ex new file mode 100644 index 0000000..b8a8260 --- /dev/null +++ b/lib/message_server/message_handler.ex @@ -0,0 +1,68 @@ +defmodule MessageServer.MessageHandler do + require Logger + + alias MessageServer.{ + MessageRequest, + RemoteClient, + ServerRegistry, + Storage + } + + @spec handle_message(MessageRequest.t()) :: :ok | {:error, String.t()} + def handle_message(%MessageRequest{from: from, to: to, message: message}) do + with {:ok, from_server} <- extract_server_id(from), + {:ok, to_server} <- extract_server_id(to), + :ok <- validate_sender_ownership(from_server), + :ok <- route_message(to_server, from, to, message) do + Logger.info("Message routed successfully from #{from} to #{to}") + :ok + else + {:error, reason} = error -> + Logger.warning("Message routing failed: #{reason}") + error + end + end + + @spec extract_server_id(String.t()) :: {:ok, String.t()} | {:error, String.t()} + defp extract_server_id(user_id) do + case String.split(user_id, "-", parts: 2) do + [server_id, _user_part] -> {:ok, server_id} + _ -> {:error, "Invalid user ID format: #{user_id}"} + end + end + + @spec validate_sender_ownership(String.t()) :: :ok | {:error, String.t()} + defp validate_sender_ownership(sender_server) do + local_server = ServerRegistry.get_server_id() + + if sender_server == local_server do + :ok + else + {:error, "Cannot send messages on behalf of users from other servers"} + end + end + + @spec route_message(String.t(), String.t(), String.t(), String.t()) :: + :ok | {:error, String.t()} + defp route_message(to_server, from, to, message) do + local_server = ServerRegistry.get_server_id() + + if to_server == local_server do + handle_local_message(from, to, message) + else + handle_remote_message(to_server, from, to, message) + end + end + + @spec handle_local_message(String.t(), String.t(), String.t()) :: :ok | {:error, String.t()} + defp handle_local_message(from, to, message) do + Storage.append_message(to, from, message) + end + + @spec handle_remote_message(String.t(), String.t(), String.t(), String.t()) :: + :ok | {:error, String.t()} + defp handle_remote_message(to_server, from, to, message) do + payload = %MessageRequest{from: from, to: to, message: message} + RemoteClient.send_message_to_server(to_server, payload) + end +end diff --git a/lib/message_server/message_request.ex b/lib/message_server/message_request.ex new file mode 100644 index 0000000..b0bbdfd --- /dev/null +++ b/lib/message_server/message_request.ex @@ -0,0 +1,22 @@ +defmodule MessageServer.MessageRequest do + @type t :: %__MODULE__{ + from: String.t(), + to: String.t(), + message: String.t() + } + + defstruct [:from, :to, :message] + + @spec new(String.t(), String.t(), String.t()) :: t() + def new(from, to, message) do + %__MODULE__{from: from, to: to, message: message} + end + + @spec valid?(t()) :: boolean() + def valid?(%__MODULE__{from: from, to: to, message: message}) + when is_binary(from) and is_binary(to) and is_binary(message) do + String.trim(from) != "" and String.trim(to) != "" and String.trim(message) != "" + end + + def valid?(_), do: false +end diff --git a/lib/message_server/remote_client.ex b/lib/message_server/remote_client.ex new file mode 100644 index 0000000..72fc8ac --- /dev/null +++ b/lib/message_server/remote_client.ex @@ -0,0 +1,59 @@ +defmodule MessageServer.RemoteClient do + require Logger + + alias MessageServer.{MessageRequest, ServerRegistry} + + @spec send_message_to_server(String.t(), MessageRequest.t()) :: :ok | {:error, String.t()} + def send_message_to_server(target_server_id, payload) do + with {:ok, server_info} <- ServerRegistry.get_server_info(target_server_id), + {:ok, serialized_payload} <- serialize_payload(payload), + {:ok, response} <- make_request(server_info, serialized_payload) do + handle_response(response) + else + {:error, :not_found} -> + {:error, "Unknown server: #{target_server_id}"} + + {:error, reason} = error -> + Logger.error("Failed to send message to server #{target_server_id}: #{reason}") + error + end + end + + @spec serialize_payload(MessageRequest.t()) :: {:ok, binary()} | {:error, String.t()} + defp serialize_payload(payload) do + try do + serialized = :erlang.term_to_binary(payload) + {:ok, serialized} + rescue + error -> {:error, "Serialization failed: #{inspect(error)}"} + end + end + + @spec make_request(map(), binary()) :: {:ok, Req.Response.t()} | {:error, String.t()} + defp make_request(%{host: host, port: port}, serialized_payload) do + url = "http://#{host}:#{port}/api/remote/messages" + + case Req.post(url, + body: serialized_payload, + headers: [{"content-type", "application/octet-stream"}] + ) do + {:ok, %Req.Response{status: status} = response} when status in 200..299 -> + {:ok, response} + + {:ok, %Req.Response{status: status}} -> + {:error, "HTTP #{status}"} + + {:error, reason} -> + {:error, "Request failed: #{inspect(reason)}"} + end + end + + @spec handle_response(Req.Response.t()) :: :ok | {:error, String.t()} + defp handle_response(%Req.Response{body: body}) do + case body do + %{"status" => "success"} -> :ok + %{"error" => error} -> {:error, error} + _ -> {:error, "Invalid response format"} + end + end +end diff --git a/lib/message_server/remote_handler.ex b/lib/message_server/remote_handler.ex new file mode 100644 index 0000000..6721aaf --- /dev/null +++ b/lib/message_server/remote_handler.ex @@ -0,0 +1,30 @@ +defmodule MessageServer.RemoteHandler do + require Logger + + @spec handle_remote_message(map()) :: :ok | {:error, String.t()} + def handle_remote_message(%{from: from, to: to, message: message}) do + with {:ok, to_server} <- extract_server_id(to), + :ok <- validate_recipient_ownership(to_server) do + MessageServer.Storage.append_message(to, from, message) + end + end + + @spec extract_server_id(String.t()) :: {:ok, String.t()} | {:error, String.t()} + defp extract_server_id(user_id) do + case String.split(user_id, "-", parts: 2) do + [server_id, _user_part] -> {:ok, server_id} + _ -> {:error, "Invalid user ID format: #{user_id}"} + end + end + + @spec validate_recipient_ownership(String.t()) :: :ok | {:error, String.t()} + defp validate_recipient_ownership(recipient_server) do + local_server = MessageServer.ServerRegistry.get_server_id() + + if recipient_server == local_server do + :ok + else + {:error, "Cannot deliver messages to users from other servers"} + end + end +end diff --git a/lib/message_server/router.ex b/lib/message_server/router.ex new file mode 100644 index 0000000..7d6cfb3 --- /dev/null +++ b/lib/message_server/router.ex @@ -0,0 +1,118 @@ +defmodule MessageServer.Router do + use Plug.Router + require Logger + + alias MessageServer.{ + MessageRequest, + RemoteHandler, + MessageHandler + } + + plug(:match) + + plug(:debug_content_type) + + plug(Plug.Parsers, + parsers: [:urlencoded, :multipart, :json], + json_decoder: Jason, + pass: ["application/octet-stream"] + ) + + plug(:dispatch) + + def json_request?(conn) do + conn.request_path != "/api/remote/messages" + end + + post "/api/messages" do + with {:ok, message} <- validate_message_request(conn.body_params), + :ok <- MessageHandler.handle_message(message) do + conn + |> put_resp_content_type("application/json") + |> send_resp(200, Jason.encode!(%{status: "success"})) + else + {:error, reason} -> + Logger.warning("Message handling failed: #{inspect(reason)}") + + conn + |> put_resp_content_type("application/json") + |> send_resp(400, Jason.encode!(%{error: reason})) + end + end + + post "/api/remote/messages" do + case handle_etf_body(conn) do + {:ok, payload} -> + payload + |> RemoteHandler.handle_remote_message() + |> case do + :ok -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, Jason.encode!(%{status: "success"})) + + {:error, reason} -> + Logger.warning("Remote message handling failed: #{inspect(reason)}") + + conn + |> put_resp_content_type("application/json") + |> send_resp(400, Jason.encode!(%{error: reason})) + end + + {:error, reason} -> + conn + |> put_resp_content_type("application/json") + |> send_resp(400, Jason.encode!(%{error: reason, message: "Invalid request body"})) + end + end + + match _ do + send_resp(conn, 404, "Not found") + end + + defp debug_content_type(conn, _opts) do + conn + |> Plug.Conn.get_req_header("content-type") + |> IO.inspect(label: "Received Content-Type") + + conn + end + + defp handle_etf_body(conn) do + with {:ok, body, _conn} <- Plug.Conn.read_body(conn), + {:ok, decoded} <- decode_etf(body) do + {:ok, decoded} + else + {:error, reason} -> {:error, "Failed to read body: #{reason}"} + {:more, _partial, _conn} -> {:error, "Body too large"} + end + end + + @spec decode_etf(binary()) :: {:ok, map()} | {:error, String.t()} + defp decode_etf(binary_body) do + try do + payload = :erlang.binary_to_term(binary_body, [:safe]) + {:ok, payload} + rescue + error -> {:error, "Deserialization failed: #{inspect(error)}"} + end + end + + @spec validate_message_request(map()) :: {:ok, map()} | {:error, String.t()} + defp validate_message_request(params) do + required_fields = ["from", "to", "message"] + + case Enum.all?(required_fields, &Map.has_key?(params, &1)) do + true -> + {:ok, + %MessageRequest{ + from: params["from"], + to: params["to"], + message: params["message"] + }} + + false -> + {:error, "Missing required fields: from, to, message"} + end + end +end diff --git a/lib/message_server/server_registry.ex b/lib/message_server/server_registry.ex new file mode 100644 index 0000000..dd4139a --- /dev/null +++ b/lib/message_server/server_registry.ex @@ -0,0 +1,49 @@ +defmodule MessageServer.ServerRegistry do + use Agent + + @moduledoc """ + Simple registry for server information using Agent for lightweight state management. + """ + + @type server_info :: %{host: String.t(), port: integer()} + @type servers_map :: %{String.t() => server_info()} + @type state :: %{server_id: String.t(), servers: servers_map()} + + def start_link({server_id, servers, agent_name}) do + Agent.start_link( + fn -> %{server_id: server_id, servers: servers} end, + name: agent_name + ) + end + + def start_link({server_id, servers}) do + start_link({server_id, servers, __MODULE__}) + end + + @spec get_server_id() :: String.t() + def get_server_id(agent \\ __MODULE__) do + Agent.get(agent, & &1.server_id) + end + + @spec get_server_info(String.t()) :: {:ok, server_info()} | {:error, :not_found} + def get_server_info(server_id, agent \\ __MODULE__) do + Agent.get(agent, fn %{servers: servers} -> + case Map.get(servers, server_id) do + nil -> {:error, :not_found} + info -> {:ok, info} + end + end) + end + + @spec list_servers() :: servers_map() + def list_servers(agent \\ __MODULE__) do + Agent.get(agent, & &1.servers) + end + + @spec get_local_server_info() :: %{id: String.t(), servers: servers_map()} + def get_local_server_info(agent \\ __MODULE__) do + Agent.get(agent, fn state -> + %{id: state.server_id, servers: state.servers} + end) + end +end diff --git a/lib/message_server/storage.ex b/lib/message_server/storage.ex index 9fe7ec6..64ecf93 100644 --- a/lib/message_server/storage.ex +++ b/lib/message_server/storage.ex @@ -2,8 +2,12 @@ defmodule MessageServer.Storage do use GenServer require Logger - def start_link(server_id) do - GenServer.start_link(__MODULE__, server_id, name: __MODULE__) + def start_link({server_id, server_name}) do + GenServer.start_link(__MODULE__, server_id, name: server_name) + end + + def start_link({server_id}) do + start_link({server_id, __MODULE__}) end @spec append_message(String.t(), String.t(), String.t()) :: :ok | {:error, String.t()} diff --git a/mix.exs b/mix.exs index 4791c33..8b62193 100644 --- a/mix.exs +++ b/mix.exs @@ -6,6 +6,10 @@ defmodule MessageServer.MixProject do app: :message_server, version: "0.1.0", elixir: "~> 1.18", + dialyzer: [ + plt_add_apps: [:mix, :ex_unit], + flags: [:error_handling, :race_conditions, :underspecs] + ], start_permanent: Mix.env() == :prod, deps: deps() ] diff --git a/test/message_server/server_registry_test.exs b/test/message_server/server_registry_test.exs new file mode 100644 index 0000000..f0cb428 --- /dev/null +++ b/test/message_server/server_registry_test.exs @@ -0,0 +1,70 @@ +defmodule MessageServer.ServerRegistryTest do + use ExUnit.Case + + alias MessageServer.ServerRegistry + + @test_server_id "99" + @test_servers %{ + "10" => %{host: "amy.planetexpress.com", port: 4001}, + "11" => %{host: "omicron-persei-8.galaxy", port: 8080} + } + + setup do + pid = start_supervised!({ServerRegistry, {@test_server_id, @test_servers, :test_server}}) + + %{registry: pid} + end + + test "returns correct server id" do + assert ServerRegistry.get_server_id(:test_server) == @test_server_id + end + + test "returns server info for known server" do + @test_servers + |> Map.keys() + |> Enum.each(fn key -> + config = @test_servers[key] + assert {:ok, config} == ServerRegistry.get_server_info(key, :test_server) + end) + end + + test "returns error for unknown server" do + assert {:error, :not_found} = ServerRegistry.get_server_info("1234567", :test_server) + end + + test "lists all servers" do + servers = ServerRegistry.list_servers(:test_server) + + assert servers == @test_servers + # local server not in list + refute Map.has_key?(servers, @test_server_id) + end + + test "returns local server info" do + info = ServerRegistry.get_local_server_info(:test_server) + + assert info.id == @test_server_id + assert info.servers == @test_servers + end + + test "server info contains required fields" do + server = @test_servers |> Map.keys() |> Enum.at(0) + {:ok, server_info} = ServerRegistry.get_server_info(server, :test_server) + + assert Map.has_key?(server_info, :host) + assert Map.has_key?(server_info, :port) + assert is_binary(server_info.host) + assert is_integer(server_info.port) + end + + test "multiple calls return consistent results" do + # Call multiple times to ensure Agent state is stable + assert ServerRegistry.get_server_id(:test_server) == @test_server_id + assert ServerRegistry.get_server_id(:test_server) == @test_server_id + + server = @test_servers |> Map.keys() |> Enum.at(1) + {:ok, info1} = ServerRegistry.get_server_info(server, :test_server) + {:ok, info2} = ServerRegistry.get_server_info(server, :test_server) + assert info1 == info2 + end +end diff --git a/test/message_server/storage_test.exs b/test/message_server/storage_test.exs index 98dc778..e678721 100644 --- a/test/message_server/storage_test.exs +++ b/test/message_server/storage_test.exs @@ -1,116 +1,99 @@ defmodule MessageServer.StorageTest do use ExUnit.Case - import ExUnit.CaptureLog - @test_server_id "test_server" + alias MessageServer.Storage + + @test_server_id "1" @test_storage_dir "storage/server_#{@test_server_id}" setup do - # clean up any existing test storage - File.rm_rf!(@test_storage_dir) - - # start the Storage GenServer for testing - {:ok, pid} = MessageServer.Storage.start_link(@test_server_id) + pid = start_supervised!({Storage, {@test_server_id, :test_storage}}) on_exit(fn -> - # clean up after test - if Process.alive?(pid), do: GenServer.stop(pid) File.rm_rf!(@test_storage_dir) end) - %{storage_pid: pid} + %{storage: pid, storage_dir: @test_storage_dir} end - test "creates storage directory on startup" do - assert File.exists?(@test_storage_dir) - assert File.dir?(@test_storage_dir) + test "creates storage directory on startup", %{storage_dir: storage_dir} do + IO.puts(storage_dir) + assert File.exists?(storage_dir) + assert File.dir?(storage_dir) end - test "appends message to user file" do - user_id = "test_server-alice" - from_user = "test_server-bob" - message = "Hello Alice!" + test "appends message to user file", %{storage_dir: storage_dir} do + user_id = "zoidberg" + from_user = "bender" + message = "let's go steal some stuff" - assert :ok = MessageServer.Storage.append_message(user_id, from_user, message) + assert :ok = Storage.append_message(user_id, from_user, message) - file_path = Path.join(@test_storage_dir, "#{user_id}.txt") + file_path = Path.join(storage_dir, "#{user_id}.txt") assert File.exists?(file_path) content = File.read!(file_path) assert content == "#{from_user}: #{message}\n" end - test "appends multiple messages to same user file" do - user_id = "test_server-alice" + test "appends multiple messages to same user file", %{storage_dir: storage_dir} do + user_id = "leela" - assert :ok = MessageServer.Storage.append_message(user_id, "test_server-bob", "First message") + assert :ok = MessageServer.Storage.append_message(user_id, "fry", "i love you leela") assert :ok = MessageServer.Storage.append_message( user_id, - "test_server-charlie", - "Second message" + "bender", + "bite my shiny metal ass!" ) - file_path = Path.join(@test_storage_dir, "#{user_id}.txt") + file_path = Path.join(storage_dir, "#{user_id}.txt") content = File.read!(file_path) expected_content = """ - test_server-bob: First message - test_server-charlie: Second message + fry: i love you leela + bender: bite my shiny metal ass! """ assert content == expected_content end - test "creates separate files for different users" do + test "creates separate files for different users", %{storage_dir: storage_dir} do assert :ok = MessageServer.Storage.append_message( - "test_server-alice", - "test_server-bob", - "Hi Alice" + "fry", + "professor", + "good news everyone!" ) assert :ok = MessageServer.Storage.append_message( - "test_server-charlie", - "test_server-bob", - "Hi Charlie" + "nibbler", + "zoidberg", + "hooray! people are paying attention to me!" ) - alice_file = Path.join(@test_storage_dir, "test_server-alice.txt") - charlie_file = Path.join(@test_storage_dir, "test_server-charlie.txt") + fry_file = Path.join(storage_dir, "fry.txt") + nibbler_file = Path.join(storage_dir, "nibbler.txt") - assert File.exists?(alice_file) - assert File.exists?(charlie_file) + assert File.exists?(fry_file) + assert File.exists?(nibbler_file) - assert File.read!(alice_file) == "test_server-bob: Hi Alice\n" - assert File.read!(charlie_file) == "test_server-bob: Hi Charlie\n" + assert File.read!(fry_file) == "professor: good news everyone!\n" + assert File.read!(nibbler_file) == "zoidberg: hooray! people are paying attention to me!\n" end - test "handles messages with special characters" do - user_id = "test_server-alice" - from_user = "test_server-bob" - message = "Hello! 🎉 Special chars: @#$%^&*()" + test "handles messages with special characters", %{storage_dir: storage_dir} do + user_id = "zapp" + from_user = "zoidberg" + message = "why not zoidberg? 🦀 (╯°□°)╯︵ ┻━┻" assert :ok = MessageServer.Storage.append_message(user_id, from_user, message) - file_path = Path.join(@test_storage_dir, "#{user_id}.txt") + file_path = Path.join(storage_dir, "#{user_id}.txt") content = File.read!(file_path) assert content == "#{from_user}: #{message}\n" end - - test "logs successful message storage" do - user_id = "test_server-alice" - from_user = "test_server-bob" - message = "Test message" - - log_output = - capture_log(fn -> - assert :ok = MessageServer.Storage.append_message(user_id, from_user, message) - end) - - assert log_output =~ "Message stored for user #{user_id}" - end end