Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/a2a/agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ defmodule A2A.Agent do
{:reply, __MODULE__.agent_card(), state}
end

def handle_call(:get_task_store, _from, state) do
{:reply, state.task_store, state}
end

@impl GenServer
def handle_cast({:stream_done, task_id, parts}, state) do
case A2A.Agent.State.get_task(state, task_id) do
Expand Down
8 changes: 8 additions & 0 deletions lib/a2a/authentication_info.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule A2A.AuthenticationInfo do
@moduledoc "Authentication details for push notifications."

@type t :: %__MODULE__{scheme: String.t(), credentials: String.t() | nil}

@enforce_keys [:scheme]
defstruct [:scheme, :credentials]
end
115 changes: 115 additions & 0 deletions lib/a2a/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,66 @@ if Code.ensure_loaded?(Req) do
end
end

@doc "Creates a push notification config for a task."
@spec create_push_config(target(), A2A.PushNotificationConfig.t(), keyword()) ::
{:ok, A2A.PushNotificationConfig.t()} | {:error, term()}
def create_push_config(target, %A2A.PushNotificationConfig{} = config, opts \\ []) do
client = ensure_client(target)
req_opts = take_req_opts(opts)
{:ok, params} = A2A.JSON.encode(config)
body = jsonrpc_request("tasks/pushNotificationConfig/set", params)

case post(client, body, req_opts) do
{:ok, response} -> decode_jsonrpc_result(response, :push_notification_config)
{:error, _} = error -> error
end
end

@doc "Gets a push notification config for a task."
@spec get_push_config(target(), String.t(), String.t(), keyword()) ::
{:ok, A2A.PushNotificationConfig.t()} | {:error, term()}
def get_push_config(target, task_id, config_id, opts \\ []) do
client = ensure_client(target)
req_opts = take_req_opts(opts)
params = %{"taskId" => task_id, "id" => config_id}
body = jsonrpc_request("tasks/pushNotificationConfig/get", params)

case post(client, body, req_opts) do
{:ok, response} -> decode_jsonrpc_result(response, :push_notification_config)
{:error, _} = error -> error
end
end

@doc "Lists push notification configs for a task."
@spec list_push_configs(target(), String.t(), keyword()) ::
{:ok, [A2A.PushNotificationConfig.t()]} | {:error, term()}
def list_push_configs(target, task_id, opts \\ []) do
client = ensure_client(target)
req_opts = take_req_opts(opts)
params = %{"taskId" => task_id}
body = jsonrpc_request("tasks/pushNotificationConfig/list", params)

case post(client, body, req_opts) do
{:ok, response} -> decode_push_configs_result(response)
{:error, _} = error -> error
end
end

@doc "Deletes a push notification config for a task."
@spec delete_push_config(target(), String.t(), String.t(), keyword()) ::
:ok | {:error, term()}
def delete_push_config(target, task_id, config_id, opts \\ []) do
client = ensure_client(target)
req_opts = take_req_opts(opts)
params = %{"taskId" => task_id, "id" => config_id}
body = jsonrpc_request("tasks/pushNotificationConfig/delete", params)

case post(client, body, req_opts) do
{:ok, response} -> decode_empty_result(response)
{:error, _} = error -> error
end
end

# -------------------------------------------------------------------
# Private — Request building
# -------------------------------------------------------------------
Expand Down Expand Up @@ -383,6 +443,61 @@ if Code.ensure_loaded?(Req) do
{:error, {:unexpected_body, body}}
end

defp decode_push_configs_result(%Req.Response{body: body}) when is_map(body) do
decode_push_configs_body(body)
end

defp decode_push_configs_result(%Req.Response{body: body}) when is_binary(body) do
case Jason.decode(body) do
{:ok, decoded} -> decode_push_configs_body(decoded)
{:error, _} = error -> error
end
end

defp decode_push_configs_body(%{"error" => error_map}) do
{:error,
%Error{
code: error_map["code"],
message: error_map["message"],
data: error_map["data"]
}}
end

