diff --git a/evaluators/contrib/financial-governance/README.md b/evaluators/contrib/financial-governance/README.md new file mode 100644 index 00000000..a1a10461 --- /dev/null +++ b/evaluators/contrib/financial-governance/README.md @@ -0,0 +1,292 @@ +# Financial Governance Evaluators for Agent Control + +Evaluators that enforce financial spend limits and transaction policies for autonomous AI agents. + +As agents transact autonomously via protocols like [x402](https://github.com/coinbase/x402) and payment layers like [agentpay-mcp](https://github.com/AI-Agent-Economy/agentpay-mcp), enterprises need governance over what agents spend. These evaluators bring financial policy enforcement into the Agent Control framework. + +## Evaluators + +### `financial_governance.spend_limit` + +Tracks cumulative agent spend and enforces rolling budget limits. Stateful — records approved transactions and checks new ones against accumulated spend. + +- **Per-transaction cap** — reject any single payment above a threshold (`BudgetLimit` with no window) +- **Rolling period budget** — reject payments that would exceed a time-windowed budget (`BudgetWindow(kind="rolling", ...)`) +- **Calendar-aligned budget** — reject payments that exceed a UTC day/week/month budget (`BudgetWindow(kind="fixed", ...)`) +- **Scoped budgets** — independent counters per channel, agent, or session via `scope_by` +- **Pluggable storage** — abstract `SpendStore` protocol with built-in `InMemorySpendStore`; bring your own PostgreSQL, Redis, etc. +- **Atomic store primitive** — `SpendStore.check_and_record()` checks and records in one step; `InMemorySpendStore` protects it with a lock, so it is safe within a single process only +
+### `financial_governance.transaction_policy` + +Static policy checks with no state tracking. Enforces structural rules on individual transactions.
+ +- **Currency allowlist** — only permit specific currencies (e.g., `["USDC", "USDT"]`) +- **Recipient blocklist/allowlist** — control which addresses an agent can pay +- **Amount bounds** — minimum and maximum per-transaction limits + +## Installation + +```bash +# From the repo root (development) +cd evaluators/contrib/financial-governance +pip install -e ".[dev]" +``` + +## Configuration + +### Spend Limit + +The `spend_limit` evaluator is configured via a list of `BudgetLimit` objects. Each limit is evaluated independently — the first violation wins. + +```yaml +controls: + - name: spend-limit + evaluator: + type: financial_governance.spend_limit + config: + limits: + # Per-transaction cap: single payment ≤ 100 USDC + - amount: "100.00" + currency: USDC + # Per-channel rolling 24h budget: each channel limited to 1000 USDC/day + - amount: "1000.00" + currency: USDC + scope_by: [channel] + window: + kind: rolling + seconds: 86400 + selector: + path: input # Extract step.input (transaction dict) + action: deny +``` + +### Transaction Policy + +```yaml +controls: + - name: transaction-policy + evaluator: + type: financial_governance.transaction_policy + config: + allowed_currencies: [USDC, USDT] + blocked_recipients: ["0xDEAD..."] + allowed_recipients: ["0xALICE...", "0xBOB..."] + min_amount: "0.01" + max_amount: "5000.00" + selector: + path: input + action: deny +``` + +## Selector Paths + +Both evaluators support two selector configurations: + +- **`selector.path: "input"`** (recommended) — The evaluator receives `step.input` directly, which should be the transaction dict. +- **`selector.path: "*"`** — The evaluator receives the full Step object. It automatically extracts `step.input` for transaction fields and `step.context` for channel/agent/session metadata. 
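For readers wiring up either selector path, the following standalone sketch mirrors the normalization described above, as implemented by `_normalize_data` in this PR: a dict carrying both `type` and `input` keys is treated as a Step, and `channel`, `agent_id`, and `session_id` are copied from `step.context` only when `step.input` does not already contain them. The names `normalize_selector_output` and `CONTEXT_KEYS` are hypothetical and not part of the package API.

```python
from __future__ import annotations

from typing import Any

# Hypothetical constant: the context keys the spend-limit evaluator copies from step.context.
CONTEXT_KEYS = ("channel", "agent_id", "session_id")


def normalize_selector_output(data: Any) -> dict[str, Any] | None:
    """Return the transaction dict for either selector path, or None if unusable."""
    if not isinstance(data, dict):
        return None
    # selector.path: "*" -> a Step-shaped dict with "type" and "input" keys.
    if "type" in data and "input" in data:
        tx = data["input"]
        if not isinstance(tx, dict):
            return None
        ctx = data.get("context") or {}
        merged = dict(tx)
        if isinstance(ctx, dict):
            for key in CONTEXT_KEYS:
                # Fields already present in step.input win over step.context.
                if key in ctx and key not in merged:
                    merged[key] = ctx[key]
        return merged
    # selector.path: "input" -> data is already the transaction dict.
    return data


step = {
    "type": "tool",
    "name": "payment",
    "input": {"amount": "75.00", "currency": "USDC", "recipient": "0xABC", "channel": "slack"},
    "context": {"channel": "experimental", "agent_id": "agent-42"},
}
print(normalize_selector_output(step))
# {'amount': '75.00', 'currency': 'USDC', 'recipient': '0xABC', 'channel': 'slack', 'agent_id': 'agent-42'}
```

Note that `channel` keeps its `step.input` value (`slack`) while `agent_id`, absent from the input, is filled in from the context.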
+ +## Input Data Schema + +The transaction dict (from `step.input`) should contain: + +```python +# step.input — transaction payload +{ + "amount": "50.00", # required — Decimal or numeric string + "currency": "USDC", # required — payment currency + "recipient": "0xABC...", # required — payment recipient + # optional context fields (used for scope_by) + "channel": "slack", + "agent_id": "agent-42", + "session_id": "sess-1", +} +``` + +> **Note:** Use `Decimal` or string representations for `amount` — never raw `float`. Floating-point arithmetic is imprecise for money. The evaluator internally converts to `Decimal`. + +## BudgetLimit Model + +```python +from decimal import Decimal +from agent_control_evaluator_financial_governance.spend_limit import ( + BudgetLimit, BudgetWindow, SpendLimitConfig, SpendLimitEvaluator, +) + +# Per-transaction cap (no window) +cap = BudgetLimit(amount=Decimal("100"), currency="USDC") + +# Rolling 24-hour budget, scoped per channel +rolling = BudgetLimit( + amount=Decimal("1000"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), +) + +# Calendar-day budget (UTC) +daily = BudgetLimit( + amount=Decimal("500"), + currency="USDC", + window=BudgetWindow(kind="fixed", unit="day"), +) + +config = SpendLimitConfig(limits=[cap, rolling, daily]) +evaluator = SpendLimitEvaluator(config) +``` + +### BudgetWindow + +| kind | Required fields | Notes | +|------|----------------|-------| +| `"rolling"` | `seconds` | Sliding window from `now - seconds` | +| `"fixed"` | `unit` (`"day"`, `"week"`, or `"month"`) | Calendar-aligned in UTC (weeks start on Monday); `timezone` is accepted but not yet applied | + +### scope_by semantics + +`scope_by` lists the context dimension keys to isolate spend buckets.
Each distinct value (or combination of values) gets its own counter: + +- `scope_by=()` (default) — global budget: all spend in that currency shares one counter +- `scope_by=("channel",)` — one counter per unique `channel` value +- `scope_by=("agent_id",)` — one counter per unique `agent_id` +- `scope_by=("channel", "agent_id")` — one counter per unique `(channel, agent_id)` pair + +Spend in `channel-A` does **not** count against `channel-B`'s budget. + +Two caveats follow from how the evaluator builds scopes. Only `channel`, `agent_id`, `session_id`, and `recipient` are stored with each spend record, so a `scope_by` key outside that set has no recorded values to match against. And when a transaction carries none of a limit's `scope_by` keys, that limit is checked against the global (unscoped) counter for the currency; when it carries only some of them, the limit is scoped by those keys alone. + +## Context-Aware Limits + +Context fields (`channel`, `agent_id`, `session_id`) can be provided in two ways: + +**Option A: Via `step.context`** (recommended for engine integration) + +```python +step = Step( + type="tool", + name="payment", + input={"amount": "75.00", "currency": "USDC", "recipient": "0xABC"}, + context={ + "channel": "experimental", + "agent_id": "agent-42", + }, +) +``` + +When using `selector.path: "*"`, the evaluator merges `step.context` fields into the transaction data automatically. Fields already present in `step.input` are never overwritten by context. + +**Option B: Inline in the transaction dict** (simpler, for direct SDK use) + +```python +result = await evaluator.evaluate({ + "amount": "75.00", + "currency": "USDC", + "recipient": "0xABC", + "channel": "experimental", + "agent_id": "agent-42", +}) +``` + +## Custom SpendStore + +The `SpendStore` protocol requires three methods.
Implement them for your backend. The sketch below assumes psycopg 3 (`psycopg.connect`) and an `agent_spend` table with `amount NUMERIC`, `currency TEXT`, `metadata JSONB`, and `recorded_at TIMESTAMPTZ` columns: + +```python +import json +from decimal import Decimal + +import psycopg + +from agent_control_evaluator_financial_governance.spend_limit import ( + SpendStore, SpendLimitConfig, SpendLimitEvaluator, +) + +class PostgresSpendStore: + """Example: PostgreSQL-backed spend tracking (psycopg 3).""" + + def __init__(self, connection_string: str): + # autocommit=True so standalone record_spend() calls are persisted; + # check_and_record() opens an explicit transaction below. + self._conn = psycopg.connect(connection_string, autocommit=True) + + def record_spend( + self, + amount: Decimal, + currency: str, + metadata: dict | None = None, + ) -> None: + self._conn.execute( + "INSERT INTO agent_spend (amount, currency, metadata, recorded_at)" + " VALUES (%s, %s, %s, NOW())", + (amount, currency, json.dumps(metadata or {})), + ) + + def get_spend( + self, + currency: str, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + ) -> Decimal: + # Build WHERE clause; every value, including JSON keys, is a bound parameter + clauses = [ + "currency = %s", + "recorded_at >= to_timestamp(%s)", + ] + params: list = [currency, start] + if end is not None: + clauses.append("recorded_at <= to_timestamp(%s)") + params.append(end) + if scope: + for k, v in scope.items(): + clauses.append("metadata->>%s::text = %s") + params.extend([k, v]) + where = " AND ".join(clauses) + row = self._conn.execute( + f"SELECT COALESCE(SUM(amount), 0) FROM agent_spend WHERE {where}", + params, + ).fetchone() + return Decimal(str(row[0])) + + def check_and_record( + self, + amount: Decimal, + currency: str, + limit: Decimal, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + metadata: dict | None = None, + ) -> tuple[bool, Decimal]: + with self._conn.transaction(): + # A bare READ COMMITTED transaction is not enough: two callers could + # read the same sum. SHARE ROW EXCLUSIVE conflicts with itself and with + # INSERTs, so concurrent check-and-record calls run one at a time. + self._conn.execute("LOCK TABLE agent_spend IN SHARE ROW EXCLUSIVE MODE") + current = self.get_spend(currency, start, end, scope) + if current + amount > limit: + return False, current + self.record_spend(amount, currency, metadata) + return True, current + +# Use it (config as defined in the BudgetLimit Model section): +store: SpendStore = PostgresSpendStore("postgresql://...") +evaluator = SpendLimitEvaluator(config, store=store) +``` + +> **Single-process atomicity note:**
`InMemorySpendStore.check_and_record()` uses a `threading.Lock` to atomically check-and-record within a single process. For multi-process or distributed deployments, your custom store must implement true database-level atomics (e.g., a PostgreSQL table or advisory lock held across the sum-and-insert transaction, or Redis Lua scripts). Row-level `SELECT ... FOR UPDATE` alone is not sufficient for an append-only ledger: it does not block concurrent inserts and cannot be combined with `SUM`. + +## Running Tests + +```bash +cd evaluators/contrib/financial-governance +pip install -e ".[dev]" +pytest tests/ -v +``` + +## Design Decisions + +1. **Decimal for money** — All budget arithmetic uses `Decimal`, never `float`. Floating-point arithmetic is unsuitable for financial calculations. +2. **BudgetLimit + BudgetWindow models** — Expressive, composable budget definitions that replace the previous flat config. Each limit is independent; first violation wins. +3. **Independent scope dimensions** — `scope_by=("channel",)` creates a separate counter for each channel value. Spend in one channel is completely isolated from another. +4. **Atomic check_and_record()** — The `SpendStore` protocol provides `check_and_record()` to avoid the TOCTOU race of separate `get_spend()` + `record_spend()` calls. It is single-process safe with `threading.Lock` in `InMemorySpendStore`; production stores should use DB-level atomics. The evaluator itself still checks period budgets with `get_spend()` and records with `record_spend()`, so concurrent requests can overshoot a period budget. +5. **Decoupled from data source** — The `SpendStore` protocol means no new tables in core Agent Control. Bring your own persistence. +6. **Fail-open on malformed input** — Missing or malformed data returns `matched=False, error=None`, following Agent Control conventions. The `error` field is reserved for evaluator crashes, not policy decisions. + +## Related Projects + +- [x402](https://github.com/coinbase/x402) — HTTP 402 payment protocol +- [agentpay-mcp](https://github.com/up2itnow0822/agentpay-mcp) — MCP server for non-custodial agent payments + +## License + +Apache-2.0 — see [LICENSE](../../../LICENSE).
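The lock-based `check_and_record()` idea from the Design Decisions can be sketched in a few lines. This is not the package's `InMemorySpendStore` (its implementation is not reproduced here); `LockedLedger` is a hypothetical class that assumes a plain list ledger filtered by currency, window start, and scope metadata.

```python
from __future__ import annotations

import threading
import time
from decimal import Decimal
from typing import Any


class LockedLedger:
    """Hypothetical in-memory ledger illustrating lock-protected check-and-record."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Each entry: (timestamp, amount, currency, metadata)
        self._entries: list[tuple[float, Decimal, str, dict[str, Any]]] = []

    def _sum(self, currency: str, start: float, scope: dict[str, str] | None) -> Decimal:
        """Sum matching entries; the caller must already hold self._lock."""
        total = Decimal("0")
        for ts, amount, cur, meta in self._entries:
            if cur != currency or ts < start:
                continue
            # Every scope key must be present in the entry with the same value.
            if scope and any(k not in meta or str(meta[k]) != v for k, v in scope.items()):
                continue
            total += amount
        return total

    def check_and_record(
        self,
        amount: Decimal,
        currency: str,
        limit: Decimal,
        start: float,
        scope: dict[str, str] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> tuple[bool, Decimal]:
        """Record `amount` only if it keeps the scoped window total within `limit`."""
        # Read, decide, and write under one lock so two threads cannot both
        # see the same pre-spend total and overshoot the limit.
        with self._lock:
            current = self._sum(currency, start, scope)
            if current + amount > limit:
                return False, current
            self._entries.append((time.time(), amount, currency, dict(metadata or {})))
            return True, current


ledger = LockedLedger()
window_start = time.time() - 86400  # rolling 24-hour window
results: list[bool] = []


def pay() -> None:
    ok, _ = ledger.check_and_record(
        Decimal("60"), "USDC", Decimal("100"), window_start,
        scope={"channel": "slack"}, metadata={"channel": "slack"},
    )
    results.append(ok)


threads = [threading.Thread(target=pay) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results.count(True))  # 1: only one 60 USDC payment fits under 100
```

Five threads each try to spend 60 USDC against a 100 USDC budget; because the read and the append share one lock, exactly one succeeds. The lock offers no protection across processes, which is why multi-replica deployments need database-level atomics instead.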
diff --git a/evaluators/contrib/financial-governance/pyproject.toml b/evaluators/contrib/financial-governance/pyproject.toml new file mode 100644 index 00000000..c833a911 --- /dev/null +++ b/evaluators/contrib/financial-governance/pyproject.toml @@ -0,0 +1,55 @@ +[project] +name = "agent-control-evaluator-financial-governance" +version = "0.1.0" +description = "Financial governance evaluators for agent-control — spend limits and transaction policy enforcement" +readme = "README.md" +requires-python = ">=3.12" +license = { text = "Apache-2.0" } +authors = [{ name = "agent-control contributors" }] +keywords = ["agent-control", "evaluator", "financial", "spend-limit", "x402", "agentpay"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development :: Libraries", +] +dependencies = [ + "agent-control-evaluators>=3.0.0", + "agent-control-models>=3.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "pytest-cov>=4.0.0", + "ruff>=0.1.0", + "mypy>=1.8.0", +] + +[project.entry-points."agent_control.evaluators"] +"financial_governance.spend_limit" = "agent_control_evaluator_financial_governance.spend_limit:SpendLimitEvaluator" +"financial_governance.transaction_policy" = "agent_control_evaluator_financial_governance.transaction_policy:TransactionPolicyEvaluator" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/agent_control_evaluator_financial_governance"] + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[tool.uv.sources] +agent-control-evaluators = { path = "../../builtin", editable = true } +agent-control-models = { path = 
"../../../models", editable = true } diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/__init__.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/__init__.py new file mode 100644 index 00000000..21ba243c --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/__init__.py @@ -0,0 +1,59 @@ +"""Financial governance evaluators for agent-control. + +Provides two evaluators for enforcing financial policy on AI agent transactions: + +- ``financial_governance.spend_limit``: Tracks cumulative spend against rolling + period budgets and per-transaction caps. Uses the :class:`BudgetLimit` / + :class:`BudgetWindow` model for expressive, scoped budget definitions. +- ``financial_governance.transaction_policy``: Static policy checks — allowlists, + blocklists, amount bounds, and permitted currencies. + +Both evaluators are registered automatically when this package is installed and +the ``agent_control.evaluators`` entry point group is discovered. 
+ +Example usage in an agent-control control config:: + + { + "condition": { + "selector": {"path": "input"}, + "evaluator": { + "name": "financial_governance.spend_limit", + "config": { + "limits": [ + { + "amount": "100.00", + "currency": "USDC" + }, + { + "amount": "1000.00", + "currency": "USDC", + "scope_by": ["channel"], + "window": {"kind": "rolling", "seconds": 86400} + } + ] + } + } + }, + "action": {"decision": "deny"} + } +""" + +from agent_control_evaluator_financial_governance.spend_limit import ( + BudgetLimit, + BudgetWindow, + SpendLimitConfig, + SpendLimitEvaluator, +) +from agent_control_evaluator_financial_governance.transaction_policy import ( + TransactionPolicyConfig, + TransactionPolicyEvaluator, +) + +__all__ = [ + "SpendLimitEvaluator", + "SpendLimitConfig", + "BudgetLimit", + "BudgetWindow", + "TransactionPolicyEvaluator", + "TransactionPolicyConfig", +] diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/__init__.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/__init__.py new file mode 100644 index 00000000..424d6107 --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/__init__.py @@ -0,0 +1,14 @@ +"""Spend-limit evaluator package.""" + +from .config import BudgetLimit, BudgetWindow, SpendLimitConfig +from .evaluator import SpendLimitEvaluator +from .store import InMemorySpendStore, SpendStore + +__all__ = [ + "SpendLimitEvaluator", + "SpendLimitConfig", + "BudgetLimit", + "BudgetWindow", + "SpendStore", + "InMemorySpendStore", +] diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/config.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/config.py new file mode 100644 index 00000000..f33bddce --- /dev/null +++ 
b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/config.py @@ -0,0 +1,187 @@ +"""Configuration model for the spend-limit evaluator.""" + +from __future__ import annotations + +from decimal import Decimal +from typing import Any + +from pydantic import Field, field_validator, model_validator + +from agent_control_evaluators import EvaluatorConfig + + +class BudgetWindow(EvaluatorConfig): + """Defines the time window for a rolling or calendar-based budget. + + Attributes: + kind: ``"rolling"`` — a sliding window of *seconds* duration; + ``"fixed"`` — a calendar-aligned window (day / week / month). + seconds: Window length in seconds. **Required** when ``kind="rolling"``. + unit: Calendar unit. **Required** when ``kind="fixed"``. + One of ``"day"``, ``"week"``, ``"month"``. + timezone: IANA timezone name for ``kind="fixed"`` windows (e.g. + ``"America/New_York"``). Defaults to ``"UTC"`` when omitted. + + Examples:: + + BudgetWindow(kind="rolling", seconds=86400) # 24-hour rolling + BudgetWindow(kind="fixed", unit="day") # UTC calendar day + BudgetWindow(kind="fixed", unit="month", timezone="America/New_York") + """ + + kind: str = Field( + ..., + description='Window kind: "rolling" or "fixed".', + ) + seconds: int | None = Field( + default=None, + ge=1, + description="Window duration in seconds. Required for kind='rolling'.", + ) + unit: str | None = Field( + default=None, + description=( + 'Calendar unit: "day", "week", or "month". Required for kind="fixed".' + ), + ) + timezone: str | None = Field( + default=None, + description='IANA timezone (e.g. "America/New_York"). 
Defaults to "UTC".', + ) + + @model_validator(mode="after") + def validate_window_fields(self) -> BudgetWindow: + """Enforce that required fields are present for each kind.""" + if self.kind == "rolling": + if self.seconds is None: + raise ValueError( + "BudgetWindow kind='rolling' requires 'seconds' to be set" + ) + elif self.kind == "fixed": + valid_units = {"day", "week", "month"} + if self.unit is None: + raise ValueError( + "BudgetWindow kind='fixed' requires 'unit' to be set " + f"(one of {sorted(valid_units)})" + ) + if self.unit not in valid_units: + raise ValueError( + f"BudgetWindow unit must be one of {sorted(valid_units)}, " + f"got '{self.unit}'" + ) + else: + raise ValueError( + f"BudgetWindow kind must be 'rolling' or 'fixed', got '{self.kind}'" + ) + return self + + +class BudgetLimit(EvaluatorConfig): + """A single budget constraint, optionally scoped to a context dimension. + + Attributes: + amount: Maximum monetary amount. Uses ``Decimal`` for precision — + never ``float`` for money. + currency: Currency symbol this limit applies to (e.g. ``"USDC"``). + scope_by: Tuple of context dimension keys used to isolate budgets. + Each dimension is **independent**: ``scope_by=("channel",)`` creates + a separate counter for each unique channel value. + An empty tuple means global (unscoped): all transactions for this + currency share a single counter. + window: Time window for accumulated-spend budgets. ``None`` means a + per-transaction cap: ``amount`` is the maximum for any single + transaction, regardless of accumulated spend. 
+ + Examples:: + + # Per-transaction cap of 500 USDC regardless of channel or agent + BudgetLimit(amount=Decimal("500"), currency="USDC") + + # Per-channel rolling 24-hour budget of 5000 USDC + BudgetLimit( + amount=Decimal("5000"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + + # Per-agent calendar-day budget (US Eastern) + BudgetLimit( + amount=Decimal("1000"), + currency="USDC", + scope_by=("agent_id",), + window=BudgetWindow(kind="fixed", unit="day", timezone="America/New_York"), + ) + """ + + amount: Decimal = Field( + ..., + gt=Decimal("0"), + description="Budget ceiling — Decimal for monetary precision.", + ) + currency: str = Field( + ..., + min_length=1, + description="Currency symbol this limit applies to (e.g. 'USDC', 'ETH').", + ) + scope_by: tuple[str, ...] = Field( + default=(), + description=( + "Context dimension keys that isolate spend buckets. " + "scope_by=('channel',) → one budget per channel. " + "Empty tuple = global budget." + ), + ) + window: BudgetWindow | None = Field( + default=None, + description=( + "Time window for accumulated-spend budgets. " + "None = per-transaction cap (amount is the per-call maximum)." + ), + ) + + @field_validator("currency") + @classmethod + def normalize_currency(cls, v: str) -> str: + """Normalize currency symbol to upper-case for consistent comparison.""" + return v.upper() + + @field_validator("scope_by", mode="before") + @classmethod + def coerce_scope_by(cls, v: Any) -> tuple[str, ...]: + """Accept list or tuple for scope_by and coerce to tuple.""" + if isinstance(v, list): + return tuple(v) + return v + + +class SpendLimitConfig(EvaluatorConfig): + """Configuration for :class:`~.evaluator.SpendLimitEvaluator`. + + Each entry in *limits* is evaluated independently. First violation wins. + + Attributes: + limits: List of :class:`BudgetLimit` constraints to enforce. 
+ The evaluator checks each limit in order and returns a violation + result on the first breach. An empty list means no limits — + all transactions are allowed. + + Example config dict:: + + { + "limits": [ + {"amount": "500.00", "currency": "USDC"}, + { + "amount": "5000.00", + "currency": "USDC", + "scope_by": ["channel"], + "window": {"kind": "rolling", "seconds": 86400} + } + ] + } + """ + + limits: list[BudgetLimit] = Field( + default_factory=list, + description="Budget constraints to enforce. Evaluated in order; first violation wins.", + ) diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/evaluator.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/evaluator.py new file mode 100644 index 00000000..09e17ada --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/evaluator.py @@ -0,0 +1,377 @@ +"""Spend-limit evaluator — tracks cumulative agent spend against rolling budgets.""" + +from __future__ import annotations + +import time +from decimal import Decimal, InvalidOperation +from typing import Any + +from agent_control_evaluators import ( + Evaluator, + EvaluatorMetadata, + register_evaluator, +) +from agent_control_models import EvaluatorResult + +from .config import BudgetLimit, SpendLimitConfig +from .store import InMemorySpendStore, SpendStore + + +def _extract_decimal(data: dict[str, Any], key: str) -> Decimal | None: + """Safely extract a finite Decimal value from *data* by *key*. + + Returns None if the key is absent or the value cannot be coerced + to a finite Decimal. + """ + raw = data.get(key) + if raw is None: + return None + try: + value = Decimal(str(raw)) + except (TypeError, ValueError, InvalidOperation): + return None + # Reject NaN/Infinity: ordering comparisons on a NaN Decimal raise InvalidOperation. + return value if value.is_finite() else None + + +def _window_start(limit: BudgetLimit) -> float: + """Compute the Unix timestamp start of the current budget window.
+ + For ``kind="rolling"``: ``now - seconds``. + For ``kind="fixed"`` with ``unit="day"``: midnight UTC today. + For ``kind="fixed"`` with ``unit="week"``: midnight UTC Monday of this week. + For ``kind="fixed"`` with ``unit="month"``: midnight UTC on the 1st of this month. + + Note: Timezone support is noted in the model but calendar alignment uses UTC + for now. Full IANA timezone support is a follow-up. + """ + window = limit.window + assert window is not None # called only when window is set + + now = time.time() + if window.kind == "rolling": + assert window.seconds is not None + return now - window.seconds + + # kind == "fixed" + import datetime as _dt + utc_now = _dt.datetime.now(_dt.timezone.utc) + + if window.unit == "day": + start = utc_now.replace(hour=0, minute=0, second=0, microsecond=0) + elif window.unit == "week": + # Monday of the current ISO week + start = utc_now - _dt.timedelta(days=utc_now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + elif window.unit == "month": + start = utc_now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + else: + # Fallback — should not happen given BudgetWindow validation + start = utc_now.replace(hour=0, minute=0, second=0, microsecond=0) + + return start.timestamp() + + +@register_evaluator +class SpendLimitEvaluator(Evaluator[SpendLimitConfig]): + """Evaluator that enforces per-transaction and rolling-period spend limits. + + ``matched=True`` means the transaction **violates** at least one configured + limit and should be blocked. ``matched=False`` means the transaction is + within all budget constraints and may proceed. + + Thread safety: + The evaluator itself is stateless. All mutable state lives in the + injected :class:`~.store.SpendStore`. The default + :class:`~.store.InMemorySpendStore` is thread-safe. + + Instance caching note: + Evaluator instances are cached and reused across requests (see base + class docstring). 
Only the ``SpendStore`` instance is mutable; do not + add per-request state to ``self``. + + Args: + config: Validated :class:`SpendLimitConfig` with ``limits`` list. + store: Optional :class:`SpendStore` implementation. Defaults to a new + :class:`InMemorySpendStore` when not provided. + + Input ``data`` schema:: + + { + "amount": Decimal | float | str, # required — transaction amount + "currency": str, # required — payment currency + "recipient": str, # required — recipient address or id + # optional context fields (used for scope_by matching) + "channel": str, + "agent_id": str, + "session_id": str, + } + + Example:: + + from agent_control_evaluator_financial_governance.spend_limit import ( + BudgetLimit, BudgetWindow, SpendLimitConfig, SpendLimitEvaluator + ) + from decimal import Decimal + + config = SpendLimitConfig(limits=[ + BudgetLimit(amount=Decimal("100"), currency="USDC"), + BudgetLimit( + amount=Decimal("1000"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ), + ]) + evaluator = SpendLimitEvaluator(config) + result = await evaluator.evaluate({ + "amount": "50.00", + "currency": "USDC", + "recipient": "0xABC...", + "channel": "slack", + }) + # result.matched == False → transaction is within limits + """ + + metadata = EvaluatorMetadata( + name="financial_governance.spend_limit", + version="0.1.0", + description=( + "Tracks cumulative agent spend and enforces per-transaction caps " + "and rolling period budgets. Supports pluggable SpendStore backends." 
+ ), + ) + config_model = SpendLimitConfig + + def __init__( + self, + config: SpendLimitConfig, + store: SpendStore | None = None, + ) -> None: + super().__init__(config) + self._store: SpendStore = store if store is not None else InMemorySpendStore() + + # ------------------------------------------------------------------ + # Main evaluation entry point + # ------------------------------------------------------------------ + + @staticmethod + def _normalize_data(data: Any) -> tuple[dict[str, Any] | None, dict[str, Any]]: + """Extract transaction fields and step context from selector output. + + Handles two selector paths: + - ``selector.path: "input"`` → data IS the transaction dict. + - ``selector.path: "*"`` → data is the full Step dict with ``input`` + and ``context`` sub-keys. + + Returns: + (tx_data, step_context) where tx_data is the transaction dict + (or None if missing) and step_context holds channel/agent_id/etc. + """ + if not isinstance(data, dict): + return None, {} + + # If data looks like a Step (has "input" + "type" keys), extract + # the transaction payload from "input" and context from "context". + if "type" in data and "input" in data: + tx = data.get("input") + ctx = data.get("context") or {} + if not isinstance(tx, dict): + return None, ctx if isinstance(ctx, dict) else {} + # Merge step context into tx so downstream logic sees channel/agent_id. + # Input fields take priority — context must NOT clobber input values. + merged = {**tx} + if isinstance(ctx, dict): + for k in ("channel", "agent_id", "session_id"): + if k in ctx and k not in merged: + merged[k] = ctx[k] + return merged, ctx if isinstance(ctx, dict) else {} + + # Otherwise assume data IS the transaction dict (selector.path: "input") + return data, {} + + def _build_scope( + self, data: dict[str, Any], limit: BudgetLimit + ) -> dict[str, str] | None: + """Build the scope filter for *limit* from transaction *data*. 
+ + For each key in ``limit.scope_by``, extract the value from ``data`` + (if present). Returns ``None`` (global query) when scope_by is empty + or none of the specified keys are present in data. + """ + if not limit.scope_by: + return None + + scope: dict[str, str] = {} + for k in limit.scope_by: + val = data.get(k) + if val is not None: + scope[k] = str(val) + + return scope if scope else None + + async def evaluate(self, data: Any) -> EvaluatorResult: + """Evaluate a transaction against all configured spend limits. + + Iterates over ``config.limits`` in order. Returns the first violation + found or a passing result if all limits are satisfied. After passing + all rolling-period limits, records the transaction in the store. + + Args: + data: Transaction dict (when ``selector.path`` is ``"input"``) + or full Step dict (when path is ``"*"``). Malformed payload + returns ``matched=False, error=None`` — not an evaluator error. + + Returns: + ``EvaluatorResult`` where ``matched=True`` indicates a limit + violation (transaction should be denied). + """ + if data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="No transaction data provided; skipping spend-limit check", + ) + + tx_data, _step_ctx = self._normalize_data(data) + if tx_data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message=( + "Could not extract transaction data from selector output; " + "skipping spend-limit check" + ), + ) + + data = tx_data + + # ---- Extract required fields ---- + # NOTE: Malformed selector output is NOT an evaluator error. + # Missing or invalid fields → matched=False, error=None. 
+ amount = _extract_decimal(data, "amount") + if amount is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="Transaction data missing required field 'amount'; cannot evaluate", + ) + if amount <= Decimal("0"): + return EvaluatorResult( + matched=False, + confidence=1.0, + message=f"Transaction amount must be positive, got {amount}; cannot evaluate", + ) + + # Treat a None currency as missing rather than the string "NONE". + tx_currency: str = str(data.get("currency") or "").strip().upper() + if not tx_currency: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="Transaction data missing required field 'currency'; cannot evaluate", + ) + + recipient: str = str(data.get("recipient", "")).strip() + + # ---- No limits configured → allow everything ---- + if not self.config.limits: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="No limits configured; transaction allowed", + metadata={"amount": float(amount), "currency": tx_currency, "recipient": recipient}, + ) + + # ---- Evaluate each limit in order ---- + # Check every limit that matches the transaction currency first; if all + # pass, record the spend once at the end. + # NOTE: period budgets are checked with get_spend() and recorded later with + # record_spend(), not with the store's atomic check_and_record(), so + # concurrent callers across threads or processes can overshoot a budget.
+ + period_limits_to_record: list[tuple[BudgetLimit, dict[str, str] | None, float]] = [] + # ^ (limit, scope, window_start) + + for limit in self.config.limits: + # Skip limits for other currencies + if limit.currency != tx_currency: + continue + + scope = self._build_scope(data, limit) + + # Per-transaction cap (window=None) + if limit.window is None: + if amount > limit.amount: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Transaction amount {amount} {tx_currency} exceeds " + f"per-transaction cap of {limit.amount} {tx_currency}" + ), + metadata={ + "violation": "per_transaction_cap", + "amount": float(amount), + "max_per_transaction": float(limit.amount), + "currency": tx_currency, + "recipient": recipient, + }, + ) + # Per-tx cap passes → no need to "record" a cap (it's per-call) + + else: + # Rolling / fixed period budget + win_start = _window_start(limit) + period_limits_to_record.append((limit, scope, win_start)) + + period_spend = self._store.get_spend(tx_currency, win_start, scope=scope) + projected = period_spend + amount + + if projected > limit.amount: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Transaction would bring period spend to " + f"{projected} {tx_currency}, exceeding the " + f"{limit.window.kind} budget of {limit.amount} {tx_currency} " + f"(current period spend: {period_spend})" + ), + metadata={ + "violation": "period_budget", + "amount": float(amount), + "current_period_spend": float(period_spend), + "projected_period_spend": float(projected), + "max_per_period": float(limit.amount), + "currency": tx_currency, + "recipient": recipient, + }, + ) + + # ---- All limits passed — record the spend ---- + # Build metadata to attach to the spend record + spend_metadata: dict[str, Any] = { + k: data[k] + for k in ("channel", "agent_id", "session_id") + if k in data and data[k] is not None + } + spend_metadata["recipient"] = recipient + + # Record once per transaction (not once per limit — 
the store is a ledger) + # We only need one record; all scope queries will find it via their filters. + if period_limits_to_record: + self._store.record_spend( + amount=amount, + currency=tx_currency, + metadata=spend_metadata if spend_metadata else None, + ) + + return EvaluatorResult( + matched=False, + confidence=1.0, + message=( + f"Transaction of {amount} {tx_currency} to '{recipient}' is within limits" + ), + metadata={ + "amount": float(amount), + "currency": tx_currency, + "recipient": recipient, + }, + ) diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/store.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/store.py new file mode 100644 index 00000000..260cf684 --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/spend_limit/store.py @@ -0,0 +1,303 @@ +"""SpendStore protocol and built-in InMemorySpendStore implementation. + +The SpendStore abstraction decouples the spend-limit evaluator from any +particular persistence backend. The default ``InMemorySpendStore`` requires no +external dependencies and is suitable for single-process deployments or testing. + +For production multi-process or multi-replica deployments you should implement a +custom SpendStore backed by a durable store such as PostgreSQL or Redis. See +README.md for an example. + +Atomicity note +-------------- +The ``check_and_record()`` method is the recommended path for enforcing hard +spend caps. It atomically queries the current spend *and* records a new entry +(or rejects it) in a single operation, eliminating the TOCTOU race that exists +when callers do ``get_spend()`` followed by ``record_spend()`` separately. + +The ``InMemorySpendStore`` implements atomicity with a threading ``Lock``. +This is safe within a single process but does NOT prevent overshoot across +multiple processes or replicas. 
Production deployments that require strict +enforcement should use a backend with database-level atomics: + +- **PostgreSQL**: lock a per-scope budget row with ``SELECT ... FOR UPDATE`` + (or take ``pg_advisory_xact_lock()``), then ``SUM`` the period spend and + conditionally ``INSERT`` inside a single transaction. ``FOR UPDATE`` + cannot be combined with aggregate functions, so ``SELECT SUM(...) FOR + UPDATE`` is rejected by PostgreSQL. +- **Redis**: Lua script, or a ``WATCH``/``MULTI``/``EXEC`` optimistic + transaction (compare-and-swap). + +If a custom store provides only single-process atomicity, document that +limitation prominently so operators are not surprised by concurrent +overshoot in distributed deployments. +""" + +from __future__ import annotations + +import time +from collections import deque +from decimal import Decimal +from threading import Lock +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class SpendStore(Protocol): + """Protocol that all spend store implementations must satisfy. + + Implementations are free to choose any persistence mechanism. + All methods must be thread-safe. + + Atomic enforcement + ------------------ + Prefer ``check_and_record()`` over the separate ``get_spend()`` + + ``record_spend()`` pattern. The split pattern has a TOCTOU race condition: + two concurrent requests can both read the same current spend, both decide + they are within budget, and both record — overshooting the cap. + + ``check_and_record()`` performs the read-decide-write as a single atomic + step. For the ``InMemorySpendStore`` this is protected by a + ``threading.Lock`` (single-process only). Production stores should use + DB-level atomics (see module docstring). + """ + + def record_spend( + self, + amount: Decimal, + currency: str, + metadata: dict[str, Any] | None = None, + ) -> None: + """Persist a completed spend record. + + Args: + amount: Positive monetary amount (Decimal — never float for money). + currency: ISO-4217 or token symbol (e.g. ``"USDC"``). + metadata: Optional key-value bag for agent_id, session_id, etc. + """ + ...
+ + def get_spend( + self, + currency: str, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + ) -> Decimal: + """Return total spend for *currency* within the given time range. + + Args: + currency: Currency symbol to query (case-sensitive). + start: Unix timestamp (seconds, inclusive lower bound). + end: Unix timestamp (seconds, inclusive upper bound). ``None`` + means "up to now" — no upper bound is applied. + scope: Optional key-value pairs to filter by metadata fields. + For example, ``{"channel": "slack"}`` returns only spend + recorded with that channel in metadata. When None, returns + all spend regardless of metadata. + + **Scope semantics (subset match):** + A record matches when *every* key-value pair in ``scope`` is + present in its metadata; extra metadata keys are ignored. A + record with ``{"channel": "A", "agent_id": "bot-1"}`` therefore + matches both ``{"channel": "A"}`` and + ``{"channel": "A", "agent_id": "bot-1"}``, but not + ``{"channel": "A", "agent_id": "bot-2"}``. + + Returns: + Sum of all matching spend amounts as a Decimal. + """ + ... + + def check_and_record( + self, + amount: Decimal, + currency: str, + limit: Decimal, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + metadata: dict[str, Any] | None = None, + ) -> tuple[bool, Decimal]: + """Atomically check whether recording *amount* stays within *limit* + and, if so, record it. + + Eliminates the TOCTOU race of separate ``get_spend()`` + ``record_spend()``. + + **Single-process atomicity only** for ``InMemorySpendStore``. + Production stores must use DB-level atomics (see module docstring). + + Args: + amount: Positive monetary amount of the proposed transaction. + currency: Currency symbol (e.g. ``"USDC"``). + limit: Maximum allowed total spend *including* this transaction. + Rejected if ``current_spend + amount > limit``. + start: Unix timestamp lower bound for the current-period query. + end: Unix timestamp upper bound (``None`` = "up to now").
+ scope: Optional metadata filter (same semantics as ``get_spend``). + metadata: Metadata to attach to the new record if accepted. + + Returns: + ``(accepted, current_spend)`` where: + + - ``accepted`` is ``True`` when within budget and recorded. + - ``current_spend`` is total period spend *before* this transaction. + """ + ... + + +class _SpendRecord: + """Internal record stored by :class:`InMemorySpendStore`.""" + + __slots__ = ("amount", "currency", "recorded_at", "metadata") + + def __init__( + self, + amount: Decimal, + currency: str, + recorded_at: float, + metadata: dict[str, Any] | None, + ) -> None: + self.amount = amount + self.currency = currency + self.recorded_at = recorded_at + self.metadata = metadata + + def matches_scope(self, scope: dict[str, str]) -> bool: + """Check if this record's metadata matches all scope key-value pairs.""" + if not self.metadata: + return False + return all( + self.metadata.get(k) == v + for k, v in scope.items() + ) + + +class InMemorySpendStore: + """Thread-safe in-memory implementation of :class:`SpendStore`. + + Records are kept in a ``deque`` ordered by insertion time. Records older + than *max_age_seconds* are pruned to prevent unbounded memory growth. + + **Single-process only.** Each process maintains an independent ledger. + Use for single-process services, local development, and tests. + For production deployments use a custom ``SpendStore`` backed by + PostgreSQL, Redis, or another shared store with DB-level atomic operations. + + Atomicity + --------- + ``check_and_record()`` acquires the internal lock for the entire + read-decide-write sequence, making it atomic within a single process. + ``get_spend()`` + ``record_spend()`` called separately are *not* atomic + and may overshoot the cap under concurrent load. + + Args: + max_age_seconds: Records older than this are eligible for pruning. + Defaults to 7 days (604 800 s). Set it to at least the longest + configured budget window: pruned records no longer count toward + spend totals, so the default undercounts budgets longer than + 7 days (for example, a calendar-month window).
+ """ + + def __init__(self, max_age_seconds: int = 604_800) -> None: + self._max_age_seconds = max_age_seconds + self._records: deque[_SpendRecord] = deque() + self._lock = Lock() + + # ------------------------------------------------------------------ + # SpendStore protocol implementation + # ------------------------------------------------------------------ + + def record_spend( + self, + amount: Decimal, + currency: str, + metadata: dict[str, Any] | None = None, + ) -> None: + """Record a spend event at the current wall-clock time.""" + if amount <= Decimal("0"): + raise ValueError(f"amount must be positive, got {amount!r}") + + now = time.time() + record = _SpendRecord( + amount=amount, + currency=currency, + recorded_at=now, + metadata=metadata, + ) + with self._lock: + self._records.append(record) + self._prune_locked(now) + + def get_spend( + self, + currency: str, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + ) -> Decimal: + """Sum all spend for *currency* in the time range [start, end].""" + with self._lock: + return self._sum_locked(currency, start, end, scope) + + def check_and_record( + self, + amount: Decimal, + currency: str, + limit: Decimal, + start: float, + end: float | None = None, + scope: dict[str, str] | None = None, + metadata: dict[str, Any] | None = None, + ) -> tuple[bool, Decimal]: + """Atomically check the period budget and record if within limit. + + Acquires the internal lock for the entire read-decide-write sequence. + **Single-process atomicity only** — does not coordinate across + multiple processes or replicas. 
+ """ + if amount <= Decimal("0"): + raise ValueError(f"amount must be positive, got {amount!r}") + + now = time.time() + with self._lock: + current = self._sum_locked(currency, start, end, scope) + if current + amount > limit: + return False, current + record = _SpendRecord( + amount=amount, + currency=currency, + recorded_at=now, + metadata=metadata, + ) + self._records.append(record) + self._prune_locked(now) + return True, current + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _sum_locked( + self, + currency: str, + start: float, + end: float | None, + scope: dict[str, str] | None, + ) -> Decimal: + """Sum records matching the query (must be called with _lock held).""" + total = Decimal("0") + for r in self._records: + if r.currency != currency: + continue + if r.recorded_at < start: + continue + if end is not None and r.recorded_at > end: + continue + if scope is not None and not r.matches_scope(scope): + continue + total += r.amount + return total + + def _prune_locked(self, now: float) -> None: + """Remove records older than *max_age_seconds* (called with lock held).""" + cutoff = now - self._max_age_seconds + while self._records and self._records[0].recorded_at < cutoff: + self._records.popleft() + + def record_count(self) -> int: + """Return the current number of stored records (useful for tests).""" + with self._lock: + return len(self._records) diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/__init__.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/__init__.py new file mode 100644 index 00000000..693b8ccc --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/__init__.py @@ -0,0 +1,9 @@ +"""Transaction-policy evaluator 
package.""" + +from .config import TransactionPolicyConfig +from .evaluator import TransactionPolicyEvaluator + +__all__ = [ + "TransactionPolicyEvaluator", + "TransactionPolicyConfig", +] diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/config.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/config.py new file mode 100644 index 00000000..286e8a2f --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/config.py @@ -0,0 +1,90 @@ +"""Configuration model for the transaction-policy evaluator.""" + +from __future__ import annotations + +from decimal import Decimal +from typing import Any + +from pydantic import Field, field_validator, model_validator + +from agent_control_evaluators import EvaluatorConfig + + +class TransactionPolicyConfig(EvaluatorConfig): + """Configuration for :class:`~.evaluator.TransactionPolicyEvaluator`. + + All list fields default to empty lists (no restriction applied). A field + is only enforced when it contains at least one entry. + + Attributes: + allowed_recipients: If non-empty, **only** recipients in this list are + permitted. Transactions to any other address are blocked. + blocked_recipients: Recipients that are explicitly prohibited. Checked + before ``allowed_recipients``. + min_amount: Minimum transaction amount (inclusive). ``Decimal("0")`` + disables the lower bound check. + max_amount: Maximum transaction amount (inclusive). ``Decimal("0")`` + disables the upper bound check. + allowed_currencies: If non-empty, **only** currencies in this list are + permitted. 
+ + Example config dict:: + + { + "allowed_recipients": ["0xABC...", "0xDEF..."], + "blocked_recipients": ["0xDEAD..."], + "min_amount": "0.01", + "max_amount": "10000.00", + "allowed_currencies": ["USDC", "USDT"] + } + """ + + allowed_recipients: list[str] = Field( + default_factory=list, + description=( + "Allowlisted recipient addresses. When non-empty, only these " + "recipients are permitted." + ), + ) + blocked_recipients: list[str] = Field( + default_factory=list, + description="Blocklisted recipient addresses that are always denied.", + ) + min_amount: Decimal = Field( + default=Decimal("0"), + ge=Decimal("0"), + description="Minimum transaction amount (inclusive). Decimal('0') = no minimum.", + ) + max_amount: Decimal = Field( + default=Decimal("0"), + ge=Decimal("0"), + description="Maximum transaction amount (inclusive). Decimal('0') = no maximum.", + ) + allowed_currencies: list[str] = Field( + default_factory=list, + description=( + "Permitted currency symbols. When non-empty, only these " + "currencies are accepted." 
+ ), + ) + + @field_validator("allowed_currencies", mode="before") + @classmethod + def normalize_currencies(cls, v: Any) -> list[str]: + """Normalize all currency symbols to upper-case.""" + if not isinstance(v, list): + return v + return [c.upper() for c in v] + + @model_validator(mode="after") + def validate_amount_bounds(self) -> TransactionPolicyConfig: + """Ensure max_amount >= min_amount when both are non-zero.""" + if ( + self.max_amount > Decimal("0") + and self.min_amount > Decimal("0") + and self.max_amount < self.min_amount + ): + raise ValueError( + f"max_amount ({self.max_amount}) must be >= min_amount ({self.min_amount})" + ) + return self diff --git a/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/evaluator.py b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/evaluator.py new file mode 100644 index 00000000..f1542fa8 --- /dev/null +++ b/evaluators/contrib/financial-governance/src/agent_control_evaluator_financial_governance/transaction_policy/evaluator.py @@ -0,0 +1,264 @@ +"""Transaction-policy evaluator — static policy checks with no state tracking.""" + +from __future__ import annotations + +from decimal import Decimal, InvalidOperation +from typing import Any + +from agent_control_evaluators import ( + Evaluator, + EvaluatorMetadata, + register_evaluator, +) +from agent_control_models import EvaluatorResult + +from .config import TransactionPolicyConfig + + +@register_evaluator +class TransactionPolicyEvaluator(Evaluator[TransactionPolicyConfig]): + """Stateless evaluator for static transaction policy checks. + + Checks are applied in this order (first violation wins): + + 1. Currency allowlist (if configured) + 2. Recipient blocklist + 3. Recipient allowlist (if configured) + 4. Minimum amount bound + 5. Maximum amount bound + + ``matched=True`` means the transaction **violates** the policy and should be + blocked. 
``matched=False`` means the transaction passed all checks. + + Thread safety: + This evaluator has no mutable instance state. Concurrent calls to + :meth:`evaluate` are safe. + + Input ``data`` schema:: + + { + "amount": Decimal | float | str, # required — transaction amount + "currency": str, # required — payment currency + "recipient": str, # required — recipient address or id + # optional context fields (logged in result metadata) + "channel": str, + "agent_id": str, + "session_id": str + } + + Example:: + + from agent_control_evaluator_financial_governance.transaction_policy import ( + TransactionPolicyConfig, + TransactionPolicyEvaluator, + ) + from decimal import Decimal + + config = TransactionPolicyConfig( + allowed_currencies=["USDC", "USDT"], + blocked_recipients=["0xDEAD..."], + max_amount=Decimal("5000"), + ) + evaluator = TransactionPolicyEvaluator(config) + result = await evaluator.evaluate({ + "amount": "100.00", + "currency": "USDC", + "recipient": "0xABC...", + }) + # result.matched == False → transaction passes all policy checks + """ + + metadata = EvaluatorMetadata( + name="financial_governance.transaction_policy", + version="0.1.0", + description=( + "Static transaction policy enforcement: recipient allowlists/blocklists, " + "amount bounds, and currency restrictions. No state tracking." + ), + ) + config_model = TransactionPolicyConfig + + @staticmethod + def _normalize_data(data: Any) -> dict[str, Any] | None: + """Extract transaction fields from selector output. + + Handles ``selector.path: "input"`` (data is the transaction dict) + and ``selector.path: "*"`` (data is the full Step dict). 
+ """ + if not isinstance(data, dict): + return None + if "type" in data and "input" in data: + tx = data.get("input") + ctx = data.get("context") or {} + if not isinstance(tx, dict): + return None + merged = {**tx} + if isinstance(ctx, dict): + for k in ("channel", "agent_id", "session_id"): + if k in ctx and k not in merged: + merged[k] = ctx[k] + return merged + return data + + async def evaluate(self, data: Any) -> EvaluatorResult: + """Evaluate a transaction against the static policy. + + Args: + data: Transaction dict (when ``selector.path`` is ``"input"``) + or full Step dict (when path is ``"*"``). Malformed payload + returns ``matched=False, error=None`` — not an evaluator error. + + Returns: + ``EvaluatorResult`` where ``matched=True`` indicates a policy + violation (transaction should be denied). + """ + if data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="No transaction data provided; skipping policy check", + ) + + tx_data = self._normalize_data(data) + if tx_data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="Could not extract transaction data from selector output; skipping", + ) + + # Use normalized transaction dict for the rest of evaluate + data = tx_data + + # ---- Extract and validate required fields ---- + # Malformed input → matched=False, error=None (not an evaluator crash) + currency_raw = data.get("currency") + if not currency_raw: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="Transaction data missing required field 'currency'", + ) + currency: str = str(currency_raw).upper() + + recipient_raw = data.get("recipient") + if not recipient_raw: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="Transaction data missing required field 'recipient'", + ) + recipient: str = str(recipient_raw).strip() + + amount_raw = data.get("amount") + if amount_raw is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + 
message="Transaction data missing required field 'amount'", + ) + try: + amount = Decimal(str(amount_raw)) + except (TypeError, ValueError, InvalidOperation): + return EvaluatorResult( + matched=False, + confidence=1.0, + message=f"Transaction 'amount' is not numeric: {amount_raw!r}", + ) + + # Build shared metadata for result context + base_meta: dict[str, Any] = { + "amount": float(amount), + "currency": currency, + "recipient": recipient, + } + for ctx_key in ("channel", "agent_id", "session_id"): + if ctx_key in data and data[ctx_key] is not None: + base_meta[ctx_key] = data[ctx_key] + + # ---- Check 1: Currency allowlist ---- + if self.config.allowed_currencies: + if currency not in self.config.allowed_currencies: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Currency '{currency}' is not in the allowed currencies list: " + f"{self.config.allowed_currencies}" + ), + metadata={ + **base_meta, + "violation": "currency_not_allowed", + "allowed_currencies": self.config.allowed_currencies, + }, + ) + + # ---- Check 2: Recipient blocklist ---- + if self.config.blocked_recipients and recipient in self.config.blocked_recipients: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=f"Recipient '{recipient}' is on the blocklist", + metadata={ + **base_meta, + "violation": "recipient_blocked", + }, + ) + + # ---- Check 3: Recipient allowlist ---- + if self.config.allowed_recipients: + if recipient not in self.config.allowed_recipients: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Recipient '{recipient}' is not in the allowed recipients list" + ), + metadata={ + **base_meta, + "violation": "recipient_not_allowed", + }, + ) + + # ---- Check 4: Minimum amount ---- + if self.config.min_amount > Decimal("0") and amount < self.config.min_amount: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Transaction amount {amount} {currency} is below the minimum " + f"of 
{self.config.min_amount} {currency}" + ), + metadata={ + **base_meta, + "violation": "amount_below_minimum", + "min_amount": float(self.config.min_amount), + }, + ) + + # ---- Check 5: Maximum amount ---- + if self.config.max_amount > Decimal("0") and amount > self.config.max_amount: + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Transaction amount {amount} {currency} exceeds the maximum " + f"of {self.config.max_amount} {currency}" + ), + metadata={ + **base_meta, + "violation": "amount_exceeds_maximum", + "max_amount": float(self.config.max_amount), + }, + ) + + # ---- All checks passed ---- + return EvaluatorResult( + matched=False, + confidence=1.0, + message=( + f"Transaction of {amount} {currency} to '{recipient}' " + "passed all policy checks" + ), + metadata=base_meta, + ) diff --git a/evaluators/contrib/financial-governance/tests/__init__.py b/evaluators/contrib/financial-governance/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/evaluators/contrib/financial-governance/tests/test_spend_limit.py b/evaluators/contrib/financial-governance/tests/test_spend_limit.py new file mode 100644 index 00000000..59316099 --- /dev/null +++ b/evaluators/contrib/financial-governance/tests/test_spend_limit.py @@ -0,0 +1,730 @@ +"""Tests for the spend_limit evaluator and supporting infrastructure.""" + +from __future__ import annotations + +import time +from decimal import Decimal +from typing import Any + +import pytest + +from agent_control_evaluator_financial_governance.spend_limit import ( + BudgetLimit, + BudgetWindow, + InMemorySpendStore, + SpendLimitConfig, + SpendLimitEvaluator, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _rolling_window(seconds: int = 86400) -> BudgetWindow: + return BudgetWindow(kind="rolling", seconds=seconds) + + +def _per_tx_limit(amount: str | 
Decimal, currency: str = "USDC", **kw: Any) -> BudgetLimit: + """Build a per-transaction cap (no window).""" + return BudgetLimit(amount=Decimal(str(amount)), currency=currency, **kw) + + +def _period_limit( + amount: str | Decimal, + currency: str = "USDC", + seconds: int = 86400, + **kw: Any, +) -> BudgetLimit: + """Build a rolling-period budget limit.""" + return BudgetLimit( + amount=Decimal(str(amount)), + currency=currency, + window=_rolling_window(seconds), + **kw, + ) + + +def _make_evaluator( + limits: list[BudgetLimit] | None = None, + store: InMemorySpendStore | None = None, + # Legacy convenience kwargs translated to BudgetLimit list + max_per_transaction: str | Decimal | None = None, + max_per_period: str | Decimal | None = None, + period_seconds: int = 86400, + currency: str = "USDC", +) -> SpendLimitEvaluator: + if limits is None: + limits = [] + if max_per_transaction is not None and Decimal(str(max_per_transaction)) > 0: + limits.append(_per_tx_limit(max_per_transaction, currency=currency)) + if max_per_period is not None and Decimal(str(max_per_period)) > 0: + limits.append(_period_limit(max_per_period, currency=currency, seconds=period_seconds)) + cfg = SpendLimitConfig(limits=limits) + return SpendLimitEvaluator(cfg, store=store) + + +def _tx( + amount: Any = "10.00", + currency: str = "USDC", + recipient: str = "0xABC", + **extra: Any, +) -> dict[str, Any]: + return {"amount": amount, "currency": currency, "recipient": recipient, **extra} + + +# --------------------------------------------------------------------------- +# InMemorySpendStore unit tests +# --------------------------------------------------------------------------- + + +def test_store_record_and_query() -> None: + """Basic record/query round-trip.""" + store = InMemorySpendStore() + since = time.time() - 1 + + store.record_spend(Decimal("100"), "USDC") + store.record_spend(Decimal("50"), "USDC") + store.record_spend(Decimal("200"), "ETH") + + assert store.get_spend("USDC", since) 
== Decimal("150") + assert store.get_spend("ETH", since) == Decimal("200") + assert store.get_spend("USDT", since) == Decimal("0") + + +def test_store_since_timestamp_filters_old_records() -> None: + store = InMemorySpendStore() + store.record_spend(Decimal("1000"), "USDC") + future_since = time.time() + 1 + assert store.get_spend("USDC", future_since) == Decimal("0") + + +def test_store_end_timestamp_filters_future_records() -> None: + store = InMemorySpendStore() + past_end = time.time() - 1 + store.record_spend(Decimal("100"), "USDC") + assert store.get_spend("USDC", time.time() - 10, end=past_end) == Decimal("0") + + +def test_store_end_none_includes_all_current_records() -> None: + store = InMemorySpendStore() + store.record_spend(Decimal("100"), "USDC") + assert store.get_spend("USDC", time.time() - 5) == Decimal("100") + + +def test_store_record_count() -> None: + store = InMemorySpendStore() + assert store.record_count() == 0 + store.record_spend(Decimal("1"), "USDC") + store.record_spend(Decimal("2"), "USDC") + assert store.record_count() == 2 + + +def test_store_rejects_non_positive_amount() -> None: + store = InMemorySpendStore() + with pytest.raises(ValueError, match="amount must be positive"): + store.record_spend(Decimal("0"), "USDC") + with pytest.raises(ValueError, match="amount must be positive"): + store.record_spend(Decimal("-5"), "USDC") + + +def test_store_metadata_accepted() -> None: + store = InMemorySpendStore() + store.record_spend( + Decimal("10"), "USDC", + metadata={"agent_id": "agent-1", "session_id": "s-99"}, + ) + assert store.record_count() == 1 + + +def test_store_scope_filter() -> None: + """get_spend with scope only returns matching records.""" + store = InMemorySpendStore() + since = time.time() - 1 + store.record_spend(Decimal("90"), "USDC", metadata={"channel": "A"}) + store.record_spend(Decimal("20"), "USDC", metadata={"channel": "B"}) + + assert store.get_spend("USDC", since, scope={"channel": "A"}) == Decimal("90") + assert 
store.get_spend("USDC", since, scope={"channel": "B"}) == Decimal("20") + assert store.get_spend("USDC", since) == Decimal("110") + + +# --------------------------------------------------------------------------- +# check_and_record atomic tests +# --------------------------------------------------------------------------- + + +def test_check_and_record_accepts_within_limit() -> None: + """check_and_record records and returns (True, prior_spend).""" + store = InMemorySpendStore() + since = time.time() - 1 + + accepted, prior = store.check_and_record( + amount=Decimal("50"), + currency="USDC", + limit=Decimal("100"), + start=since, + ) + assert accepted is True + assert prior == Decimal("0") + assert store.record_count() == 1 + assert store.get_spend("USDC", since) == Decimal("50") + + +def test_check_and_record_rejects_over_limit() -> None: + """check_and_record rejects when amount would exceed limit.""" + store = InMemorySpendStore() + since = time.time() - 1 + store.record_spend(Decimal("90"), "USDC") + + accepted, prior = store.check_and_record( + amount=Decimal("20"), + currency="USDC", + limit=Decimal("100"), + start=since, + ) + assert accepted is False + assert prior == Decimal("90") + assert store.record_count() == 1 + + +def test_check_and_record_exact_boundary_accepted() -> None: + """check_and_record accepts when spend exactly reaches the limit.""" + store = InMemorySpendStore() + since = time.time() - 1 + store.record_spend(Decimal("90"), "USDC") + + accepted, prior = store.check_and_record( + amount=Decimal("10"), + currency="USDC", + limit=Decimal("100"), + start=since, + ) + assert accepted is True + assert prior == Decimal("90") + assert store.get_spend("USDC", since) == Decimal("100") + + +def test_check_and_record_scoped_isolation() -> None: + """check_and_record with scope only counts matching records.""" + store = InMemorySpendStore() + since = time.time() - 1 + store.record_spend(Decimal("90"), "USDC", metadata={"channel": "A"}) + + accepted, 
prior = store.check_and_record( + amount=Decimal("20"), + currency="USDC", + limit=Decimal("100"), + start=since, + scope={"channel": "B"}, + metadata={"channel": "B"}, + ) + assert accepted is True + assert prior == Decimal("0") + assert store.get_spend("USDC", since, scope={"channel": "B"}) == Decimal("20") + + +def test_check_and_record_rejects_non_positive() -> None: + store = InMemorySpendStore() + with pytest.raises(ValueError): + store.check_and_record( + amount=Decimal("0"), + currency="USDC", + limit=Decimal("100"), + start=time.time() - 1, + ) + + +# --------------------------------------------------------------------------- +# BudgetWindow / BudgetLimit / SpendLimitConfig validation +# --------------------------------------------------------------------------- + + +def test_budget_limit_currency_normalized() -> None: + limit = BudgetLimit(amount=Decimal("100"), currency="usdc") + assert limit.currency == "USDC" + + +def test_budget_window_rolling_requires_seconds() -> None: + with pytest.raises(Exception): + BudgetWindow(kind="rolling") + + +def test_budget_window_fixed_requires_unit() -> None: + with pytest.raises(Exception): + BudgetWindow(kind="fixed") + + +def test_budget_window_rolling_valid() -> None: + w = BudgetWindow(kind="rolling", seconds=3600) + assert w.seconds == 3600 + + +def test_budget_window_fixed_valid() -> None: + w = BudgetWindow(kind="fixed", unit="day", timezone="America/New_York") + assert w.unit == "day" + assert w.timezone == "America/New_York" + + +def test_config_empty_limits() -> None: + cfg = SpendLimitConfig(limits=[]) + assert cfg.limits == [] + + +def test_config_limits_parsed_from_dict() -> None: + """SpendLimitConfig parses limits from dicts (Pydantic coercion).""" + cfg = SpendLimitConfig(limits=[ + {"amount": "100.00", "currency": "USDC"}, + { + "amount": "1000.00", + "currency": "USDC", + "scope_by": ["channel"], + "window": {"kind": "rolling", "seconds": 86400}, + }, + ]) + assert len(cfg.limits) == 2 + assert 
cfg.limits[0].amount == Decimal("100.00") + assert cfg.limits[1].scope_by == ("channel",) + assert cfg.limits[1].window is not None + assert cfg.limits[1].window.kind == "rolling" + + +def test_budget_limit_rejects_non_positive_amount() -> None: + with pytest.raises(Exception): + BudgetLimit(amount=Decimal("0"), currency="USDC") + with pytest.raises(Exception): + BudgetLimit(amount=Decimal("-1"), currency="USDC") + + +# --------------------------------------------------------------------------- +# SpendLimitEvaluator — basic behaviour +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_none_data_is_allowed() -> None: + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate(None) + assert result.matched is False + assert result.error is None + + +@pytest.mark.asyncio +async def test_non_dict_data_is_allowed() -> None: + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate("not a dict") + assert result.matched is False + assert result.error is None + + +@pytest.mark.asyncio +async def test_missing_amount_not_matched() -> None: + """Missing amount is a non-match, NOT an evaluator error.""" + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate({"currency": "USDC", "recipient": "0xABC"}) + assert result.matched is False + assert result.error is None + assert "amount" in (result.message or "").lower() + + +@pytest.mark.asyncio +async def test_missing_currency_not_matched() -> None: + """Missing currency is a non-match, NOT an evaluator error.""" + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate({"amount": "10.00", "recipient": "0xABC"}) + assert result.matched is False + assert result.error is None + assert "currency" in (result.message or "").lower() + + +@pytest.mark.asyncio +async def test_wrong_currency_is_skipped() -> None: + """Transaction in a different currency should be allowed.""" + ev = 
_make_evaluator(limits=[_per_tx_limit("1", currency="USDC")]) + result = await ev.evaluate(_tx(amount="99999.00", currency="ETH")) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_no_limits_configured_allows_everything() -> None: + ev = _make_evaluator(limits=[]) + result = await ev.evaluate(_tx(amount="999999.00")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Per-transaction cap tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_per_transaction_cap_violation() -> None: + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate(_tx(amount="101.00")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "per_transaction_cap" + assert result.error is None + + +@pytest.mark.asyncio +async def test_per_transaction_cap_exact_boundary_allowed() -> None: + ev = _make_evaluator(max_per_transaction="100") + result = await ev.evaluate(_tx(amount="100.00")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Period budget tests (atomic via check_and_record) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_period_budget_violation() -> None: + store = InMemorySpendStore() + ev = _make_evaluator(max_per_period="500", store=store) + store.record_spend(Decimal("480"), "USDC") + + result = await ev.evaluate(_tx(amount="25.00")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "period_budget" + assert result.metadata["current_period_spend"] == pytest.approx(480.0) + assert result.metadata["projected_period_spend"] == pytest.approx(505.0) + + +@pytest.mark.asyncio +async def test_period_budget_exact_boundary_allowed() -> None: + store = InMemorySpendStore() + ev = 
_make_evaluator(max_per_period="500", store=store) + store.record_spend(Decimal("490"), "USDC") + + result = await ev.evaluate(_tx(amount="10.00")) + assert result.matched is False + since = time.time() - 5 + assert store.get_spend("USDC", since) == Decimal("500") + + +@pytest.mark.asyncio +async def test_successful_transaction_is_recorded() -> None: + store = InMemorySpendStore() + ev = _make_evaluator(max_per_transaction="100", max_per_period="1000", store=store) + + assert store.record_count() == 0 + result = await ev.evaluate(_tx(amount="50.00")) + assert result.matched is False + assert store.record_count() == 1 + since = time.time() - 5 + assert store.get_spend("USDC", since) == Decimal("50") + + +@pytest.mark.asyncio +async def test_multiple_sequential_transactions_accumulate() -> None: + store = InMemorySpendStore() + ev = _make_evaluator(max_per_transaction="100", max_per_period="250", store=store) + + r1 = await ev.evaluate(_tx(amount="80.00")) + assert r1.matched is False + r2 = await ev.evaluate(_tx(amount="80.00")) + assert r2.matched is False + r3 = await ev.evaluate(_tx(amount="80.00")) + assert r3.matched is False # 240 <= 250 + r4 = await ev.evaluate(_tx(amount="80.00")) + assert r4.matched is True + assert r4.metadata and r4.metadata["violation"] == "period_budget" + + +@pytest.mark.asyncio +async def test_currency_case_insensitive_in_data() -> None: + ev = _make_evaluator(max_per_transaction="100", currency="USDC") + result = await ev.evaluate(_tx(amount="10.00", currency="usdc")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# BudgetLimit.scope_by — independent dimension budget isolation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_scope_by_channel_isolates_budgets() -> None: + """scope_by=(channel,) gives each channel its own independent counter. 
+ + lan17's specific test: 90 USDC in channel A, then 20 USDC in channel B + with a 100 USDC per-channel budget. Channel B should be ALLOWED because + its scoped spend is 0, not 90. + """ + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + ev = SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + r1 = await ev.evaluate(_tx(amount="90.00", channel="channel-A")) + assert r1.matched is False, f"Channel A 90 USDC should be allowed: {r1.message}" + + since = time.time() - 5 + assert store.get_spend("USDC", since, scope={"channel": "channel-A"}) == Decimal("90") + + r2 = await ev.evaluate(_tx(amount="20.00", channel="channel-B")) + assert r2.matched is False, ( + f"Channel B 20 USDC should be allowed (channel B has 0 spend): {r2.message}" + ) + assert store.get_spend("USDC", since, scope={"channel": "channel-B"}) == Decimal("20") + assert store.get_spend("USDC", since, scope={"channel": "channel-A"}) == Decimal("90") + + +@pytest.mark.asyncio +async def test_scope_by_channel_accumulates_within_same_channel() -> None: + """Spend within the same channel accumulates correctly.""" + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + ev = SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + r1 = await ev.evaluate(_tx(amount="60.00", channel="channel-A")) + assert r1.matched is False + + r2 = await ev.evaluate(_tx(amount="50.00", channel="channel-A")) + assert r2.matched is True + assert r2.metadata and r2.metadata["violation"] == "period_budget" + + +@pytest.mark.asyncio +async def test_scope_by_agent_id_isolation() -> None: + """scope_by=(agent_id,) isolates budgets per agent.""" + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", +
scope_by=("agent_id",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + ev = SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + r1 = await ev.evaluate(_tx(amount="90.00", agent_id="agent-1")) + assert r1.matched is False + + r2 = await ev.evaluate(_tx(amount="20.00", agent_id="agent-2")) + assert r2.matched is False + + +@pytest.mark.asyncio +async def test_global_budget_without_scope() -> None: + """scope_by=() means all spend in that currency counts together.""" + store = InMemorySpendStore() + ev = _make_evaluator(max_per_period="100", store=store) + + r1 = await ev.evaluate(_tx(amount="90.00")) + assert r1.matched is False + + r2 = await ev.evaluate(_tx(amount="20.00")) + assert r2.matched is True + + +@pytest.mark.asyncio +async def test_multiple_limits_in_one_config() -> None: + """Global per-tx cap and per-channel period budget co-exist.""" + store = InMemorySpendStore() + cfg = SpendLimitConfig(limits=[ + BudgetLimit(amount=Decimal("200"), currency="USDC"), + BudgetLimit( + amount=Decimal("100"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ), + ]) + ev = SpendLimitEvaluator(cfg, store=store) + + r1 = await ev.evaluate(_tx(amount="90.00", channel="channel-A")) + assert r1.matched is False + + r2 = await ev.evaluate(_tx(amount="90.00", channel="channel-B")) + assert r2.matched is False + + r3 = await ev.evaluate(_tx(amount="20.00", channel="channel-A")) + assert r3.matched is True + assert r3.metadata and r3.metadata["violation"] == "period_budget" + + r4 = await ev.evaluate(_tx(amount="210.00", channel="channel-C")) + assert r4.matched is True + assert r4.metadata and r4.metadata["violation"] == "per_transaction_cap" + + +# --------------------------------------------------------------------------- +# Malformed input — matched=False, error=None (never error=...) 
+# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_malformed_input_is_not_evaluator_error() -> None: + """Malformed input must return matched=False, error=None. + + The error field is reserved for evaluator crashes/timeouts/missing deps. + """ + ev = _make_evaluator(max_per_transaction="100") + + r1 = await ev.evaluate({"currency": "USDC", "recipient": "0xABC"}) + assert r1.matched is False + assert r1.error is None + + r2 = await ev.evaluate({"amount": "10.00", "recipient": "0xABC"}) + assert r2.matched is False + assert r2.error is None + + r3 = await ev.evaluate({"amount": "-5.00", "currency": "USDC", "recipient": "0xABC"}) + assert r3.matched is False + assert r3.error is None + + r4 = await ev.evaluate("not a dict") + assert r4.matched is False + assert r4.error is None + + r5 = await ev.evaluate(None) + assert r5.matched is False + assert r5.error is None + + +# --------------------------------------------------------------------------- +# Step normalization (selector.path: "*" vs "input") +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_step_object_input_extraction() -> None: + """selector.path=* passes a full Step dict; evaluator extracts from input.""" + ev = _make_evaluator(max_per_transaction="100") + step_data = { + "type": "tool", + "name": "payment", + "input": {"amount": "50.00", "currency": "USDC", "recipient": "0xABC"}, + "context": None, + } + result = await ev.evaluate(step_data) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_step_context_merged_into_transaction() -> None: + """Context fields from step.context are available for scoped budgets.""" + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + ev = 
SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + step1 = { + "type": "tool", + "name": "payment", + "input": {"amount": "90.00", "currency": "USDC", "recipient": "0xABC"}, + "context": {"channel": "channel-A"}, + } + r1 = await ev.evaluate(step1) + assert r1.matched is False + + step2 = { + "type": "tool", + "name": "payment", + "input": {"amount": "20.00", "currency": "USDC", "recipient": "0xABC"}, + "context": {"channel": "channel-B"}, + } + r2 = await ev.evaluate(step2) + assert r2.matched is False + + +@pytest.mark.asyncio +async def test_step_input_not_clobbered_by_context() -> None: + """If input already has channel, step.context must NOT overwrite it. + + Asserts against actual store state to prove spend was recorded under + channel=from-input, not from-context. + """ + store = InMemorySpendStore() + ev = _make_evaluator(max_per_transaction="100", max_per_period="1000", store=store) + + step_data = { + "type": "tool", + "name": "payment", + "input": { + "amount": "10.00", + "currency": "USDC", + "recipient": "0xABC", + "channel": "from-input", + }, + "context": {"channel": "from-context"}, + } + result = await ev.evaluate(step_data) + assert result.matched is False + + since = time.time() - 5 + assert store.get_spend("USDC", since, scope={"channel": "from-input"}) == Decimal("10") + assert store.get_spend("USDC", since, scope={"channel": "from-context"}) == Decimal("0") + + +# --------------------------------------------------------------------------- +# lan17 specific channel-scope-independence test +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_lan17_channel_scope_independence() -> None: + """lan17's test: 90 USDC in channel A, then 20 USDC in channel B. + + With a 100 USDC per-channel budget (scope_by=(channel,)), the second + transaction must be ALLOWED — channel B has 0 spend.
+ """ + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", + scope_by=("channel",), + window=BudgetWindow(kind="rolling", seconds=86400), + ) + ev = SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + r1 = await ev.evaluate(_tx(amount="90.00", channel="channel-A")) + assert r1.matched is False, f"Channel A 90 USDC should be allowed: {r1.message}" + + since = time.time() - 5 + assert store.get_spend("USDC", since, scope={"channel": "channel-A"}) == Decimal("90") + + r2 = await ev.evaluate(_tx(amount="20.00", channel="channel-B")) + assert r2.matched is False, ( + f"Channel B 20 USDC should be allowed (channel B has 0 spend): {r2.message}" + ) + + assert store.get_spend("USDC", since, scope={"channel": "channel-B"}) == Decimal("20") + assert store.get_spend("USDC", since, scope={"channel": "channel-A"}) == Decimal("90") + + +# --------------------------------------------------------------------------- +# Fixed window (calendar-aligned) budget +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_fixed_window_day_budget() -> None: + """Fixed-day window budget works (uses UTC approximation).""" + store = InMemorySpendStore() + limit = BudgetLimit( + amount=Decimal("100"), + currency="USDC", + window=BudgetWindow(kind="fixed", unit="day"), + ) + ev = SpendLimitEvaluator(SpendLimitConfig(limits=[limit]), store=store) + + r1 = await ev.evaluate(_tx(amount="90.00")) + assert r1.matched is False + + r2 = await ev.evaluate(_tx(amount="20.00")) + assert r2.matched is True + assert r2.metadata and r2.metadata["violation"] == "period_budget" diff --git a/evaluators/contrib/financial-governance/tests/test_transaction_policy.py b/evaluators/contrib/financial-governance/tests/test_transaction_policy.py new file mode 100644 index 00000000..35b5dcfb --- /dev/null +++ b/evaluators/contrib/financial-governance/tests/test_transaction_policy.py @@ -0,0 
+1,362 @@ +"""Tests for the transaction_policy evaluator.""" + +from __future__ import annotations + +from decimal import Decimal +from typing import Any + +import pytest +from pydantic import ValidationError + +from agent_control_evaluator_financial_governance.transaction_policy import ( + TransactionPolicyConfig, + TransactionPolicyEvaluator, +) + + +# --------------------------------------------------------------------------- +# TransactionPolicyConfig validation tests +# --------------------------------------------------------------------------- + + +def test_config_currencies_normalized() -> None: + cfg = TransactionPolicyConfig(allowed_currencies=["usdc", "Usdt"]) + assert cfg.allowed_currencies == ["USDC", "USDT"] + + +def test_config_defaults_are_permissive() -> None: + cfg = TransactionPolicyConfig() + assert cfg.allowed_recipients == [] + assert cfg.blocked_recipients == [] + assert cfg.min_amount == Decimal("0") + assert cfg.max_amount == Decimal("0") + assert cfg.allowed_currencies == [] + + +def test_config_max_amount_lt_min_raises() -> None: + with pytest.raises(ValidationError, match="max_amount"): + TransactionPolicyConfig(min_amount=Decimal("100"), max_amount=Decimal("10")) + + +def test_config_max_equals_min_is_valid() -> None: + cfg = TransactionPolicyConfig(min_amount=Decimal("50"), max_amount=Decimal("50")) + assert cfg.min_amount == Decimal("50") + assert cfg.max_amount == Decimal("50") + + +# --------------------------------------------------------------------------- +# Helper factory +# --------------------------------------------------------------------------- + + +def _make_evaluator(**kwargs: Any) -> TransactionPolicyEvaluator: + cfg = TransactionPolicyConfig(**kwargs) + return TransactionPolicyEvaluator(cfg) + + +def _tx( + amount: float = 100.0, + currency: str = "USDC", + recipient: str = "0xABC", + **extra: Any, +) -> dict[str, Any]: + return {"amount": amount, "currency": currency, "recipient": recipient, **extra} + + +# 
--------------------------------------------------------------------------- +# Edge cases: None / non-dict inputs +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_none_data_passes() -> None: + ev = _make_evaluator(allowed_currencies=["USDC"]) + result = await ev.evaluate(None) + assert result.matched is False + assert result.error is None + + +@pytest.mark.asyncio +async def test_non_dict_data_passes() -> None: + ev = _make_evaluator(allowed_currencies=["USDC"]) + result = await ev.evaluate(["not", "a", "dict"]) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Missing required fields +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_missing_currency_not_matched() -> None: + """Missing currency is a non-match, NOT an evaluator error.""" + ev = _make_evaluator() + result = await ev.evaluate({"amount": 10.0, "recipient": "0xABC"}) + assert result.matched is False + assert result.error is None + assert "currency" in (result.message or "").lower() + + +@pytest.mark.asyncio +async def test_missing_recipient_not_matched() -> None: + """Missing recipient is a non-match, NOT an evaluator error.""" + ev = _make_evaluator() + result = await ev.evaluate({"amount": 10.0, "currency": "USDC"}) + assert result.matched is False + assert result.error is None + assert "recipient" in (result.message or "").lower() + + +@pytest.mark.asyncio +async def test_missing_amount_not_matched() -> None: + """Missing amount is a non-match, NOT an evaluator error.""" + ev = _make_evaluator() + result = await ev.evaluate({"currency": "USDC", "recipient": "0xABC"}) + assert result.matched is False + assert result.error is None + assert "amount" in (result.message or "").lower() + + +@pytest.mark.asyncio +async def test_non_numeric_amount_not_matched() -> None: + """Non-numeric amount is a 
non-match, NOT an evaluator error.""" + ev = _make_evaluator() + result = await ev.evaluate({"amount": "lots", "currency": "USDC", "recipient": "0xABC"}) + assert result.matched is False + assert result.error is None + + +# --------------------------------------------------------------------------- +# No restrictions configured → everything passes +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_empty_config_allows_everything() -> None: + ev = _make_evaluator() + result = await ev.evaluate(_tx(amount=999_999.0, currency="XYZ", recipient="0xANY")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Currency allowlist +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_currency_not_in_allowlist_is_blocked() -> None: + ev = _make_evaluator(allowed_currencies=["USDC", "USDT"]) + result = await ev.evaluate(_tx(currency="DAI")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "currency_not_allowed" + + +@pytest.mark.asyncio +async def test_currency_in_allowlist_passes() -> None: + ev = _make_evaluator(allowed_currencies=["USDC", "USDT"]) + result = await ev.evaluate(_tx(currency="USDT")) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_currency_allowlist_case_insensitive_in_data() -> None: + """Currency from incoming data is uppercased before comparison.""" + ev = _make_evaluator(allowed_currencies=["USDC"]) + result = await ev.evaluate(_tx(currency="usdc")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Recipient blocklist +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_blocked_recipient_is_denied() -> None: + ev = 
_make_evaluator(blocked_recipients=["0xDEAD", "0xBAD"]) + result = await ev.evaluate(_tx(recipient="0xDEAD")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "recipient_blocked" + + +@pytest.mark.asyncio +async def test_non_blocked_recipient_passes() -> None: + ev = _make_evaluator(blocked_recipients=["0xDEAD"]) + result = await ev.evaluate(_tx(recipient="0xGOOD")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Recipient allowlist +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_recipient_not_in_allowlist_is_blocked() -> None: + ev = _make_evaluator(allowed_recipients=["0xALICE", "0xBOB"]) + result = await ev.evaluate(_tx(recipient="0xEVE")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "recipient_not_allowed" + + +@pytest.mark.asyncio +async def test_recipient_in_allowlist_passes() -> None: + ev = _make_evaluator(allowed_recipients=["0xALICE", "0xBOB"]) + result = await ev.evaluate(_tx(recipient="0xBOB")) + assert result.matched is False + + +# --------------------------------------------------------------------------- +# Blocklist takes priority over allowlist +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_blocked_beats_allowlist() -> None: + """A recipient on the blocklist should be denied even if also allowlisted.""" + ev = _make_evaluator( + allowed_recipients=["0xALICE"], + blocked_recipients=["0xALICE"], # deliberately in both + ) + result = await ev.evaluate(_tx(recipient="0xALICE")) + assert result.matched is True + # Violation should be blocklist (checked first) + assert result.metadata and result.metadata["violation"] == "recipient_blocked" + + +# --------------------------------------------------------------------------- +# Amount bounds +# 
--------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_amount_below_minimum_is_blocked() -> None: + ev = _make_evaluator(min_amount=Decimal("10")) + result = await ev.evaluate(_tx(amount=9.99)) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "amount_below_minimum" + + +@pytest.mark.asyncio +async def test_amount_at_minimum_passes() -> None: + ev = _make_evaluator(min_amount=Decimal("10")) + result = await ev.evaluate(_tx(amount=10.0)) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_amount_above_maximum_is_blocked() -> None: + ev = _make_evaluator(max_amount=Decimal("1000")) + result = await ev.evaluate(_tx(amount=1000.01)) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "amount_exceeds_maximum" + + +@pytest.mark.asyncio +async def test_amount_at_maximum_passes() -> None: + ev = _make_evaluator(max_amount=Decimal("1000")) + result = await ev.evaluate(_tx(amount=1000.0)) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_amount_bounds_disabled_at_zero() -> None: + ev = _make_evaluator(min_amount=Decimal("0"), max_amount=Decimal("0")) + result = await ev.evaluate(_tx(amount=0.001)) + assert result.matched is False + result2 = await ev.evaluate(_tx(amount=1_000_000_000.0)) + assert result2.matched is False + + +# --------------------------------------------------------------------------- +# Full policy (all fields configured) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_full_policy_passes_compliant_transaction() -> None: + ev = _make_evaluator( + allowed_currencies=["USDC", "USDT"], + blocked_recipients=["0xDEAD"], + allowed_recipients=["0xALICE", "0xBOB"], + min_amount=Decimal("1"), + max_amount=Decimal("5000"), + ) + result = await ev.evaluate(_tx(amount=250.0, currency="USDC", 
recipient="0xALICE")) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_context_fields_appear_in_metadata() -> None: + """Optional context fields (channel, agent_id, session_id) should surface in result metadata.""" + ev = _make_evaluator() + result = await ev.evaluate(_tx(channel="discord", agent_id="agent-42", session_id="sess-1")) + assert result.metadata + assert result.metadata.get("channel") == "discord" + assert result.metadata.get("agent_id") == "agent-42" + assert result.metadata.get("session_id") == "sess-1" + + +# --------------------------------------------------------------------------- +# Check ordering: currency first, then blocklist, then allowlist, then bounds +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_currency_check_before_recipient_check() -> None: + """Currency violation should be reported even if the recipient is also blocked.""" + ev = _make_evaluator( + allowed_currencies=["USDC"], + blocked_recipients=["0xDEAD"], + ) + result = await ev.evaluate(_tx(currency="DAI", recipient="0xDEAD")) + # Currency checked first + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "currency_not_allowed" + + +@pytest.mark.asyncio +async def test_blocklist_before_allowlist() -> None: + """Blocklist violation should be reported even if the recipient is also absent from the allowlist.""" + ev = _make_evaluator( + allowed_recipients=["0xGOOD"], + blocked_recipients=["0xBAD"], + ) + result = await ev.evaluate(_tx(recipient="0xBAD")) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "recipient_blocked" + + +# --------------------------------------------------------------------------- +# Step normalization tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_step_object_input_extraction() -> None: + """When data is a full Step dict,
extract transaction from 'input'.""" + ev = _make_evaluator(allowed_currencies=["USDC"]) + step_data = { + "type": "tool", + "name": "payment", + "input": {"amount": 100.0, "currency": "USDC", "recipient": "0xABC"}, + "context": {"channel": "slack"}, + } + result = await ev.evaluate(step_data) + assert result.matched is False + + +@pytest.mark.asyncio +async def test_step_blocked_recipient_via_step() -> None: + """Blocklist check should work when data comes as a Step dict.""" + ev = _make_evaluator(blocked_recipients=["0xDEAD"]) + step_data = { + "type": "tool", + "name": "payment", + "input": {"amount": 10.0, "currency": "USDC", "recipient": "0xDEAD"}, + "context": None, + } + result = await ev.evaluate(step_data) + assert result.matched is True + assert result.metadata and result.metadata["violation"] == "recipient_blocked"
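The transaction_policy tests pin down a specific check order (currency, then blocklist, then allowlist, then amount bounds), inclusive amount boundaries, and the rule that a bound of 0 disables that check. The standalone sketch below restates those rules in one place to make the precedence easy to see. It is a hypothetical helper (`first_violation` is not part of the package and is not `TransactionPolicyEvaluator`'s actual implementation); it assumes the transaction fields are already present and valid, and it reuses the violation codes the tests assert on.

```python
from decimal import Decimal
from typing import Any, Optional


def first_violation(
    tx: dict[str, Any],
    allowed_currencies: list[str],
    blocked_recipients: list[str],
    allowed_recipients: list[str],
    min_amount: Decimal = Decimal("0"),
    max_amount: Decimal = Decimal("0"),
) -> Optional[str]:
    """Hypothetical helper: return the first violated rule, or None if tx passes.

    Assumes tx already has valid "amount", "currency", and "recipient" fields.
    """
    currency = str(tx["currency"]).upper()
    recipient = tx["recipient"]
    # Convert via str() so a float like 9.99 becomes Decimal("9.99"),
    # not its exact binary expansion.
    amount = Decimal(str(tx["amount"]))

    # An empty allowlist means "no restriction" (permissive defaults).
    if allowed_currencies and currency not in {c.upper() for c in allowed_currencies}:
        return "currency_not_allowed"
    # The blocklist is checked before the allowlist, so it wins if both list a recipient.
    if recipient in blocked_recipients:
        return "recipient_blocked"
    if allowed_recipients and recipient not in allowed_recipients:
        return "recipient_not_allowed"
    # A bound of 0 disables that check; both bounds are inclusive.
    if min_amount > 0 and amount < min_amount:
        return "amount_below_minimum"
    if max_amount > 0 and amount > max_amount:
        return "amount_exceeds_maximum"
    return None


if __name__ == "__main__":
    tx = {"amount": 100.0, "currency": "DAI", "recipient": "0xDEAD"}
    # Currency is checked first, so the blocked recipient is never reached.
    print(first_violation(tx, ["USDC"], ["0xDEAD"], []))
```

Returning only the first violation keeps the reported reason deterministic, which is what lets tests such as `test_blocked_beats_allowlist` assert a single expected `violation` value.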