diff --git a/lib/a2a/plug/multi_tenant.ex b/lib/a2a/plug/multi_tenant.ex new file mode 100644 index 0000000..b54a13e --- /dev/null +++ b/lib/a2a/plug/multi_tenant.ex @@ -0,0 +1,124 @@ +if Code.ensure_loaded?(Plug) do + defmodule A2A.Plug.MultiTenant do + @moduledoc """ + Plug for multi-tenant A2A deployments with path-based routing. + + Routes requests matching `/:tenant/:agent/*path` to the correct agent + process. This is an optional module — existing single-tenant usage + via `A2A.Plug` is unaffected. + + ## Usage + + # In a Phoenix router: + forward "/", A2A.Plug.MultiTenant, + agents: %{ + "greeter" => GreeterAgent, + "helper" => HelperAgent + }, + base_url: "http://localhost:4000" + + # With a registry: + forward "/", A2A.Plug.MultiTenant, + registry: MyApp.AgentRegistry, + base_url: "http://localhost:4000" + + This serves: + - `GET /:tenant/:agent/.well-known/agent-card.json` — per-tenant agent card + - `POST /:tenant/:agent/` — JSON-RPC dispatch with tenant context + + ## Options + + - `:agents` — static map of agent name to GenServer name/pid + - `:registry` — `A2A.Registry` name for dynamic agent lookup + - `:base_url` — public base URL (required) + - `:plug_opts` — extra options forwarded to `A2A.Plug.init/1` + + ## Tenant Context + + Injects into `conn.assigns`: + - `:a2a_tenant` — the tenant ID from the URL path + - `:a2a_agent_name` — the agent name from the URL path + + Sets `"tenant_id"` in task metadata via `A2A.Plug.put_metadata/2`. + """ + + @behaviour Plug + + import Plug.Conn + + @impl Plug + @spec init(keyword()) :: map() + def init(opts) do + agents = Keyword.get(opts, :agents) + registry = Keyword.get(opts, :registry) + + unless agents || registry do + raise ArgumentError, + "A2A.Plug.MultiTenant requires either :agents map or :registry option" + end + + %{ + agents: agents, + registry: registry, + base_url: Keyword.fetch!(opts, :base_url), + plug_opts: Keyword.get(opts, :plug_opts, []) + } + end + + @impl Plug + @spec call(Plug.Conn.t(), map()) :: Plug.Conn.t() + def call(%{path_info: [tenant, agent_name | rest]} = conn, opts) do + case resolve_agent(agent_name, opts) do + {:ok, agent_ref} -> + conn = + conn + |> assign(:a2a_tenant, tenant) + |> assign(:a2a_agent_name, agent_name) + + tenant_base_url = "#{opts.base_url}/#{tenant}/#{agent_name}" + + conn = + conn + |> A2A.Plug.put_base_url(tenant_base_url) + |> A2A.Plug.put_metadata(%{"tenant_id" => tenant}) + + plug_init_opts = + [ + agent: agent_ref, + base_url: opts.base_url + ] ++ opts.plug_opts + + conn = %{conn | path_info: rest, script_name: conn.script_name ++ [tenant, agent_name]} + + a2a_opts = A2A.Plug.init(plug_init_opts) + A2A.Plug.call(conn, a2a_opts) + + {:error, :not_found} -> + conn + |> send_resp(404, "Agent not found: #{agent_name}") + end + end + + def call(conn, _opts) do + send_resp(conn, 404, "Not Found") + end + + defp resolve_agent(name, %{agents: agents}) when is_map(agents) do + case Map.fetch(agents, name) do + {:ok, agent} -> {:ok, agent} + :error -> {:error, :not_found} + end + end + + defp resolve_agent(name, %{registry: registry}) when not is_nil(registry) do + entries = A2A.Registry.all(registry) + + case Enum.find(entries, fn {_mod, card} -> card.name == name end) do + {mod, _card} -> {:ok, mod} + nil -> {:error, :not_found} + end + end + + defp resolve_agent(_name, _opts), do: {:error, :not_found} + end +end diff --git a/lib/a2a/task_store/ets.ex b/lib/a2a/task_store/ets.ex index 0637650..2403478 100644 --- a/lib/a2a/task_store/ets.ex +++ b/lib/a2a/task_store/ets.ex @@ -14,6 +14,20 @@ defmodule A2A.TaskStore.ETS do ## With an Agent MyAgent.start_link(task_store: {A2A.TaskStore.ETS, :my_tasks}) + + ## Multi-Tenant Usage + + For tenant-isolated storage, use `tenant_ref/2` to create a namespaced + reference. Tasks stored under one tenant are invisible to other tenants. + + ref = A2A.TaskStore.ETS.tenant_ref(:my_tasks, "acme") + :ok = A2A.TaskStore.ETS.put(ref, task) + + # Only returns tasks for "acme" tenant + {:ok, tasks} = A2A.TaskStore.ETS.list_all(ref) + + # Plain ref still works (backward compatible) + {:ok, all} = A2A.TaskStore.ETS.list_all(:my_tasks) """ use GenServer @@ -33,6 +47,32 @@ defmodule A2A.TaskStore.ETS do GenServer.start_link(__MODULE__, name, name: name) end + @doc """ + Creates a tenant-namespaced store reference. + + All operations using this ref will scope keys by the given tenant, + providing task isolation between tenants sharing the same ETS table. + + ref = A2A.TaskStore.ETS.tenant_ref(:my_tasks, "acme") + :ok = A2A.TaskStore.ETS.put(ref, task) + """ + @spec tenant_ref(atom(), String.t()) :: {atom(), String.t()} + def tenant_ref(table, tenant) when is_atom(table) and is_binary(tenant) do + {table, tenant} + end + + # -- get --------------------------------------------------------------------- + + @doc false + def get({table, tenant}, task_id) do + key = {tenant, task_id} + + case :ets.lookup(table, key) do + [{^key, task}] -> {:ok, task} + [] -> {:error, :not_found} + end + end + @impl A2A.TaskStore def get(table, task_id) do case :ets.lookup(table, task_id) do @@ -41,32 +81,81 @@ defmodule A2A.TaskStore.ETS do end end + # -- put --------------------------------------------------------------------- + + @doc false + def put({table, tenant}, %A2A.Task{} = task) do + :ets.insert(table, {{tenant, task.id}, task}) + :ok + end + @impl A2A.TaskStore def put(table, %A2A.Task{} = task) do :ets.insert(table, {task.id, task}) :ok end + # -- delete ------------------------------------------------------------------ + + @doc false + def delete({table, tenant}, task_id) do + :ets.delete(table, {tenant, task_id}) + :ok + end + @impl A2A.TaskStore def delete(table, task_id) do :ets.delete(table, task_id) :ok end + # -- list -------------------------------------------------------------------- + + @doc false + def list({table, tenant}, context_id) do + :ets.tab2list(table) + |> Enum.filter(fn + {{^tenant, _id}, task} -> task.context_id == context_id + _ -> false + end) + |> Enum.map(fn {_key, task} -> task end) + |> then(&{:ok, &1}) + end + @impl A2A.TaskStore def list(table, context_id) do - tasks = - :ets.tab2list(table) - |> Enum.filter(fn {_id, task} -> task.context_id == context_id end) - |> Enum.map(fn {_id, task} -> task end) + :ets.tab2list(table) + |> Enum.filter(fn + {{_tenant, _id}, _task} -> false + {_id, task} -> task.context_id == context_id + end) + |> Enum.map(fn {_key, task} -> task end) + |> then(&{:ok, &1}) + end - {:ok, tasks} + # -- list_all ---------------------------------------------------------------- + + @doc false + def list_all(ref, opts \\ []) + + def list_all({table, tenant}, opts) do + :ets.tab2list(table) + |> Enum.filter(fn + {{^tenant, _id}, _task} -> true + _ -> false + end) + |> Enum.map(fn {_key, task} -> task end) + |> A2A.Task.Filter.apply(opts) end @impl A2A.TaskStore - def list_all(table, opts \\ []) do + def list_all(table, opts) do :ets.tab2list(table) - |> Enum.map(fn {_id, task} -> task end) + |> Enum.filter(fn + {{_tenant, _id}, _task} -> false + _ -> true + end) + |> Enum.map(fn {_key, task} -> task end) |> A2A.Task.Filter.apply(opts) end diff --git a/test/a2a/plug/multi_tenant_test.exs b/test/a2a/plug/multi_tenant_test.exs new file mode 100644 index 0000000..833bc6f --- /dev/null +++ b/test/a2a/plug/multi_tenant_test.exs @@ -0,0 +1,255 @@ +defmodule A2A.Plug.MultiTenantTest do + use ExUnit.Case, async: true + + @moduletag :plug + + defp mt_opts(agents, extra \\ []) do + A2A.Plug.MultiTenant.init([agents: agents, base_url: "http://localhost:4000"] ++ extra) + end + + defp json_rpc_conn(method, path, params \\ %{}, id \\ 1) do + body = + Jason.encode!(%{ + "jsonrpc" => "2.0", + "id" => id, + "method" => method, + "params" => params + }) + + Plug.Test.conn(:post, path, body) + |> Plug.Conn.put_req_header("content-type", "application/json") + end + + defp message_params(text \\ "hello") do + %{ + "message" => %{ + "messageId" => "msg-test", + "role" => "user", + "parts" => [%{"kind" => "text", "text" => text}] + } + } + end + + defp json_body(conn), do: Jason.decode!(conn.resp_body) + + defp get_resp_header(conn, key) do + for {k, v} <- conn.resp_headers, k == key, do: v + end + + setup do + echo = start_supervised!({A2A.Test.EchoAgent, [name: nil]}, id: :echo) + greeter = start_supervised!({A2A.Test.GreeterAgent, [name: nil]}, id: :greeter) + {:ok, echo: echo, greeter: greeter} + end + + describe "path-based routing" do + test "routes /:tenant/:agent to correct agent", %{echo: echo, greeter: greeter} do + agents = %{"echo" => echo, "greeter" => greeter} + opts = mt_opts(agents) + + conn = + json_rpc_conn("message/send", "/acme/echo/", message_params("hi")) + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 200 + body = json_body(conn) + assert body["result"]["task"]["status"]["state"] == "TASK_STATE_COMPLETED" + + artifact = hd(body["result"]["task"]["artifacts"]) + assert hd(artifact["parts"])["text"] == "hi" + end + + test "different tenants use same agent", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn1 = + json_rpc_conn("message/send", "/tenant-a/echo/", message_params("from A")) + |> A2A.Plug.MultiTenant.call(opts) + + conn2 = + json_rpc_conn("message/send", "/tenant-b/echo/", message_params("from B")) + |> A2A.Plug.MultiTenant.call(opts) + + assert conn1.status == 200 + assert conn2.status == 200 + + body1 = json_body(conn1) + body2 = json_body(conn2) + + artifact1 = hd(body1["result"]["task"]["artifacts"]) + artifact2 = hd(body2["result"]["task"]["artifacts"]) + assert hd(artifact1["parts"])["text"] == "from A" + assert hd(artifact2["parts"])["text"] == "from B" + end + + test "unknown agent returns 404", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn = + Plug.Test.conn(:post, "/acme/nonexistent/", "") + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 404 + assert conn.resp_body =~ "Agent not found" + end + + test "missing path segments returns 404" do + opts = mt_opts(%{"echo" => self()}) + + conn = + Plug.Test.conn(:get, "/only-one-segment") + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 404 + end + end + + describe "tenant context" do + test "injects tenant_id into task metadata", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn = + json_rpc_conn("message/send", "/acme/echo/", message_params()) + |> A2A.Plug.MultiTenant.call(opts) + + body = json_body(conn) + meta = body["result"]["task"]["metadata"] + assert meta["tenant_id"] == "acme" + end + + test "sets conn assigns for tenant and agent", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn = + json_rpc_conn("message/send", "/acme/echo/", message_params()) + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.assigns[:a2a_tenant] == "acme" + assert conn.assigns[:a2a_agent_name] == "echo" + end + end + + describe "agent card per-tenant" do + test "serves agent card with tenant-scoped URL", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn = + Plug.Test.conn(:get, "/acme/echo/.well-known/agent-card.json") + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 200 + assert get_resp_header(conn, "content-type") |> hd() =~ "application/json" + + body = json_body(conn) + assert body["name"] == "echo" + assert body["url"] == "http://localhost:4000/acme/echo" + end + + test "different tenants get different URLs", %{echo: echo} do + agents = %{"echo" => echo} + opts = mt_opts(agents) + + conn_a = + Plug.Test.conn(:get, "/tenant-a/echo/.well-known/agent-card.json") + |> A2A.Plug.MultiTenant.call(opts) + + conn_b = + Plug.Test.conn(:get, "/tenant-b/echo/.well-known/agent-card.json") + |> A2A.Plug.MultiTenant.call(opts) + + assert json_body(conn_a)["url"] == "http://localhost:4000/tenant-a/echo" + assert json_body(conn_b)["url"] == "http://localhost:4000/tenant-b/echo" + end + end + + describe "registry-based lookup" do + setup do + registry = :"test_registry_#{System.unique_integer([:positive])}" + echo = start_supervised!({A2A.Test.EchoAgent, [name: nil]}, id: :reg_echo) + + _ = + start_supervised!( + {A2A.Registry, name: registry, agents: []}, + id: :reg + ) + + A2A.Registry.register(registry, echo, A2A.Test.EchoAgent.agent_card()) + + {:ok, registry: registry, echo: echo} + end + + test "resolves agent by card name from registry", %{registry: registry} do + opts = + A2A.Plug.MultiTenant.init( + registry: registry, + base_url: "http://localhost:4000" + ) + + conn = + json_rpc_conn("message/send", "/acme/echo/", message_params("via registry")) + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 200 + body = json_body(conn) + artifact = hd(body["result"]["task"]["artifacts"]) + assert hd(artifact["parts"])["text"] == "via registry" + end + + test "returns 404 for unregistered agent", %{registry: registry} do + opts = + A2A.Plug.MultiTenant.init( + registry: registry, + base_url: "http://localhost:4000" + ) + + conn = + Plug.Test.conn(:post, "/acme/unknown/", "") + |> A2A.Plug.MultiTenant.call(opts) + + assert conn.status == 404 + end + end + + describe "init/1" do + test "raises without agents or registry" do + assert_raise ArgumentError, ~r/requires either/, fn -> + A2A.Plug.MultiTenant.init(base_url: "http://localhost:4000") + end + end + + test "raises without base_url" do + assert_raise KeyError, fn -> + A2A.Plug.MultiTenant.init(agents: %{"echo" => self()}) + end + end + end + + describe "backward compatibility" do + test "A2A.Plug still works standalone", %{echo: echo} do + opts = A2A.Plug.init(agent: echo, base_url: "http://localhost:4000") + + conn = + Plug.Test.conn( + :post, + "/", + Jason.encode!(%{ + "jsonrpc" => "2.0", + "id" => 1, + "method" => "message/send", + "params" => message_params() + }) + ) + |> Plug.Conn.put_req_header("content-type", "application/json") + |> A2A.Plug.call(opts) + + assert conn.status == 200 + body = json_body(conn) + assert body["result"]["task"]["kind"] == "task" + end + end +end diff --git a/test/a2a/task_store/ets_tenant_test.exs b/test/a2a/task_store/ets_tenant_test.exs new file mode 100644 index 0000000..fa87499 --- /dev/null +++ b/test/a2a/task_store/ets_tenant_test.exs @@ -0,0 +1,145 @@ +defmodule A2A.TaskStore.ETSTenantTest do + use ExUnit.Case, async: true + + alias A2A.TaskStore.ETS + + defp make_task(id, context_id \\ nil) do + %A2A.Task{ + id: id, + context_id: context_id, + status: A2A.Task.Status.new(:completed), + history: [], + artifacts: [], + metadata: %{} + } + end + + setup do + table = :"tenant_test_#{System.unique_integer([:positive])}" + start_supervised!({ETS, name: table}) + {:ok, table: table} + end + + describe "tenant_ref/2" do + test "creates a {table, tenant} tuple", %{table: table} do + ref = ETS.tenant_ref(table, "acme") + assert ref == {table, "acme"} + end + end + + describe "tenant-namespaced operations" do + test "put and get with tenant ref", %{table: table} do + ref = ETS.tenant_ref(table, "acme") + task = make_task("tsk-1") + + assert :ok = ETS.put(ref, task) + assert {:ok, ^task} = ETS.get(ref, "tsk-1") + end + + test "tenant A tasks invisible to tenant B", %{table: table} do + ref_a = ETS.tenant_ref(table, "acme") + ref_b = ETS.tenant_ref(table, "beta") + + task_a = make_task("tsk-a") + task_b = make_task("tsk-b") + + :ok = ETS.put(ref_a, task_a) + :ok = ETS.put(ref_b, task_b) + + assert {:ok, ^task_a} = ETS.get(ref_a, "tsk-a") + assert {:error, :not_found} = ETS.get(ref_a, "tsk-b") + + assert {:ok, ^task_b} = ETS.get(ref_b, "tsk-b") + assert {:error, :not_found} = ETS.get(ref_b, "tsk-a") + end + + test "tenant tasks invisible to plain ref", %{table: table} do + ref = ETS.tenant_ref(table, "acme") + task = make_task("tsk-1") + :ok = ETS.put(ref, task) + + assert {:error, :not_found} = ETS.get(table, "tsk-1") + end + + test "plain tasks invisible to tenant ref", %{table: table} do + task = make_task("tsk-plain") + :ok = ETS.put(table, task) + + ref = ETS.tenant_ref(table, "acme") + assert {:error, :not_found} = ETS.get(ref, "tsk-plain") + end + + test "delete only affects correct tenant", %{table: table} do + ref_a = ETS.tenant_ref(table, "acme") + ref_b = ETS.tenant_ref(table, "beta") + + :ok = ETS.put(ref_a, make_task("tsk-shared-id")) + :ok = ETS.put(ref_b, make_task("tsk-shared-id")) + + :ok = ETS.delete(ref_a, "tsk-shared-id") + + assert {:error, :not_found} = ETS.get(ref_a, "tsk-shared-id") + assert {:ok, _} = ETS.get(ref_b, "tsk-shared-id") + end + + test "list by context_id is tenant-scoped", %{table: table} do + ref_a = ETS.tenant_ref(table, "acme") + ref_b = ETS.tenant_ref(table, "beta") + + :ok = ETS.put(ref_a, make_task("tsk-a1", "ctx-1")) + :ok = ETS.put(ref_a, make_task("tsk-a2", "ctx-1")) + :ok = ETS.put(ref_b, make_task("tsk-b1", "ctx-1")) + + {:ok, tasks} = ETS.list(ref_a, "ctx-1") + ids = Enum.map(tasks, & &1.id) |> Enum.sort() + assert ids == ["tsk-a1", "tsk-a2"] + end + + test "list_all is tenant-scoped", %{table: table} do + ref_a = ETS.tenant_ref(table, "acme") + ref_b = ETS.tenant_ref(table, "beta") + + :ok = ETS.put(ref_a, make_task("tsk-a1")) + :ok = ETS.put(ref_a, make_task("tsk-a2")) + :ok = ETS.put(ref_b, make_task("tsk-b1")) + :ok = ETS.put(table, make_task("tsk-plain")) + + {:ok, result} = ETS.list_all(ref_a, []) + ids = Enum.map(result.tasks, & &1.id) |> Enum.sort() + assert ids == ["tsk-a1", "tsk-a2"] + end + + test "plain list excludes tenant tasks", %{table: table} do + ref = ETS.tenant_ref(table, "acme") + :ok = ETS.put(ref, make_task("tsk-tenant", "ctx-1")) + :ok = ETS.put(table, make_task("tsk-plain", "ctx-1")) + + {:ok, tasks} = ETS.list(table, "ctx-1") + ids = Enum.map(tasks, & &1.id) + assert ids == ["tsk-plain"] + end + + test "plain list_all excludes tenant tasks", %{table: table} do + ref = ETS.tenant_ref(table, "acme") + :ok = ETS.put(ref, make_task("tsk-tenant")) + :ok = ETS.put(table, make_task("tsk-plain")) + + {:ok, result} = ETS.list_all(table, []) + ids = Enum.map(result.tasks, & &1.id) + assert ids == ["tsk-plain"] + end + end + + describe "backward compatibility" do + test "existing non-tenant operations unchanged", %{table: table} do + task = make_task("tsk-1", "ctx-1") + + assert :ok = ETS.put(table, task) + assert {:ok, ^task} = ETS.get(table, "tsk-1") + assert {:ok, [^task]} = ETS.list(table, "ctx-1") + + assert :ok = ETS.delete(table, "tsk-1") + assert {:error, :not_found} = ETS.get(table, "tsk-1") + end + end +end diff --git a/test/support/agents/greeter_agent.ex b/test/support/agents/greeter_agent.ex new file mode 100644 index 0000000..892dcb2 --- /dev/null +++ b/test/support/agents/greeter_agent.ex @@ -0,0 +1,15 @@ +defmodule A2A.Test.GreeterAgent do + @moduledoc false + use A2A.Agent, + name: "greeter", + description: "Greets users by name", + skills: [ + %{id: "greet", name: "Greet", description: "Says hello", tags: ["greeting"]} + ] + + @impl A2A.Agent + def handle_message(message, _context) do + text = A2A.Message.text(message) || "stranger" + {:reply, [A2A.Part.Text.new("Hello, #{text}!")]} + end +end