defp decode_push_configs_body(%{"result" => %{"configs" => configs}}) do
decoded =
Enum.map(configs, fn c ->
{:ok, config} = A2A.JSON.decode(c, :push_notification_config)
config
end)

{:ok, decoded}
end

defp decode_push_configs_body(body), do: {:error, {:unexpected_body, body}}

defp decode_empty_result(%Req.Response{body: body}) when is_map(body) do
decode_empty_body(body)
end

defp decode_empty_result(%Req.Response{body: body}) when is_binary(body) do
case Jason.decode(body) do
{:ok, decoded} -> decode_empty_body(decoded)
{:error, _} = error -> error
end
end

defp decode_empty_body(%{"error" => error_map}) do
{:error,
%Error{
code: error_map["code"],
message: error_map["message"],
data: error_map["data"]
}}
end

defp decode_empty_body(%{"result" => _}), do: :ok
defp decode_empty_body(body), do: {:error, {:unexpected_body, body}}

# -------------------------------------------------------------------
# Private — SSE streaming
# -------------------------------------------------------------------
Expand Down
43 changes: 43 additions & 0 deletions lib/a2a/json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,22 @@ defmodule A2A.JSON do
{:ok, map}
end

def encode(%A2A.PushNotificationConfig{} = config) do
map =
%{}
|> put_unless_nil("id", config.id)
|> put_unless_nil("taskId", config.task_id)
|> put_unless_nil("url", config.url)
|> put_unless_nil("token", config.token)
|> put_unless_nil("authentication", encode_authentication_info(config.authentication))

{:ok, map}
end

def encode(%A2A.AuthenticationInfo{} = auth) do
{:ok, encode_authentication_info(auth)}
end

def encode(%{__struct__: mod}) do
{:error, {:unsupported_type, mod}}
end
Expand Down Expand Up @@ -499,6 +515,17 @@ defmodule A2A.JSON do
end
end

def decode(map, :push_notification_config) do
{:ok,
%A2A.PushNotificationConfig{
id: Map.get(map, "id"),
task_id: Map.get(map, "taskId"),
url: Map.get(map, "url"),
token: Map.get(map, "token"),
authentication: decode_authentication_info(Map.get(map, "authentication"))
}}
end

@doc """
Decodes a JSON map into an Elixir struct, raising on failure.
"""
Expand Down Expand Up @@ -629,6 +656,22 @@ defmodule A2A.JSON do
Map.put(map, key, encoded)
end

defp encode_authentication_info(nil), do: nil

defp encode_authentication_info(%A2A.AuthenticationInfo{} = auth) do
%{"scheme" => auth.scheme}
|> put_unless_nil("credentials", auth.credentials)
end

defp decode_authentication_info(nil), do: nil

defp decode_authentication_info(map) when is_map(map) do
%A2A.AuthenticationInfo{
scheme: Map.get(map, "scheme", ""),
credentials: Map.get(map, "credentials")
}
end

# -------------------------------------------------------------------
# Private — Decoding helpers
# -------------------------------------------------------------------
Expand Down
116 changes: 113 additions & 3 deletions lib/a2a/jsonrpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,41 @@ defmodule A2A.JSONRPC do
@callback handle_list(params :: map(), context :: map()) ::
{:ok, map()} | {:error, Error.t()}

@optional_callbacks handle_list: 2
@doc "Called for push notification config create. Optional."
@callback handle_set_push_config(
A2A.PushNotificationConfig.t(),
params :: map(),
context :: map()
) :: {:ok, A2A.PushNotificationConfig.t()} | {:error, Error.t()}

@doc "Called for push notification config get. Optional."
@callback handle_get_push_config(
task_id :: String.t(),
config_id :: String.t(),
params :: map(),
context :: map()
) :: {:ok, A2A.PushNotificationConfig.t()} | {:error, Error.t()}

@doc "Called for push notification config list. Optional."
@callback handle_list_push_configs(
task_id :: String.t(),
params :: map(),
context :: map()
) :: {:ok, [A2A.PushNotificationConfig.t()]} | {:error, Error.t()}

