From a5b7f24aa3510d164860c841b7d0aeec18f49533 Mon Sep 17 00:00:00 2001 From: Ethan Steininger Date: Mon, 30 Mar 2026 15:11:23 -0400 Subject: [PATCH 1/2] feat: add Mixpeek contextual enrichment for IAB taxonomy classification Add MixpeekClient and contextual enrichment tools that enable buyer agents to classify content into IAB v3.0 taxonomy categories and search indexed inventory via Mixpeek retriever pipelines. New files: - MixpeekClient: async HTTP client for Mixpeek content-intelligence API - ClassifyContentTool: CrewAI tool for IAB taxonomy classification - ContextualSearchTool: CrewAI tool for multimodal inventory search - MCP tools: classify_content and contextual_search via @mcp.tool() - Unit tests: 18 tests covering client and tool behavior Configuration: MIXPEEK_API_KEY, MIXPEEK_BASE_URL, MIXPEEK_NAMESPACE env vars (all optional, tools gracefully degrade when unconfigured). --- .env.example | 5 + src/ad_buyer/clients/__init__.py | 4 + src/ad_buyer/clients/mixpeek_client.py | 194 ++++++++++++++++++ src/ad_buyer/config/settings.py | 5 + src/ad_buyer/interfaces/mcp_server.py | 92 +++++++++ src/ad_buyer/tools/research/__init__.py | 8 +- .../tools/research/contextual_enrichment.py | 185 +++++++++++++++++ tests/unit/test_contextual_enrichment.py | 120 +++++++++++ tests/unit/test_mixpeek_client.py | 133 ++++++++++++ 9 files changed, 745 insertions(+), 1 deletion(-) create mode 100644 src/ad_buyer/clients/mixpeek_client.py create mode 100644 src/ad_buyer/tools/research/contextual_enrichment.py create mode 100644 tests/unit/test_contextual_enrichment.py create mode 100644 tests/unit/test_mixpeek_client.py diff --git a/.env.example b/.env.example index 155c3d7..315e8ae 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,11 @@ DATABASE_URL=sqlite:///./ad_buyer.db # Optional Redis (for caching/pub-sub) REDIS_URL= +# Mixpeek Contextual Enrichment (optional: enables classify_content and contextual_search tools) +MIXPEEK_API_KEY= +MIXPEEK_BASE_URL=https://api.mixpeek.com +MIXPEEK_NAMESPACE= + # Environment ENVIRONMENT=development LOG_LEVEL=INFO diff --git a/src/ad_buyer/clients/__init__.py b/src/ad_buyer/clients/__init__.py index 1c79355..8f697fe 100644 --- a/src/ad_buyer/clients/__init__.py +++ b/src/ad_buyer/clients/__init__.py @@ -6,6 +6,7 @@ from .a2a_client import A2AClient, A2AError, A2AResponse from .deals_client import DealsClient, DealsClientError from .mcp_client import IABMCPClient, MCPClientError, MCPToolResult +from .mixpeek_client import MixpeekClient, MixpeekError from .opendirect_client import OpenDirectClient from .ucp_client import UCPClient, UCPExchangeResult from .unified_client import Protocol, UnifiedClient, UnifiedResult @@ -31,4 +32,7 @@ # IAB Deals API v1.0 client (quote-then-book flow) "DealsClient", "DealsClientError", + # Mixpeek contextual enrichment + "MixpeekClient", + "MixpeekError", ] diff --git a/src/ad_buyer/clients/mixpeek_client.py b/src/ad_buyer/clients/mixpeek_client.py new file mode 100644 index 0000000..e5deb05 --- /dev/null +++ b/src/ad_buyer/clients/mixpeek_client.py @@ -0,0 +1,194 @@ +# Mixpeek contextual enrichment client for the Ad Buyer Agent. +# +# Provides content classification (IAB taxonomy), brand-safety scoring, +# and visual creative analysis via the Mixpeek REST API. + +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + +# Default timeout for Mixpeek API calls (seconds). +_DEFAULT_TIMEOUT = 30.0 + + +class MixpeekError(Exception): + """Raised when a Mixpeek API call fails.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message) + self.status_code = status_code + + +class MixpeekClient: + """Async HTTP client for the Mixpeek content-intelligence API. + + The client wraps three capabilities that are useful during the + buyer-agent research & execution phases: + + 1. **Content classification** – map a page URL or text to IAB v3.0 + taxonomy categories via Mixpeek's ``execute_taxonomy`` endpoint. + 2. **Brand-safety check** – lightweight sentiment / safety score + for a given URL or text snippet. + 3. **Creative analysis** – extract objects, text (OCR), and brand + logos from an ad-creative image or video URL. + + All methods are async and use ``httpx.AsyncClient`` under the hood. + """ + + def __init__( + self, + api_key: str, + base_url: str = "https://api.mixpeek.com", + namespace: str | None = None, + timeout: float = _DEFAULT_TIMEOUT, + ): + self.api_key = api_key + self.base_url = base_url.rstrip("/") + self.namespace = namespace + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=timeout, + follow_redirects=True, + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _headers(self, namespace: str | None = None) -> dict[str, str]: + """Build request headers with auth and optional namespace.""" + h: dict[str, str] = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + ns = namespace or self.namespace + if ns: + h["X-Namespace"] = ns + return h + + async def _request( + self, + method: str, + path: str, + *, + namespace: str | None = None, + json: dict | None = None, + params: dict | None = None, + ) -> dict[str, Any]: + """Send a request and return the parsed JSON response.""" + try: + resp = await self._client.request( + method, + path, + headers=self._headers(namespace), + json=json, + params=params, + ) + except httpx.HTTPError as exc: + raise MixpeekError(f"HTTP error: {exc}") from exc + + if resp.status_code >= 400: + raise MixpeekError( + f"Mixpeek API error {resp.status_code}: {resp.text}", + status_code=resp.status_code, + ) + return resp.json() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def list_taxonomies(self, namespace: str | None = None) -> list[dict]: + """List available taxonomies in the namespace.""" + data = await self._request( + "POST", "/v1/taxonomies/list", namespace=namespace, json={} + ) + return data.get("results", []) + + async def classify_content( + self, + taxonomy_id: str, + text: str | None = None, + url: str | None = None, + *, + namespace: str | None = None, + ) -> dict[str, Any]: + """Classify content against an IAB taxonomy. + + Either *text* (raw page text) or *url* (page URL for Mixpeek to + scrape) should be supplied. Returns taxonomy node assignments + with confidence scores. + """ + body: dict[str, Any] = {} + if text: + body["input"] = {"text": text} + elif url: + body["input"] = {"url": url} + else: + raise ValueError("Either text or url must be provided") + + return await self._request( + "POST", + f"/v1/taxonomies/execute/{taxonomy_id}", + namespace=namespace, + json=body, + ) + + async def search_content( + self, + retriever_id: str, + query: str, + *, + namespace: str | None = None, + limit: int = 10, + filters: dict | None = None, + ) -> dict[str, Any]: + """Execute a retriever pipeline (contextual search). + + The retriever can combine feature search, brand-safety filtering, + taxonomy enrichment, and reranking in a single call. + """ + inputs: dict[str, Any] = {"query": query} + if filters: + inputs.update(filters) + + body: dict[str, Any] = { + "inputs": inputs, + "page_size": limit, + } + + return await self._request( + "POST", + f"/v1/retrievers/{retriever_id}/execute", + namespace=namespace, + json=body, + ) + + async def get_tools(self) -> list[dict]: + """Fetch the public MCP tools list (no auth required). + + Hits the ``/tools`` REST endpoint on the MCP server, which + returns all 48 tool definitions without authentication. + """ + resp = await self._client.get( + "https://mcp.mixpeek.com/tools", + timeout=10, + ) + return resp.json().get("tools", []) + + async def health(self) -> dict[str, Any]: + """Check MCP server health (no auth required).""" + resp = await self._client.get( + "https://mcp.mixpeek.com/health", + timeout=5, + ) + return resp.json() + + async def close(self) -> None: + """Shut down the underlying HTTP client.""" + await self._client.aclose() diff --git a/src/ad_buyer/config/settings.py b/src/ad_buyer/config/settings.py index 6c8ac08..f6f390f 100644 --- a/src/ad_buyer/config/settings.py +++ b/src/ad_buyer/config/settings.py @@ -71,6 +71,11 @@ def get_cors_origins(self) -> list[str]: return [] return [o.strip() for o in self.cors_allowed_origins.split(",") if o.strip()] + # Mixpeek contextual enrichment + mixpeek_api_key: str = "" + mixpeek_base_url: str = "https://api.mixpeek.com" + mixpeek_namespace: str = "" + # Environment environment: str = "development" log_level: str = "INFO" diff --git a/src/ad_buyer/interfaces/mcp_server.py b/src/ad_buyer/interfaces/mcp_server.py index 8eadbd7..fd19ae3 100644 --- a/src/ad_buyer/interfaces/mcp_server.py +++ b/src/ad_buyer/interfaces/mcp_server.py @@ -69,6 +69,7 @@ ManualDealEntry, create_manual_deal, ) +from ..clients.mixpeek_client import MixpeekClient, MixpeekError logger = logging.getLogger(__name__) @@ -2869,6 +2870,97 @@ async def help_prompt() -> list[Message]: )] +# --------------------------------------------------------------------------- +# Contextual Enrichment (Mixpeek) +# --------------------------------------------------------------------------- + + +def _get_mixpeek_client() -> MixpeekClient: + """Create a MixpeekClient from current settings.""" + s = Settings() + return MixpeekClient( + api_key=s.mixpeek_api_key, + base_url=s.mixpeek_base_url, + namespace=s.mixpeek_namespace, + ) + + +@mcp.tool( + name="classify_content", + description=( + "Classify page or ad-creative content into IAB v3.0 taxonomy " + "categories using Mixpeek. Supply either raw text or a URL. " + "Returns category codes with confidence scores for contextual " + "targeting and brand-safety evaluation." + ), +) +async def classify_content( + text: str | None = None, + url: str | None = None, + taxonomy_id: str | None = None, +) -> str: + """Classify content into IAB taxonomy categories via Mixpeek.""" + if not text and not url: + return json.dumps({"error": "Either text or url must be provided"}) + + client = _get_mixpeek_client() + try: + tid = taxonomy_id + if not tid: + taxonomies = await client.list_taxonomies() + iab = [ + t for t in taxonomies + if "iab" in t.get("taxonomy_name", "").lower() + ] + if iab: + tid = iab[0]["taxonomy_id"] + elif taxonomies: + tid = taxonomies[0]["taxonomy_id"] + else: + return json.dumps({ + "error": "No taxonomies found in this namespace. " + "Create one first via the Mixpeek dashboard." + }) + + result = await client.classify_content( + taxonomy_id=tid, text=text, url=url, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +@mcp.tool( + name="contextual_search", + description=( + "Search indexed ad inventory using a Mixpeek retriever pipeline. " + "Pipelines can combine multimodal search, brand-safety filtering, " + "IAB taxonomy enrichment, and reranking. Returns matching inventory " + "with relevance scores and enriched metadata." + ), +) +async def contextual_search( + query: str, + retriever_id: str, + limit: int = 10, +) -> str: + """Search inventory via a Mixpeek retriever pipeline.""" + client = _get_mixpeek_client() + try: + result = await client.search_content( + retriever_id=retriever_id, + query=query, + limit=limit, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + # --------------------------------------------------------------------------- # Mounting # --------------------------------------------------------------------------- diff --git a/src/ad_buyer/tools/research/__init__.py b/src/ad_buyer/tools/research/__init__.py index 5e7c982..1f1f3e7 100644 --- a/src/ad_buyer/tools/research/__init__.py +++ b/src/ad_buyer/tools/research/__init__.py @@ -4,6 +4,12 @@ """Research tools for inventory discovery.""" from .avails_check import AvailsCheckTool +from .contextual_enrichment import ClassifyContentTool, ContextualSearchTool from .product_search import ProductSearchTool -__all__ = ["ProductSearchTool", "AvailsCheckTool"] +__all__ = [ + "ProductSearchTool", + "AvailsCheckTool", + "ClassifyContentTool", + "ContextualSearchTool", +] diff --git a/src/ad_buyer/tools/research/contextual_enrichment.py b/src/ad_buyer/tools/research/contextual_enrichment.py new file mode 100644 index 0000000..65f09b0 --- /dev/null +++ b/src/ad_buyer/tools/research/contextual_enrichment.py @@ -0,0 +1,185 @@ +# Contextual enrichment tools powered by Mixpeek. +# +# Provides IAB taxonomy classification and brand-safety scoring +# that buyer agents can use during inventory research and deal +# evaluation. + +from __future__ import annotations + +import json +from typing import Any + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field + +from ...async_utils import run_async +from ...clients.mixpeek_client import MixpeekClient, MixpeekError +from ...config.settings import Settings + + +def _get_mixpeek_client() -> MixpeekClient: + """Create a MixpeekClient from current settings.""" + settings = Settings() + return MixpeekClient( + api_key=settings.mixpeek_api_key, + base_url=settings.mixpeek_base_url, + namespace=settings.mixpeek_namespace, + ) + + +# ----------------------------------------------------------------------- +# 1. Content Classification +# ----------------------------------------------------------------------- + +class ClassifyContentInput(BaseModel): + """Input for contextual content classification.""" + + text: str | None = Field( + default=None, + description="Raw page/creative text to classify", + ) + url: str | None = Field( + default=None, + description="Page URL — Mixpeek will scrape and classify it", + ) + taxonomy_id: str | None = Field( + default=None, + description="Mixpeek taxonomy ID. Uses default IAB taxonomy if omitted.", + ) + + +class ClassifyContentTool(BaseTool): + """Classify page or creative content into IAB v3.0 categories. + + Uses Mixpeek's taxonomy engine to map text or a URL to + standardised IAB content categories with confidence scores. + Buyer agents use these categories for contextual targeting + decisions and brand-safety evaluation. + """ + + name: str = "classify_content" + description: str = ( + "Classify page or ad-creative content into IAB v3.0 taxonomy " + "categories using Mixpeek. Supply either raw text or a URL. " + "Returns category codes with confidence scores for contextual " + "targeting and brand-safety evaluation." + ) + args_schema: type[BaseModel] = ClassifyContentInput + + def _run( + self, + text: str | None = None, + url: str | None = None, + taxonomy_id: str | None = None, + ) -> str: + return run_async(self._arun(text=text, url=url, taxonomy_id=taxonomy_id)) + + async def _arun( + self, + text: str | None = None, + url: str | None = None, + taxonomy_id: str | None = None, + ) -> str: + if not text and not url: + return json.dumps({"error": "Either text or url must be provided"}) + + client = _get_mixpeek_client() + try: + # If no taxonomy_id supplied, try to find an IAB taxonomy + tid = taxonomy_id + if not tid: + taxonomies = await client.list_taxonomies() + iab = [ + t for t in taxonomies + if "iab" in t.get("taxonomy_name", "").lower() + ] + if iab: + tid = iab[0]["taxonomy_id"] + elif taxonomies: + tid = taxonomies[0]["taxonomy_id"] + else: + return json.dumps({ + "error": "No taxonomies found in this namespace. " + "Create one first via the Mixpeek dashboard." + }) + + result = await client.classify_content( + taxonomy_id=tid, + text=text, + url=url, + ) + return json.dumps(result, indent=2) + + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +# ----------------------------------------------------------------------- +# 2. Contextual Search (inventory enrichment) +# ----------------------------------------------------------------------- + +class ContextualSearchInput(BaseModel): + """Input for contextual inventory search.""" + + query: str = Field( + description="Natural-language search query (e.g. 'sports news articles')", + ) + retriever_id: str = Field( + description="Mixpeek retriever pipeline ID to execute", + ) + limit: int = Field( + default=10, + description="Max results to return", + ge=1, + le=100, + ) + + +class ContextualSearchTool(BaseTool): + """Search indexed inventory via a Mixpeek retriever pipeline. + + Retriever pipelines can chain feature search, brand-safety + filtering, taxonomy enrichment, and reranking in a single call. + Use this to find contextually relevant inventory during the + research phase. + """ + + name: str = "contextual_search" + description: str = ( + "Search indexed ad inventory using a Mixpeek retriever pipeline. " + "Pipelines can combine multimodal search, brand-safety filtering, " + "IAB taxonomy enrichment, and reranking. Returns matching inventory " + "with relevance scores and enriched metadata." + ) + args_schema: type[BaseModel] = ContextualSearchInput + + def _run( + self, + query: str = "", + retriever_id: str = "", + limit: int = 10, + ) -> str: + return run_async( + self._arun(query=query, retriever_id=retriever_id, limit=limit) + ) + + async def _arun( + self, + query: str = "", + retriever_id: str = "", + limit: int = 10, + ) -> str: + client = _get_mixpeek_client() + try: + result = await client.search_content( + retriever_id=retriever_id, + query=query, + limit=limit, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() diff --git a/tests/unit/test_contextual_enrichment.py b/tests/unit/test_contextual_enrichment.py new file mode 100644 index 0000000..60f375d --- /dev/null +++ b/tests/unit/test_contextual_enrichment.py @@ -0,0 +1,120 @@ +# Author: Green Mountain Systems AI Inc. +# Donated to IAB Tech Lab + +"""Unit tests for contextual enrichment CrewAI tools.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, patch + +import pytest + +from ad_buyer.tools.research.contextual_enrichment import ( + ClassifyContentTool, + ContextualSearchTool, +) + + +@pytest.fixture +def classify_tool(): + return ClassifyContentTool() + + +@pytest.fixture +def search_tool(): + return ContextualSearchTool() + + +class TestClassifyContentTool: + def test_tool_name(self, classify_tool): + assert classify_tool.name == "classify_content" + + @pytest.mark.asyncio + async def test_returns_error_without_input(self, classify_tool): + result = await classify_tool._arun() + data = json.loads(result) + assert "error" in data + + @pytest.mark.asyncio + async def test_classify_with_taxonomy_id(self, classify_tool): + mock_client = AsyncMock() + mock_client.classify_content.return_value = { + "results": [{"label": "Sports", "score": 0.9}] + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun( + text="NFL scores", taxonomy_id="tax-123" + ) + + data = json.loads(result) + assert data["results"][0]["label"] == "Sports" + mock_client.close.assert_called_once() + + @pytest.mark.asyncio + async def test_classify_auto_discovers_taxonomy(self, classify_tool): + mock_client = AsyncMock() + mock_client.list_taxonomies.return_value = [ + {"taxonomy_id": "t1", "taxonomy_name": "IAB Content Taxonomy v3.0"} + ] + mock_client.classify_content.return_value = {"results": []} + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun(text="test content") + + data = json.loads(result) + mock_client.classify_content.assert_called_once_with( + taxonomy_id="t1", text="test content", url=None, + ) + + @pytest.mark.asyncio + async def test_classify_no_taxonomies(self, classify_tool): + mock_client = AsyncMock() + mock_client.list_taxonomies.return_value = [] + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun(text="test") + + data = json.loads(result) + assert "No taxonomies found" in data["error"] + + +class TestContextualSearchTool: + def test_tool_name(self, search_tool): + assert search_tool.name == "contextual_search" + + @pytest.mark.asyncio + async def test_search(self, search_tool): + mock_client = AsyncMock() + mock_client.search_content.return_value = { + "results": [{"doc_id": "d1", "score": 0.85}] + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await search_tool._arun( + query="sports news", retriever_id="ret-1", limit=5 + ) + + data = json.loads(result) + assert data["results"][0]["score"] == 0.85 + mock_client.search_content.assert_called_once_with( + retriever_id="ret-1", query="sports news", limit=5, + ) + mock_client.close.assert_called_once() diff --git a/tests/unit/test_mixpeek_client.py b/tests/unit/test_mixpeek_client.py new file mode 100644 index 0000000..f136ada --- /dev/null +++ b/tests/unit/test_mixpeek_client.py @@ -0,0 +1,133 @@ +# Author: Green Mountain Systems AI Inc. +# Donated to IAB Tech Lab + +"""Unit tests for the Mixpeek contextual enrichment client.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +from ad_buyer.clients.mixpeek_client import MixpeekClient, MixpeekError + + +@pytest.fixture +def client(): + return MixpeekClient( + api_key="test-key", + base_url="https://api.mixpeek.com", + namespace="test-ns", + ) + + +class TestMixpeekClientInit: + def test_defaults(self): + c = MixpeekClient(api_key="k") + assert c.api_key == "k" + assert c.base_url == "https://api.mixpeek.com" + assert c.namespace is None + + def test_custom_base_url_strips_trailing_slash(self): + c = MixpeekClient(api_key="k", base_url="https://example.com/") + assert c.base_url == "https://example.com" + + def test_headers_include_namespace(self, client): + h = client._headers() + assert h["Authorization"] == "Bearer test-key" + assert h["X-Namespace"] == "test-ns" + + def test_headers_without_namespace(self): + c = MixpeekClient(api_key="k") + h = c._headers() + assert "X-Namespace" not in h + + +class TestClassifyContent: + @pytest.mark.asyncio + async def test_classify_with_text(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "results": [{"label": "Sports", "score": 0.95}] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.classify_content( + taxonomy_id="tax-123", text="NFL football scores" + ) + + assert result["results"][0]["label"] == "Sports" + + @pytest.mark.asyncio + async def test_classify_requires_text_or_url(self, client): + with pytest.raises(ValueError, match="Either text or url"): + await client.classify_content(taxonomy_id="tax-123") + + @pytest.mark.asyncio + async def test_classify_api_error_raises(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 401 + mock_resp.text = "Unauthorized" + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + with pytest.raises(MixpeekError, match="401"): + await client.classify_content( + taxonomy_id="tax-123", text="test" + ) + + +class TestSearchContent: + @pytest.mark.asyncio + async def test_search_builds_correct_body(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"results": []} + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp) as mock_req: + await client.search_content( + retriever_id="ret-456", query="sports news", limit=5 + ) + + call_args = mock_req.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body["inputs"]["query"] == "sports news" + assert body["page_size"] == 5 + + +class TestListTaxonomies: + @pytest.mark.asyncio + async def test_list_returns_results(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "results": [{"taxonomy_id": "t1", "taxonomy_name": "IAB v3.0"}] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.list_taxonomies() + + assert len(result) == 1 + assert result[0]["taxonomy_name"] == "IAB v3.0" + + +class TestGetTools: + @pytest.mark.asyncio + async def test_get_tools_no_auth(self, client): + mock_resp = MagicMock() + mock_resp.json.return_value = {"tools": [{"name": "tool1"}]} + + with patch.object(client._client, "get", new_callable=AsyncMock, return_value=mock_resp): + tools = await client.get_tools() + + assert len(tools) == 1 + + +class TestClose: + @pytest.mark.asyncio + async def test_close(self, client): + with patch.object(client._client, "aclose", new_callable=AsyncMock) as mock_close: + await client.close() + mock_close.assert_called_once() From 8bbb35b2821133d376db914b04a853547db69116 Mon Sep 17 00:00:00 2001 From: Ethan Steininger Date: Mon, 30 Mar 2026 15:51:56 -0400 Subject: [PATCH 2/2] feat: add brand-safety scoring, production e2e tests, crew integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major improvements to the Mixpeek contextual enrichment integration: - Rearchitect classify_content to use retriever-based IAB classification (semantic search against IAB category corpus) instead of batch taxonomy execute endpoint which returns empty for real-time queries - Add check_brand_safety method and BrandSafetyTool that flags sensitive IAB categories (gambling, adult, etc.) with risk levels - Add auto-discovery of IAB retrievers when no retriever_id is specified - Wire ClassifyContentTool, BrandSafetyTool, ContextualSearchTool into the research crew (channel_crews.py) — tools activate when MIXPEEK_API_KEY is configured - Register check_brand_safety as MCP tool alongside classify_content and contextual_search - Add 16 e2e tests hitting production Mixpeek API (golden_adtech_iab namespace with 700+ IAB category documents) verifying: - Sports content → "American Football" (score > 0.80) - Automotive content → "Automotive" hierarchy (score > 0.80) - Food content → "Food & Drink" / "Cooking" - Tech content → "Artificial Intelligence" (score > 0.85) - Gambling content flagged as brand-unsafe (high risk) - Safe content not flagged - Threshold filtering works correctly - Error handling for invalid keys and retriever IDs All 41 tests pass (25 unit + 16 e2e against production). --- src/ad_buyer/clients/mixpeek_client.py | 152 ++++++-- src/ad_buyer/crews/channel_crews.py | 17 +- src/ad_buyer/interfaces/mcp_server.py | 102 ++++-- src/ad_buyer/tools/research/__init__.py | 3 +- .../tools/research/contextual_enrichment.py | 201 ++++++++--- tests/e2e/__init__.py | 0 tests/e2e/test_mixpeek_production.py | 331 ++++++++++++++++++ tests/unit/test_contextual_enrichment.py | 97 ++++- tests/unit/test_mixpeek_client.py | 131 ++++++- 9 files changed, 911 insertions(+), 123 deletions(-) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/test_mixpeek_production.py diff --git a/src/ad_buyer/clients/mixpeek_client.py b/src/ad_buyer/clients/mixpeek_client.py index e5deb05..0221513 100644 --- a/src/ad_buyer/clients/mixpeek_client.py +++ b/src/ad_buyer/clients/mixpeek_client.py @@ -1,7 +1,8 @@ # Mixpeek contextual enrichment client for the Ad Buyer Agent. # -# Provides content classification (IAB taxonomy), brand-safety scoring, -# and visual creative analysis via the Mixpeek REST API. +# Provides IAB taxonomy classification via retriever pipelines, +# brand-safety scoring, and contextual inventory search via the +# Mixpeek REST API. from __future__ import annotations @@ -15,6 +16,25 @@ # Default timeout for Mixpeek API calls (seconds). _DEFAULT_TIMEOUT = 30.0 +# Brand-safety sensitive IAB categories that advertisers typically +# want to avoid or require explicit opt-in for. +BRAND_UNSAFE_CATEGORIES = frozenset({ + "Poker and Professional Gambling", + "Casinos & Gambling", + "Casino Games", + "Lotteries and Scratchcards", + "Sensitive Topics", + "Adult Content", + "Illegal Content", + "Debated Sensitive Social Topics", + "Terrorism", + "Crime", + "Drugs", + "Tobacco", + "Arms & Ammunition", + "Death & Grieving", +}) + class MixpeekError(Exception): """Raised when a Mixpeek API call fails.""" @@ -27,15 +47,17 @@ def __init__(self, message: str, status_code: int | None = None): class MixpeekClient: """Async HTTP client for the Mixpeek content-intelligence API. - The client wraps three capabilities that are useful during the - buyer-agent research & execution phases: + The client wraps capabilities useful during buyer-agent research + and execution phases: - 1. **Content classification** – map a page URL or text to IAB v3.0 - taxonomy categories via Mixpeek's ``execute_taxonomy`` endpoint. - 2. **Brand-safety check** – lightweight sentiment / safety score - for a given URL or text snippet. - 3. **Creative analysis** – extract objects, text (OCR), and brand - logos from an ad-creative image or video URL. + 1. **IAB classification** – classify text/URL content against IAB + v3.0 taxonomy categories using a retriever pipeline that performs + semantic search against an IAB category reference corpus. + 2. **Brand-safety check** – score content for brand-safety risk + by identifying sensitive IAB categories in the classification. + 3. **Contextual search** – search indexed ad inventory via retriever + pipelines combining multimodal search, taxonomy enrichment, + and reranking. All methods are async and use ``httpx.AsyncClient`` under the hood. """ @@ -110,35 +132,113 @@ async def list_taxonomies(self, namespace: str | None = None) -> list[dict]: ) return data.get("results", []) + async def list_retrievers(self, namespace: str | None = None) -> list[dict]: + """List available retriever pipelines in the namespace.""" + data = await self._request( + "POST", "/v1/retrievers/list", namespace=namespace, json={} + ) + return data.get("results", []) + async def classify_content( self, - taxonomy_id: str, - text: str | None = None, - url: str | None = None, + retriever_id: str, + text: str, *, namespace: str | None = None, + limit: int = 10, ) -> dict[str, Any]: - """Classify content against an IAB taxonomy. + """Classify content into IAB taxonomy categories. - Either *text* (raw page text) or *url* (page URL for Mixpeek to - scrape) should be supplied. Returns taxonomy node assignments - with confidence scores. - """ - body: dict[str, Any] = {} - if text: - body["input"] = {"text": text} - elif url: - body["input"] = {"url": url} - else: - raise ValueError("Either text or url must be provided") + Uses a retriever pipeline that performs semantic search against + an IAB category reference corpus. Each result represents an + IAB category match with a confidence score. + Args: + retriever_id: Retriever pipeline configured for IAB search. + text: Content text to classify. + namespace: Override namespace for this call. + limit: Max category matches to return. + + Returns: + Dict with ``documents`` list, each containing + ``iab_category_name``, ``iab_path``, ``iab_tier``, + and ``score``. + """ + body: dict[str, Any] = { + "inputs": {"query": text}, + "page_size": limit, + } return await self._request( "POST", - f"/v1/taxonomies/execute/{taxonomy_id}", + f"/v1/retrievers/{retriever_id}/execute", namespace=namespace, json=body, ) + async def check_brand_safety( + self, + retriever_id: str, + text: str, + *, + namespace: str | None = None, + threshold: float = 0.80, + limit: int = 10, + ) -> dict[str, Any]: + """Check content for brand-safety risk. + + Classifies content via the IAB retriever and flags any matches + against known sensitive categories (gambling, adult, etc.). + + Args: + retriever_id: Retriever pipeline configured for IAB search. + text: Content text to evaluate. + namespace: Override namespace for this call. + threshold: Minimum score to consider a category match. + limit: Max category matches to evaluate. + + Returns: + Dict with ``safe`` bool, ``risk_level`` (low/medium/high), + ``flagged_categories`` list, and full ``categories`` list. + """ + result = await self.classify_content( + retriever_id=retriever_id, + text=text, + namespace=namespace, + limit=limit, + ) + + docs = result.get("documents", []) + categories = [] + flagged = [] + + for doc in docs: + score = doc.get("score", 0) + if score < threshold: + continue + cat_name = doc.get("iab_category_name", "") + entry = { + "category": cat_name, + "path": doc.get("iab_path", []), + "tier": doc.get("iab_tier"), + "score": score, + } + categories.append(entry) + if cat_name in BRAND_UNSAFE_CATEGORIES: + flagged.append(entry) + + if flagged: + max_score = max(f["score"] for f in flagged) + risk_level = "high" if max_score >= 0.85 else "medium" + else: + risk_level = "low" + + return { + "safe": len(flagged) == 0, + "risk_level": risk_level, + "flagged_categories": flagged, + "categories": categories, + } + async def search_content( self, retriever_id: str, diff --git a/src/ad_buyer/crews/channel_crews.py b/src/ad_buyer/crews/channel_crews.py index 0fdcd21..b07b701 100644 --- a/src/ad_buyer/crews/channel_crews.py +++ b/src/ad_buyer/crews/channel_crews.py @@ -19,16 +19,31 @@ from ..tools.execution.line_management import BookLineTool, CreateLineTool, ReserveLineTool from ..tools.execution.order_management import CreateOrderTool from ..tools.research.avails_check import AvailsCheckTool +from ..tools.research.contextual_enrichment import ( + BrandSafetyTool, + ClassifyContentTool, + ContextualSearchTool, +) from ..tools.research.product_search import ProductSearchTool def _create_research_tools(client: OpenDirectClient) -> list[Any]: """Create research tools with the OpenDirect client.""" - return [ + tools: list[Any] = [ ProductSearchTool(client), AvailsCheckTool(client), ] + # Add Mixpeek contextual enrichment tools when configured + if settings.mixpeek_api_key: + tools.extend([ + ClassifyContentTool(), + BrandSafetyTool(), + ContextualSearchTool(), + ]) + + return tools + def _create_execution_tools(client: OpenDirectClient) -> list[Any]: """Create execution tools with the OpenDirect client.""" diff --git a/src/ad_buyer/interfaces/mcp_server.py b/src/ad_buyer/interfaces/mcp_server.py index fd19ae3..abf3a62 100644 --- a/src/ad_buyer/interfaces/mcp_server.py +++ b/src/ad_buyer/interfaces/mcp_server.py @@ -2885,45 +2885,99 @@ def _get_mixpeek_client() -> MixpeekClient: ) +async def _discover_iab_retriever(client: MixpeekClient) -> str | None: + """Find an IAB text search retriever in the current namespace.""" + retrievers = await client.list_retrievers() + for r in retrievers: + name = r.get("retriever_name", "").lower() + if "iab" in name and "text" in name: + return r["retriever_id"] + for r in retrievers: + if "iab" in r.get("retriever_name", "").lower(): + return r["retriever_id"] + return None + + @mcp.tool( name="classify_content", description=( "Classify page or ad-creative content into IAB v3.0 taxonomy " - "categories using Mixpeek. Supply either raw text or a URL. " - "Returns category codes with confidence scores for contextual " - "targeting and brand-safety evaluation." + "categories using Mixpeek. Supply text content to classify. " + "Returns ranked IAB category matches with hierarchical paths " + "(e.g. Sports > American Football) and confidence scores for " + "contextual targeting." ), ) async def classify_content( - text: str | None = None, - url: str | None = None, - taxonomy_id: str | None = None, + text: str, + retriever_id: str | None = None, + limit: int = 10, ) -> str: - """Classify content into IAB taxonomy categories via Mixpeek.""" - if not text and not url: - return json.dumps({"error": "Either text or url must be provided"}) + """Classify content into IAB taxonomy categories via Mixpeek retriever.""" + if not text: + return json.dumps({"error": "text must be provided"}) client = _get_mixpeek_client() try: - tid = taxonomy_id - if not tid: - taxonomies = await client.list_taxonomies() - iab = [ - t for t in taxonomies - if "iab" in t.get("taxonomy_name", "").lower() - ] - if iab: - tid = iab[0]["taxonomy_id"] - elif taxonomies: - tid = taxonomies[0]["taxonomy_id"] - else: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: return json.dumps({ - "error": "No taxonomies found in this namespace. " - "Create one first via the Mixpeek dashboard." + "error": "No IAB retriever found in this namespace. " + "Set MIXPEEK_NAMESPACE or pass retriever_id explicitly." }) result = await client.classify_content( - taxonomy_id=tid, text=text, url=url, + retriever_id=rid, text=text, limit=limit, + ) + docs = result.get("documents", []) + categories = [ + { + "category": d.get("iab_category_name"), + "path": d.get("iab_path", []), + "tier": d.get("iab_tier"), + "score": round(d.get("score", 0), 4), + } + for d in docs + ] + return json.dumps({"categories": categories}, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +@mcp.tool( + name="check_brand_safety", + description=( + "Evaluate page or ad-creative content for brand-safety risk. " + "Classifies content into IAB categories and flags sensitive " + "categories (gambling, adult, etc.). Returns safe/unsafe " + "verdict, risk level (low/medium/high), and flagged categories." + ), +) +async def check_brand_safety( + text: str, + retriever_id: str | None = None, + threshold: float = 0.80, +) -> str: + """Check content for brand-safety risk via Mixpeek.""" + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace." + }) + + result = await client.check_brand_safety( + retriever_id=rid, text=text, threshold=threshold, ) return json.dumps(result, indent=2) except MixpeekError as exc: diff --git a/src/ad_buyer/tools/research/__init__.py b/src/ad_buyer/tools/research/__init__.py index 1f1f3e7..2a53d85 100644 --- a/src/ad_buyer/tools/research/__init__.py +++ b/src/ad_buyer/tools/research/__init__.py @@ -4,7 +4,7 @@ """Research tools for inventory discovery.""" from .avails_check import AvailsCheckTool -from .contextual_enrichment import ClassifyContentTool, ContextualSearchTool +from .contextual_enrichment import BrandSafetyTool, ClassifyContentTool, ContextualSearchTool from .product_search import ProductSearchTool __all__ = [ @@ -12,4 +12,5 @@ "AvailsCheckTool", "ClassifyContentTool", "ContextualSearchTool", + "BrandSafetyTool", ] diff --git a/src/ad_buyer/tools/research/contextual_enrichment.py b/src/ad_buyer/tools/research/contextual_enrichment.py index 65f09b0..7e0dc08 100644 --- a/src/ad_buyer/tools/research/contextual_enrichment.py +++ b/src/ad_buyer/tools/research/contextual_enrichment.py @@ -1,8 +1,8 @@ # Contextual enrichment tools powered by Mixpeek. # -# Provides IAB taxonomy classification and brand-safety scoring -# that buyer agents can use during inventory research and deal -# evaluation. +# Provides IAB taxonomy classification, brand-safety scoring, +# and contextual inventory search that buyer agents can use during +# inventory research and deal evaluation. from __future__ import annotations @@ -28,31 +28,38 @@ def _get_mixpeek_client() -> MixpeekClient: # ----------------------------------------------------------------------- -# 1. Content Classification +# 1. Content Classification (IAB Taxonomy) # ----------------------------------------------------------------------- class ClassifyContentInput(BaseModel): """Input for contextual content classification.""" - text: str | None = Field( - default=None, - description="Raw page/creative text to classify", + text: str = Field( + description="Page or ad-creative text to classify into IAB categories", ) - url: str | None = Field( + retriever_id: str | None = Field( default=None, - description="Page URL — Mixpeek will scrape and classify it", + description=( + "Mixpeek retriever pipeline ID for IAB classification. " + "If omitted, auto-discovers an IAB retriever in the namespace." + ), ) - taxonomy_id: str | None = Field( - default=None, - description="Mixpeek taxonomy ID. Uses default IAB taxonomy if omitted.", + limit: int = Field( + default=10, + description="Max IAB category matches to return", + ge=1, + le=50, ) class ClassifyContentTool(BaseTool): """Classify page or creative content into IAB v3.0 categories. - Uses Mixpeek's taxonomy engine to map text or a URL to - standardised IAB content categories with confidence scores. + Uses a Mixpeek retriever pipeline to perform semantic search + against an IAB category reference corpus. Returns ranked + category matches with hierarchical paths (e.g. + Sports > American Football) and confidence scores. + Buyer agents use these categories for contextual targeting decisions and brand-safety evaluation. """ @@ -60,53 +67,138 @@ class ClassifyContentTool(BaseTool): name: str = "classify_content" description: str = ( "Classify page or ad-creative content into IAB v3.0 taxonomy " - "categories using Mixpeek. Supply either raw text or a URL. " - "Returns category codes with confidence scores for contextual " - "targeting and brand-safety evaluation." + "categories using Mixpeek. Supply text content to classify. " + "Returns ranked IAB category matches with hierarchical paths " + "(e.g. Sports > American Football) and confidence scores for " + "contextual targeting and brand-safety evaluation." ) args_schema: type[BaseModel] = ClassifyContentInput def _run( self, - text: str | None = None, - url: str | None = None, - taxonomy_id: str | None = None, + text: str = "", + retriever_id: str | None = None, + limit: int = 10, ) -> str: - return run_async(self._arun(text=text, url=url, taxonomy_id=taxonomy_id)) + return run_async(self._arun(text=text, retriever_id=retriever_id, limit=limit)) async def _arun( self, - text: str | None = None, - url: str | None = None, - taxonomy_id: str | None = None, + text: str = "", + retriever_id: str | None = None, + limit: int = 10, ) -> str: - if not text and not url: - return json.dumps({"error": "Either text or url must be provided"}) + if not text: + return json.dumps({"error": "text must be provided"}) client = _get_mixpeek_client() try: - # If no taxonomy_id supplied, try to find an IAB taxonomy - tid = taxonomy_id - if not tid: - taxonomies = await client.list_taxonomies() - iab = [ - t for t in taxonomies - if "iab" in t.get("taxonomy_name", "").lower() - ] - if iab: - tid = iab[0]["taxonomy_id"] - elif taxonomies: - tid = taxonomies[0]["taxonomy_id"] - else: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: return json.dumps({ - "error": "No taxonomies found in this namespace. " - "Create one first via the Mixpeek dashboard." + "error": "No IAB retriever found in this namespace. " + "Set MIXPEEK_NAMESPACE to a namespace with IAB data, " + "or pass retriever_id explicitly." }) result = await client.classify_content( - taxonomy_id=tid, - text=text, - url=url, + retriever_id=rid, text=text, limit=limit, + ) + + # Simplify output for the agent + docs = result.get("documents", []) + categories = [ + { + "category": d.get("iab_category_name"), + "path": d.get("iab_path", []), + "tier": d.get("iab_tier"), + "score": round(d.get("score", 0), 4), + } + for d in docs + ] + return json.dumps({"categories": categories}, indent=2) + + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +# ----------------------------------------------------------------------- +# 2. Brand Safety Check +# ----------------------------------------------------------------------- + +class BrandSafetyInput(BaseModel): + """Input for brand-safety evaluation.""" + + text: str = Field( + description="Page or ad-creative text to evaluate for brand safety", + ) + retriever_id: str | None = Field( + default=None, + description=( + "Mixpeek retriever pipeline ID for IAB classification. " + "If omitted, auto-discovers an IAB retriever in the namespace." + ), + ) + threshold: float = Field( + default=0.80, + description="Minimum confidence score to consider a category match", + ge=0.0, + le=1.0, + ) + + +class BrandSafetyTool(BaseTool): + """Evaluate content for brand-safety risk. + + Classifies content via IAB taxonomy and flags matches against + known sensitive categories (gambling, adult content, etc.). + Returns a safety verdict with risk level and flagged categories. + """ + + name: str = "check_brand_safety" + description: str = ( + "Evaluate page or ad-creative content for brand-safety risk. " + "Classifies content into IAB categories and flags sensitive " + "categories (gambling, adult, etc.). Returns safe/unsafe verdict, " + "risk level (low/medium/high), and flagged categories." + ) + args_schema: type[BaseModel] = BrandSafetyInput + + def _run( + self, + text: str = "", + retriever_id: str | None = None, + threshold: float = 0.80, + ) -> str: + return run_async( + self._arun(text=text, retriever_id=retriever_id, threshold=threshold) + ) + + async def _arun( + self, + text: str = "", + retriever_id: str | None = None, + threshold: float = 0.80, + ) -> str: + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace." + }) + + result = await client.check_brand_safety( + retriever_id=rid, text=text, threshold=threshold, ) return json.dumps(result, indent=2) @@ -117,7 +209,7 @@ async def _arun( # ----------------------------------------------------------------------- -# 2. Contextual Search (inventory enrichment) +# 3. Contextual Search (inventory enrichment) # ----------------------------------------------------------------------- class ContextualSearchInput(BaseModel): @@ -183,3 +275,22 @@ async def _arun( return json.dumps({"error": str(exc)}) finally: await client.close() + + +# ----------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------- + +async def _discover_iab_retriever(client: MixpeekClient) -> str | None: + """Find an IAB text search retriever in the current namespace.""" + retrievers = await client.list_retrievers() + # Prefer a retriever with "iab" and "text" in the name + for r in retrievers: + name = r.get("retriever_name", "").lower() + if "iab" in name and "text" in name: + return r["retriever_id"] + # Fall back to any retriever with "iab" in the name + for r in retrievers: + if "iab" in r.get("retriever_name", "").lower(): + return r["retriever_id"] + return None diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/test_mixpeek_production.py b/tests/e2e/test_mixpeek_production.py new file mode 100644 index 0000000..84099bc --- /dev/null +++ b/tests/e2e/test_mixpeek_production.py @@ -0,0 +1,331 @@ +# End-to-end test against the production Mixpeek API. +# +# Exercises the full buyer-agent contextual enrichment flow: +# 1. Health check (public endpoint) +# 2. MCP tools discovery (public endpoint) +# 3. IAB taxonomy classification of real ad content +# 4. Brand-safety scoring of safe and sensitive content +# 5. Contextual inventory search via retriever pipeline +# +# Requires: +# MIXPEEK_API_KEY — a valid Mixpeek API key +# MIXPEEK_NAMESPACE — namespace with IAB data (default: golden_adtech_iab) +# +# Run: +# MIXPEEK_API_KEY=mxp_sk_... pytest tests/e2e/test_mixpeek_production.py -v + +from __future__ import annotations + +import os + +import pytest +import pytest_asyncio + +from ad_buyer.clients.mixpeek_client import MixpeekClient, MixpeekError + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +API_KEY = os.environ.get("MIXPEEK_API_KEY", "") +BASE_URL = os.environ.get("MIXPEEK_BASE_URL", "https://api.mixpeek.com") +NAMESPACE = os.environ.get("MIXPEEK_NAMESPACE", "golden_adtech_iab") + +# Known retriever in golden_adtech_iab namespace +IAB_TEXT_RETRIEVER = os.environ.get( + "MIXPEEK_IAB_RETRIEVER_ID", "ret_f7fbefced358bd" +) + +pytestmark = [ + pytest.mark.e2e, + pytest.mark.skipif(not API_KEY, reason="MIXPEEK_API_KEY not set"), +] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest_asyncio.fixture +async def client(): + c = MixpeekClient( + api_key=API_KEY, + base_url=BASE_URL, + namespace=NAMESPACE, + ) + yield c + await c.close() + + +# --------------------------------------------------------------------------- +# 1. Health & Discovery +# --------------------------------------------------------------------------- + +class TestHealthAndDiscovery: + @pytest.mark.asyncio + async def test_mcp_health(self, client: MixpeekClient): + """MCP server health check returns healthy status.""" + result = await client.health() + assert result["status"] == "healthy" + assert result["tools_count"] >= 40 # currently 48 + + @pytest.mark.asyncio + async def test_mcp_tools_list(self, client: MixpeekClient): + """Public tools endpoint returns the full tool catalog.""" + tools = await client.get_tools() + assert len(tools) >= 40 + names = {t["name"] for t in tools} + # Spot-check a few expected tools + assert "create_namespace" in names + assert "execute_retriever" in names + + @pytest.mark.asyncio + async def test_list_retrievers(self, client: MixpeekClient): + """Namespace has IAB retriever pipelines.""" + retrievers = await client.list_retrievers() + assert len(retrievers) > 0 + names = {r["retriever_name"] for r in retrievers} + assert any("iab" in n.lower() for n in names), ( + f"No IAB retriever found. Available: {names}" + ) + + +# --------------------------------------------------------------------------- +# 2. IAB Content Classification +# --------------------------------------------------------------------------- + +class TestIABClassification: + @pytest.mark.asyncio + async def test_classify_sports_content(self, client: MixpeekClient): + """NFL content classifies as Sports > American Football.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Breaking: NFL playoff scores and highlights from Sunday " + "night football. Tom Brady analysis and Super Bowl predictions." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + assert "Sports" in top["iab_path"] + # Top result should be American Football or Sports + assert top["iab_category_name"] in ( + "American Football", "Sports", "College Football", + ) + + @pytest.mark.asyncio + async def test_classify_automotive_content(self, client: MixpeekClient): + """Luxury car review classifies as Automotive > Luxury Cars.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "The 2026 Mercedes-Benz S-Class review: luxury sedan with " + "cutting-edge autonomous driving features, premium leather " + "interior, and a 496-horsepower twin-turbo V8 engine." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + assert "Automotive" in top["iab_path"] + # Top result should be an Automotive subcategory + assert "Automotive" in top["iab_path"] + + @pytest.mark.asyncio + async def test_classify_food_content(self, client: MixpeekClient): + """Cooking recipe classifies as Food & Drink > Cooking.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Easy homemade pasta recipe: combine fresh eggs, semolina " + "flour, and olive oil. Roll the dough thin, cut into " + "fettuccine, and cook in salted boiling water for 3 minutes." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + # Should be in Food & Drink or Cooking + paths_flat = [ + cat for d in docs[:3] for cat in d.get("iab_path", []) + ] + assert "Food & Drink" in paths_flat or "Cooking" in paths_flat + + @pytest.mark.asyncio + async def test_classify_technology_content(self, client: MixpeekClient): + """AI/ML content classifies as Technology > Artificial Intelligence.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Artificial intelligence and machine learning are " + "transforming enterprise software. New AI startups raised " + "$10B in Q1 2026 for large language model development." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.85 + assert top["iab_category_name"] == "Artificial Intelligence" + assert "Technology & Computing" in top["iab_path"] + + @pytest.mark.asyncio + async def test_classify_returns_hierarchical_paths(self, client: MixpeekClient): + """Results include full IAB hierarchy: tier, path, category name.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text="Professional basketball NBA playoffs Lakers vs Celtics", + ) + doc = result["documents"][0] + assert "iab_category_name" in doc + assert "iab_path" in doc + assert "iab_tier" in doc + assert isinstance(doc["iab_path"], list) + assert doc["iab_tier"] in (1, 2, 3, 4) + + +# --------------------------------------------------------------------------- +# 3. Brand Safety +# --------------------------------------------------------------------------- + +class TestBrandSafety: + @pytest.mark.asyncio + async def test_safe_content(self, client: MixpeekClient): + """Benign sports content is flagged as safe.""" + result = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Local high school basketball team wins state championship " + "in an exciting overtime game at the civic center." + ), + ) + assert result["safe"] is True + assert result["risk_level"] == "low" + assert len(result["flagged_categories"]) == 0 + + @pytest.mark.asyncio + async def test_gambling_content_flagged(self, client: MixpeekClient): + """Gambling content is flagged as brand-unsafe.""" + result = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Online poker tournament with $1M prize pool. Texas Hold'em " + "strategy guide for casino gambling. Bet on sports with our " + "new odds calculator." + ), + ) + assert result["safe"] is False + assert result["risk_level"] in ("medium", "high") + flagged_names = [c["category"] for c in result["flagged_categories"]] + assert any( + cat in flagged_names + for cat in ( + "Poker and Professional Gambling", + "Casinos & Gambling", + "Casino Games", + ) + ), f"Expected gambling categories, got: {flagged_names}" + + @pytest.mark.asyncio + async def test_brand_safety_threshold(self, client: MixpeekClient): + """Higher threshold filters out lower-confidence matches.""" + result_low = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text="Online poker tournament guide", + threshold=0.70, + ) + result_high = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text="Online poker tournament guide", + threshold=0.95, + ) + # More categories pass at lower threshold + assert len(result_low["categories"]) >= len(result_high["categories"]) + + +# --------------------------------------------------------------------------- +# 4. Contextual Search +# --------------------------------------------------------------------------- + +class TestContextualSearch: + @pytest.mark.asyncio + async def test_search_returns_results(self, client: MixpeekClient): + """Contextual search returns ranked results with scores.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="sports and athletics", + limit=5, + ) + assert "documents" in result + docs = result["documents"] + assert len(docs) > 0 + + # Each result has a score + for d in docs: + assert "score" in d + assert d["score"] > 0 + + @pytest.mark.asyncio + async def test_search_results_have_iab_metadata(self, client: MixpeekClient): + """Search results include IAB category metadata.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="technology computing artificial intelligence", + limit=3, + ) + doc = result["documents"][0] + assert "iab_category_name" in doc + assert "iab_path" in doc + assert "score" in doc + + @pytest.mark.asyncio + async def test_search_returns_iab_enriched_results(self, client: MixpeekClient): + """Search results contain IAB enrichment from the retriever.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="food cooking recipes", + ) + docs = result["documents"] + assert len(docs) > 0 + # Results from IAB retriever have category metadata + for d in docs[:3]: + assert "iab_category_name" in d + assert "iab_path" in d + + +# --------------------------------------------------------------------------- +# 5. Error Handling +# --------------------------------------------------------------------------- + +class TestErrorHandling: + @pytest.mark.asyncio + async def test_invalid_retriever_id(self, client: MixpeekClient): + """Invalid retriever ID raises MixpeekError.""" + with pytest.raises(MixpeekError) as exc_info: + await client.classify_content( + retriever_id="ret_nonexistent", + text="test content", + ) + assert exc_info.value.status_code in (404, 400, 422) + + @pytest.mark.asyncio + async def test_invalid_api_key(self): + """Invalid API key raises MixpeekError.""" + bad_client = MixpeekClient( + api_key="invalid_key", + namespace=NAMESPACE, + ) + try: + with pytest.raises(MixpeekError) as exc_info: + await bad_client.list_retrievers() + assert exc_info.value.status_code in (401, 403, 404) + finally: + await bad_client.close() diff --git a/tests/unit/test_contextual_enrichment.py b/tests/unit/test_contextual_enrichment.py index 60f375d..e9008ff 100644 --- a/tests/unit/test_contextual_enrichment.py +++ b/tests/unit/test_contextual_enrichment.py @@ -11,6 +11,7 @@ import pytest from ad_buyer.tools.research.contextual_enrichment import ( + BrandSafetyTool, ClassifyContentTool, ContextualSearchTool, ) @@ -21,6 +22,11 @@ def classify_tool(): return ClassifyContentTool() +@pytest.fixture +def brand_safety_tool(): + return BrandSafetyTool() + + @pytest.fixture def search_tool(): return ContextualSearchTool() @@ -37,10 +43,17 @@ async def test_returns_error_without_input(self, classify_tool): assert "error" in data @pytest.mark.asyncio - async def test_classify_with_taxonomy_id(self, classify_tool): + async def test_classify_with_retriever_id(self, classify_tool): mock_client = AsyncMock() mock_client.classify_content.return_value = { - "results": [{"label": "Sports", "score": 0.9}] + "documents": [ + { + "iab_category_name": "Sports", + "iab_path": ["Sports"], + "iab_tier": 1, + "score": 0.9, + } + ] } mock_client.close = AsyncMock() @@ -49,20 +62,21 @@ async def test_classify_with_taxonomy_id(self, classify_tool): return_value=mock_client, ): result = await classify_tool._arun( - text="NFL scores", taxonomy_id="tax-123" + text="NFL scores", retriever_id="ret-123" ) data = json.loads(result) - assert data["results"][0]["label"] == "Sports" + assert data["categories"][0]["category"] == "Sports" + assert data["categories"][0]["score"] == 0.9 mock_client.close.assert_called_once() @pytest.mark.asyncio - async def test_classify_auto_discovers_taxonomy(self, classify_tool): + async def test_classify_auto_discovers_retriever(self, classify_tool): mock_client = AsyncMock() - mock_client.list_taxonomies.return_value = [ - {"taxonomy_id": "t1", "taxonomy_name": "IAB Content Taxonomy v3.0"} + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} ] - mock_client.classify_content.return_value = {"results": []} + mock_client.classify_content.return_value = {"documents": []} mock_client.close = AsyncMock() with patch( @@ -71,15 +85,14 @@ async def test_classify_auto_discovers_taxonomy(self, classify_tool): ): result = await classify_tool._arun(text="test content") - data = json.loads(result) mock_client.classify_content.assert_called_once_with( - taxonomy_id="t1", text="test content", url=None, + retriever_id="ret-1", text="test content", limit=10, ) @pytest.mark.asyncio - async def test_classify_no_taxonomies(self, classify_tool): + async def test_classify_no_retriever(self, classify_tool): mock_client = AsyncMock() - mock_client.list_taxonomies.return_value = [] + mock_client.list_retrievers.return_value = [] mock_client.close = AsyncMock() with patch( @@ -89,7 +102,61 @@ async def test_classify_no_taxonomies(self, classify_tool): result = await classify_tool._arun(text="test") data = json.loads(result) - assert "No taxonomies found" in data["error"] + assert "No IAB retriever found" in data["error"] + + +class TestBrandSafetyTool: + def test_tool_name(self, brand_safety_tool): + assert brand_safety_tool.name == "check_brand_safety" + + @pytest.mark.asyncio + async def test_safe_content(self, brand_safety_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} + ] + mock_client.check_brand_safety.return_value = { + "safe": True, + "risk_level": "low", + "flagged_categories": [], + "categories": [{"category": "Sports", "score": 0.9}], + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await brand_safety_tool._arun(text="basketball game") + + data = json.loads(result) + assert data["safe"] is True + + @pytest.mark.asyncio + async def test_unsafe_content(self, brand_safety_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} + ] + mock_client.check_brand_safety.return_value = { + "safe": False, + "risk_level": "high", + "flagged_categories": [ + {"category": "Casinos & Gambling", "score": 0.88} + ], + "categories": [], + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await brand_safety_tool._arun(text="casino gambling") + + data = json.loads(result) + assert data["safe"] is False + assert data["risk_level"] == "high" class TestContextualSearchTool: @@ -100,7 +167,7 @@ def test_tool_name(self, search_tool): async def test_search(self, search_tool): mock_client = AsyncMock() mock_client.search_content.return_value = { - "results": [{"doc_id": "d1", "score": 0.85}] + "documents": [{"document_id": "d1", "score": 0.85}] } mock_client.close = AsyncMock() @@ -113,7 +180,7 @@ async def test_search(self, search_tool): ) data = json.loads(result) - assert data["results"][0]["score"] == 0.85 + assert data["documents"][0]["score"] == 0.85 mock_client.search_content.assert_called_once_with( retriever_id="ret-1", query="sports news", limit=5, ) diff --git a/tests/unit/test_mixpeek_client.py b/tests/unit/test_mixpeek_client.py index f136ada..5443e45 100644 --- a/tests/unit/test_mixpeek_client.py +++ b/tests/unit/test_mixpeek_client.py @@ -11,7 +11,11 @@ import httpx import pytest -from ad_buyer.clients.mixpeek_client import MixpeekClient, MixpeekError +from ad_buyer.clients.mixpeek_client import ( + BRAND_UNSAFE_CATEGORIES, + MixpeekClient, + MixpeekError, +) @pytest.fixture @@ -51,20 +55,22 @@ async def test_classify_with_text(self, client): mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { - "results": [{"label": "Sports", "score": 0.95}] + "documents": [ + { + "iab_category_name": "American Football", + "iab_path": ["Sports", "American Football"], + "iab_tier": 2, + "score": 0.87, + } + ] } with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): result = await client.classify_content( - taxonomy_id="tax-123", text="NFL football scores" + retriever_id="ret-123", text="NFL football scores" ) - assert result["results"][0]["label"] == "Sports" - - @pytest.mark.asyncio - async def test_classify_requires_text_or_url(self, client): - with pytest.raises(ValueError, match="Either text or url"): - await client.classify_content(taxonomy_id="tax-123") + assert result["documents"][0]["iab_category_name"] == "American Football" @pytest.mark.asyncio async def test_classify_api_error_raises(self, client): @@ -75,16 +81,101 @@ async def test_classify_api_error_raises(self, client): with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): with pytest.raises(MixpeekError, match="401"): await client.classify_content( - taxonomy_id="tax-123", text="test" + retriever_id="ret-123", text="test" ) +class TestBrandSafety: + @pytest.mark.asyncio + async def test_safe_content(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Sports", + "iab_path": ["Sports"], + "iab_tier": 1, + "score": 0.90, + } + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", text="local basketball game" + ) + + assert result["safe"] is True + assert result["risk_level"] == "low" + assert len(result["flagged_categories"]) == 0 + + @pytest.mark.asyncio + async def test_unsafe_gambling_content(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Poker and Professional Gambling", + "iab_path": ["Sports", "Poker and Professional Gambling"], + "iab_tier": 2, + "score": 0.88, + }, + { + "iab_category_name": "Casinos & Gambling", + "iab_path": ["Attractions", "Casinos & Gambling"], + "iab_tier": 2, + "score": 0.85, + }, + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", text="poker casino betting" + ) + + assert result["safe"] is False + assert result["risk_level"] == "high" + assert len(result["flagged_categories"]) == 2 + + @pytest.mark.asyncio + async def test_threshold_filters_low_scores(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Casinos & Gambling", + "iab_path": ["Attractions", "Casinos & Gambling"], + "iab_tier": 2, + "score": 0.75, # Below threshold + }, + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", + text="card games", + threshold=0.80, + ) + + assert result["safe"] is True + assert len(result["categories"]) == 0 # Filtered out + + def test_brand_unsafe_categories_is_frozenset(self): + assert isinstance(BRAND_UNSAFE_CATEGORIES, frozenset) + assert "Casinos & Gambling" in BRAND_UNSAFE_CATEGORIES + + class TestSearchContent: @pytest.mark.asyncio async def test_search_builds_correct_body(self, client): mock_resp = MagicMock() mock_resp.status_code = 200 - mock_resp.json.return_value = {"results": []} + mock_resp.json.return_value = {"documents": []} with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp) as mock_req: await client.search_content( @@ -97,6 +188,24 @@ async def test_search_builds_correct_body(self, client): assert body["page_size"] == 5 +class TestListRetrievers: + @pytest.mark.asyncio + async def test_list_returns_results(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "results": [ + {"retriever_id": "r1", "retriever_name": "iab_text_search"} + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.list_retrievers() + + assert len(result) == 1 + assert result[0]["retriever_name"] == "iab_text_search" + + class TestListTaxonomies: @pytest.mark.asyncio async def test_list_returns_results(self, client):