@doc "Called for push notification config delete. Optional."
@callback handle_delete_push_config(
task_id :: String.t(),
config_id :: String.t(),
params :: map(),
context :: map()
) :: :ok | {:error, Error.t()}

@optional_callbacks handle_list: 2,
handle_set_push_config: 3,
handle_get_push_config: 4,
handle_list_push_configs: 3,
handle_delete_push_config: 4

@doc """
Parses a JSON-RPC 2.0 request map and dispatches to the handler.
Expand Down Expand Up @@ -167,8 +201,77 @@ defmodule A2A.JSONRPC do
{:stream, "tasks/resubscribe", req.params, req.id}
end

defp dispatch(%Request{method: "tasks/pushNotificationConfig/" <> _} = req, _, _) do
{:reply, Response.error(req.id, Error.push_notification_not_supported())}
defp dispatch(%Request{method: "tasks/pushNotificationConfig/set"} = req, handler, ctx) do
if function_exported?(handler, :handle_set_push_config, 3) do
with {:ok, config} <- decode_push_config(req.params),
{:ok, result} <-
safe_call(fn -> handler.handle_set_push_config(config, req.params, ctx) end),
{:ok, encoded} <- A2A.JSON.encode(result) do
{:reply, Response.success(req.id, encoded)}
else
{:error, %Error{} = error} -> {:reply, Response.error(req.id, error)}
end
else
{:reply, Response.error(req.id, Error.push_notification_not_supported())}
end
end

defp dispatch(%Request{method: "tasks/pushNotificationConfig/get"} = req, handler, ctx) do
if function_exported?(handler, :handle_get_push_config, 4) do
task_id = req.params["taskId"]
config_id = req.params["id"]

case safe_call(fn ->
handler.handle_get_push_config(task_id, config_id, req.params, ctx)
end) do
{:ok, config} ->
{:ok, encoded} = A2A.JSON.encode(config)
{:reply, Response.success(req.id, encoded)}

{:error, %Error{} = error} ->
{:reply, Response.error(req.id, error)}
end
else
{:reply, Response.error(req.id, Error.push_notification_not_supported())}
end
end

defp dispatch(%Request{method: "tasks/pushNotificationConfig/list"} = req, handler, ctx) do
if function_exported?(handler, :handle_list_push_configs, 3) do
task_id = req.params["taskId"]

case safe_call(fn -> handler.handle_list_push_configs(task_id, req.params, ctx) end) do
{:ok, configs} ->
encoded_configs = Enum.map(configs, fn c -> A2A.JSON.encode!(c) end)
{:reply, Response.success(req.id, %{"configs" => encoded_configs})}

{:error, %Error{} = error} ->
{:reply, Response.error(req.id, error)}
end
else
{:reply, Response.error(req.id, Error.push_notification_not_supported())}
end
end

defp dispatch(%Request{method: "tasks/pushNotificationConfig/delete"} = req, handler, ctx) do
if function_exported?(handler, :handle_delete_push_config, 4) do
task_id = req.params["taskId"]
config_id = req.params["id"]

try do
case handler.handle_delete_push_config(task_id, config_id, req.params, ctx) do
:ok ->
{:reply, Response.success(req.id, %{})}

{:error, %Error{} = error} ->
{:reply, Response.error(req.id, error)}
end
rescue
e -> {:reply, Response.error(req.id, Error.internal_error(Exception.message(e)))}
end
else
{:reply, Response.error(req.id, Error.push_notification_not_supported())}
end
end

defp dispatch(
Expand All @@ -185,6 +288,13 @@ defmodule A2A.JSONRPC do

# -- helpers ---------------------------------------------------------------

defp decode_push_config(params) do
case A2A.JSON.decode(params, :push_notification_config) do
{:ok, _config} = ok -> ok
{:error, reason} -> {:error, Error.invalid_params(inspect(reason))}
end
end

defp decode_message(params) do
case A2A.JSON.decode(params["message"], :message) do
{:ok, _message} = ok -> ok
Expand Down
Loading