diff --git a/.env.example b/.env.example index 155c3d7..315e8ae 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,11 @@ DATABASE_URL=sqlite:///./ad_buyer.db # Optional Redis (for caching/pub-sub) REDIS_URL= +# Mixpeek Contextual Enrichment (optional: enables classify_content and contextual_search tools) +MIXPEEK_API_KEY= +MIXPEEK_BASE_URL=https://api.mixpeek.com +MIXPEEK_NAMESPACE= + # Environment ENVIRONMENT=development LOG_LEVEL=INFO diff --git a/src/ad_buyer/clients/__init__.py b/src/ad_buyer/clients/__init__.py index 1c79355..8f697fe 100644 --- a/src/ad_buyer/clients/__init__.py +++ b/src/ad_buyer/clients/__init__.py @@ -6,6 +6,7 @@ from .a2a_client import A2AClient, A2AError, A2AResponse from .deals_client import DealsClient, DealsClientError from .mcp_client import IABMCPClient, MCPClientError, MCPToolResult +from .mixpeek_client import MixpeekClient, MixpeekError from .opendirect_client import OpenDirectClient from .ucp_client import UCPClient, UCPExchangeResult from .unified_client import Protocol, UnifiedClient, UnifiedResult @@ -31,4 +32,7 @@ # IAB Deals API v1.0 client (quote-then-book flow) "DealsClient", "DealsClientError", + # Mixpeek contextual enrichment + "MixpeekClient", + "MixpeekError", ] diff --git a/src/ad_buyer/clients/mixpeek_client.py b/src/ad_buyer/clients/mixpeek_client.py new file mode 100644 index 0000000..0221513 --- /dev/null +++ b/src/ad_buyer/clients/mixpeek_client.py @@ -0,0 +1,294 @@ +# Mixpeek contextual enrichment client for the Ad Buyer Agent. +# +# Provides IAB taxonomy classification via retriever pipelines, +# brand-safety scoring, and contextual inventory search via the +# Mixpeek REST API. + +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + +# Default timeout for Mixpeek API calls (seconds). +_DEFAULT_TIMEOUT = 30.0 + +# Brand-safety sensitive IAB categories that advertisers typically +# want to avoid or require explicit opt-in for. +BRAND_UNSAFE_CATEGORIES = frozenset({ + "Poker and Professional Gambling", + "Casinos & Gambling", + "Casino Games", + "Lotteries and Scratchcards", + "Sensitive Topics", + "Adult Content", + "Illegal Content", + "Debated Sensitive Social Topics", + "Terrorism", + "Crime", + "Drugs", + "Tobacco", + "Arms & Ammunition", + "Death & Grieving", +}) + + +class MixpeekError(Exception): + """Raised when a Mixpeek API call fails.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message) + self.status_code = status_code + + +class MixpeekClient: + """Async HTTP client for the Mixpeek content-intelligence API. + + The client wraps capabilities useful during buyer-agent research + and execution phases: + + 1. **IAB classification** – classify text/URL content against IAB + v3.0 taxonomy categories using a retriever pipeline that performs + semantic search against an IAB category reference corpus. + 2. **Brand-safety check** – score content for brand-safety risk + by identifying sensitive IAB categories in the classification. + 3. **Contextual search** – search indexed ad inventory via retriever + pipelines combining multimodal search, taxonomy enrichment, + and reranking. + + All methods are async and use ``httpx.AsyncClient`` under the hood. + """ + + def __init__( + self, + api_key: str, + base_url: str = "https://api.mixpeek.com", + namespace: str | None = None, + timeout: float = _DEFAULT_TIMEOUT, + ): + self.api_key = api_key + self.base_url = base_url.rstrip("/") + self.namespace = namespace + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=timeout, + follow_redirects=True, + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _headers(self, namespace: str | None = None) -> dict[str, str]: + """Build request headers with auth and optional namespace.""" + h: dict[str, str] = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + ns = namespace or self.namespace + if ns: + h["X-Namespace"] = ns + return h + + async def _request( + self, + method: str, + path: str, + *, + namespace: str | None = None, + json: dict | None = None, + params: dict | None = None, + ) -> dict[str, Any]: + """Send a request and return the parsed JSON response.""" + try: + resp = await self._client.request( + method, + path, + headers=self._headers(namespace), + json=json, + params=params, + ) + except httpx.HTTPError as exc: + raise MixpeekError(f"HTTP error: {exc}") from exc + + if resp.status_code >= 400: + raise MixpeekError( + f"Mixpeek API error {resp.status_code}: {resp.text}", + status_code=resp.status_code, + ) + return resp.json() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def list_taxonomies(self, namespace: str | None = None) -> list[dict]: + """List available taxonomies in the namespace.""" + data = await self._request( + "POST", "/v1/taxonomies/list", namespace=namespace, json={} + ) + return data.get("results", []) + + async def list_retrievers(self, namespace: str | None = None) -> list[dict]: + """List available retriever pipelines in the namespace.""" + data = await self._request( + "POST", "/v1/retrievers/list", namespace=namespace, json={} + ) + return data.get("results", []) + + async def classify_content( + self, + retriever_id: str, + text: str, + *, + namespace: str | None = None, + limit: int = 10, + ) -> dict[str, Any]: + """Classify content into IAB taxonomy categories. + + Uses a retriever pipeline that performs semantic search against + an IAB category reference corpus. Each result represents an + IAB category match with a confidence score. + + Args: + retriever_id: Retriever pipeline configured for IAB search. + text: Content text to classify. + namespace: Override namespace for this call. + limit: Max category matches to return. + + Returns: + Dict with ``documents`` list, each containing + ``iab_category_name``, ``iab_path``, ``iab_tier``, + and ``score``. + """ + body: dict[str, Any] = { + "inputs": {"query": text}, + "page_size": limit, + } + return await self._request( + "POST", + f"/v1/retrievers/{retriever_id}/execute", + namespace=namespace, + json=body, + ) + + async def check_brand_safety( + self, + retriever_id: str, + text: str, + *, + namespace: str | None = None, + threshold: float = 0.80, + limit: int = 10, + ) -> dict[str, Any]: + """Check content for brand-safety risk. + + Classifies content via the IAB retriever and flags any matches + against known sensitive categories (gambling, adult, etc.). + + Args: + retriever_id: Retriever pipeline configured for IAB search. + text: Content text to evaluate. + namespace: Override namespace for this call. + threshold: Minimum score to consider a category match. + limit: Max category matches to evaluate. + + Returns: + Dict with ``safe`` bool, ``risk_level`` (low/medium/high), + ``flagged_categories`` list, and full ``categories`` list. + """ + result = await self.classify_content( + retriever_id=retriever_id, + text=text, + namespace=namespace, + limit=limit, + ) + + docs = result.get("documents", []) + categories = [] + flagged = [] + + for doc in docs: + score = doc.get("score", 0) + if score < threshold: + continue + cat_name = doc.get("iab_category_name", "") + entry = { + "category": cat_name, + "path": doc.get("iab_path", []), + "tier": doc.get("iab_tier"), + "score": score, + } + categories.append(entry) + if cat_name in BRAND_UNSAFE_CATEGORIES: + flagged.append(entry) + + if flagged: + max_score = max(f["score"] for f in flagged) + risk_level = "high" if max_score >= 0.85 else "medium" + else: + risk_level = "low" + + return { + "safe": len(flagged) == 0, + "risk_level": risk_level, + "flagged_categories": flagged, + "categories": categories, + } + + async def search_content( + self, + retriever_id: str, + query: str, + *, + namespace: str | None = None, + limit: int = 10, + filters: dict | None = None, + ) -> dict[str, Any]: + """Execute a retriever pipeline (contextual search). + + The retriever can combine feature search, brand-safety filtering, + taxonomy enrichment, and reranking in a single call. + """ + inputs: dict[str, Any] = {"query": query} + if filters: + inputs.update(filters) + + body: dict[str, Any] = { + "inputs": inputs, + "page_size": limit, + } + + return await self._request( + "POST", + f"/v1/retrievers/{retriever_id}/execute", + namespace=namespace, + json=body, + ) + + async def get_tools(self) -> list[dict]: + """Fetch the public MCP tools list (no auth required). + + Hits the ``/tools`` REST endpoint on the MCP server, which + returns all 48 tool definitions without authentication. + """ + resp = await self._client.get( + "https://mcp.mixpeek.com/tools", + timeout=10, + ) + return resp.json().get("tools", []) + + async def health(self) -> dict[str, Any]: + """Check MCP server health (no auth required).""" + resp = await self._client.get( + "https://mcp.mixpeek.com/health", + timeout=5, + ) + return resp.json() + + async def close(self) -> None: + """Shut down the underlying HTTP client.""" + await self._client.aclose() diff --git a/src/ad_buyer/config/settings.py b/src/ad_buyer/config/settings.py index 6c8ac08..f6f390f 100644 --- a/src/ad_buyer/config/settings.py +++ b/src/ad_buyer/config/settings.py @@ -71,6 +71,11 @@ def get_cors_origins(self) -> list[str]: return [] return [o.strip() for o in self.cors_allowed_origins.split(",") if o.strip()] + # Mixpeek contextual enrichment + mixpeek_api_key: str = "" + mixpeek_base_url: str = "https://api.mixpeek.com" + mixpeek_namespace: str = "" + # Environment environment: str = "development" log_level: str = "INFO" diff --git a/src/ad_buyer/crews/channel_crews.py b/src/ad_buyer/crews/channel_crews.py index 0fdcd21..b07b701 100644 --- a/src/ad_buyer/crews/channel_crews.py +++ b/src/ad_buyer/crews/channel_crews.py @@ -19,16 +19,31 @@ from ..tools.execution.line_management import BookLineTool, CreateLineTool, ReserveLineTool from ..tools.execution.order_management import CreateOrderTool from ..tools.research.avails_check import AvailsCheckTool +from ..tools.research.contextual_enrichment import ( + BrandSafetyTool, + ClassifyContentTool, + ContextualSearchTool, +) from ..tools.research.product_search import ProductSearchTool def _create_research_tools(client: OpenDirectClient) -> list[Any]: """Create research tools with the OpenDirect client.""" - return [ + tools: list[Any] = [ ProductSearchTool(client), AvailsCheckTool(client), ] + # Add Mixpeek contextual enrichment tools when configured + if settings.mixpeek_api_key: + tools.extend([ + ClassifyContentTool(), + BrandSafetyTool(), + ContextualSearchTool(), + ]) + + return tools + def _create_execution_tools(client: OpenDirectClient) -> list[Any]: """Create execution tools with the OpenDirect client.""" diff --git a/src/ad_buyer/interfaces/mcp_server.py b/src/ad_buyer/interfaces/mcp_server.py index 8eadbd7..abf3a62 100644 --- a/src/ad_buyer/interfaces/mcp_server.py +++ b/src/ad_buyer/interfaces/mcp_server.py @@ -69,6 +69,7 @@ ManualDealEntry, create_manual_deal, ) +from ..clients.mixpeek_client import MixpeekClient, MixpeekError logger = logging.getLogger(__name__) @@ -2869,6 +2870,151 @@ async def help_prompt() -> list[Message]: )] +# --------------------------------------------------------------------------- +# Contextual Enrichment (Mixpeek) +# --------------------------------------------------------------------------- + + +def _get_mixpeek_client() -> MixpeekClient: + """Create a MixpeekClient from current settings.""" + s = Settings() + return MixpeekClient( + api_key=s.mixpeek_api_key, + base_url=s.mixpeek_base_url, + namespace=s.mixpeek_namespace, + ) + + +async def _discover_iab_retriever(client: MixpeekClient) -> str | None: + """Find an IAB text search retriever in the current namespace.""" + retrievers = await client.list_retrievers() + for r in retrievers: + name = r.get("retriever_name", "").lower() + if "iab" in name and "text" in name: + return r["retriever_id"] + for r in retrievers: + if "iab" in r.get("retriever_name", "").lower(): + return r["retriever_id"] + return None + + +@mcp.tool( + name="classify_content", + description=( + "Classify page or ad-creative content into IAB v3.0 taxonomy " + "categories using Mixpeek. Supply text content to classify. " + "Returns ranked IAB category matches with hierarchical paths " + "(e.g. Sports > American Football) and confidence scores for " + "contextual targeting." + ), +) +async def classify_content( + text: str, + retriever_id: str | None = None, + limit: int = 10, +) -> str: + """Classify content into IAB taxonomy categories via Mixpeek retriever.""" + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace. " + "Set MIXPEEK_NAMESPACE or pass retriever_id explicitly." + }) + + result = await client.classify_content( + retriever_id=rid, text=text, limit=limit, + ) + docs = result.get("documents", []) + categories = [ + { + "category": d.get("iab_category_name"), + "path": d.get("iab_path", []), + "tier": d.get("iab_tier"), + "score": round(d.get("score", 0), 4), + } + for d in docs + ] + return json.dumps({"categories": categories}, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +@mcp.tool( + name="check_brand_safety", + description=( + "Evaluate page or ad-creative content for brand-safety risk. " + "Classifies content into IAB categories and flags sensitive " + "categories (gambling, adult, etc.). Returns safe/unsafe " + "verdict, risk level (low/medium/high), and flagged categories." + ), +) +async def check_brand_safety( + text: str, + retriever_id: str | None = None, + threshold: float = 0.80, +) -> str: + """Check content for brand-safety risk via Mixpeek.""" + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace." + }) + + result = await client.check_brand_safety( + retriever_id=rid, text=text, threshold=threshold, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +@mcp.tool( + name="contextual_search", + description=( + "Search indexed ad inventory using a Mixpeek retriever pipeline. " + "Pipelines can combine multimodal search, brand-safety filtering, " + "IAB taxonomy enrichment, and reranking. Returns matching inventory " + "with relevance scores and enriched metadata." + ), +) +async def contextual_search( + query: str, + retriever_id: str, + limit: int = 10, +) -> str: + """Search inventory via a Mixpeek retriever pipeline.""" + client = _get_mixpeek_client() + try: + result = await client.search_content( + retriever_id=retriever_id, + query=query, + limit=limit, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + # --------------------------------------------------------------------------- # Mounting # --------------------------------------------------------------------------- diff --git a/src/ad_buyer/tools/research/__init__.py b/src/ad_buyer/tools/research/__init__.py index 5e7c982..2a53d85 100644 --- a/src/ad_buyer/tools/research/__init__.py +++ b/src/ad_buyer/tools/research/__init__.py @@ -4,6 +4,13 @@ """Research tools for inventory discovery.""" from .avails_check import AvailsCheckTool +from .contextual_enrichment import BrandSafetyTool, ClassifyContentTool, ContextualSearchTool from .product_search import ProductSearchTool -__all__ = ["ProductSearchTool", "AvailsCheckTool"] +__all__ = [ + "ProductSearchTool", + "AvailsCheckTool", + "ClassifyContentTool", + "ContextualSearchTool", + "BrandSafetyTool", +] diff --git a/src/ad_buyer/tools/research/contextual_enrichment.py b/src/ad_buyer/tools/research/contextual_enrichment.py new file mode 100644 index 0000000..7e0dc08 --- /dev/null +++ b/src/ad_buyer/tools/research/contextual_enrichment.py @@ -0,0 +1,296 @@ +# Contextual enrichment tools powered by Mixpeek. +# +# Provides IAB taxonomy classification, brand-safety scoring, +# and contextual inventory search that buyer agents can use during +# inventory research and deal evaluation. + +from __future__ import annotations + +import json +from typing import Any + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field + +from ...async_utils import run_async +from ...clients.mixpeek_client import MixpeekClient, MixpeekError +from ...config.settings import Settings + + +def _get_mixpeek_client() -> MixpeekClient: + """Create a MixpeekClient from current settings.""" + settings = Settings() + return MixpeekClient( + api_key=settings.mixpeek_api_key, + base_url=settings.mixpeek_base_url, + namespace=settings.mixpeek_namespace, + ) + + +# ----------------------------------------------------------------------- +# 1. Content Classification (IAB Taxonomy) +# ----------------------------------------------------------------------- + +class ClassifyContentInput(BaseModel): + """Input for contextual content classification.""" + + text: str = Field( + description="Page or ad-creative text to classify into IAB categories", + ) + retriever_id: str | None = Field( + default=None, + description=( + "Mixpeek retriever pipeline ID for IAB classification. " + "If omitted, auto-discovers an IAB retriever in the namespace." + ), + ) + limit: int = Field( + default=10, + description="Max IAB category matches to return", + ge=1, + le=50, + ) + + +class ClassifyContentTool(BaseTool): + """Classify page or creative content into IAB v3.0 categories. + + Uses a Mixpeek retriever pipeline to perform semantic search + against an IAB category reference corpus. Returns ranked + category matches with hierarchical paths (e.g. + Sports > American Football) and confidence scores. + + Buyer agents use these categories for contextual targeting + decisions and brand-safety evaluation. + """ + + name: str = "classify_content" + description: str = ( + "Classify page or ad-creative content into IAB v3.0 taxonomy " + "categories using Mixpeek. Supply text content to classify. " + "Returns ranked IAB category matches with hierarchical paths " + "(e.g. Sports > American Football) and confidence scores for " + "contextual targeting and brand-safety evaluation." + ) + args_schema: type[BaseModel] = ClassifyContentInput + + def _run( + self, + text: str = "", + retriever_id: str | None = None, + limit: int = 10, + ) -> str: + return run_async(self._arun(text=text, retriever_id=retriever_id, limit=limit)) + + async def _arun( + self, + text: str = "", + retriever_id: str | None = None, + limit: int = 10, + ) -> str: + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace. " + "Set MIXPEEK_NAMESPACE to a namespace with IAB data, " + "or pass retriever_id explicitly." + }) + + result = await client.classify_content( + retriever_id=rid, text=text, limit=limit, + ) + + # Simplify output for the agent + docs = result.get("documents", []) + categories = [ + { + "category": d.get("iab_category_name"), + "path": d.get("iab_path", []), + "tier": d.get("iab_tier"), + "score": round(d.get("score", 0), 4), + } + for d in docs + ] + return json.dumps({"categories": categories}, indent=2) + + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +# ----------------------------------------------------------------------- +# 2. Brand Safety Check +# ----------------------------------------------------------------------- + +class BrandSafetyInput(BaseModel): + """Input for brand-safety evaluation.""" + + text: str = Field( + description="Page or ad-creative text to evaluate for brand safety", + ) + retriever_id: str | None = Field( + default=None, + description=( + "Mixpeek retriever pipeline ID for IAB classification. " + "If omitted, auto-discovers an IAB retriever in the namespace." + ), + ) + threshold: float = Field( + default=0.80, + description="Minimum confidence score to consider a category match", + ge=0.0, + le=1.0, + ) + + +class BrandSafetyTool(BaseTool): + """Evaluate content for brand-safety risk. + + Classifies content via IAB taxonomy and flags matches against + known sensitive categories (gambling, adult content, etc.). + Returns a safety verdict with risk level and flagged categories. + """ + + name: str = "check_brand_safety" + description: str = ( + "Evaluate page or ad-creative content for brand-safety risk. " + "Classifies content into IAB categories and flags sensitive " + "categories (gambling, adult, etc.). Returns safe/unsafe verdict, " + "risk level (low/medium/high), and flagged categories." + ) + args_schema: type[BaseModel] = BrandSafetyInput + + def _run( + self, + text: str = "", + retriever_id: str | None = None, + threshold: float = 0.80, + ) -> str: + return run_async( + self._arun(text=text, retriever_id=retriever_id, threshold=threshold) + ) + + async def _arun( + self, + text: str = "", + retriever_id: str | None = None, + threshold: float = 0.80, + ) -> str: + if not text: + return json.dumps({"error": "text must be provided"}) + + client = _get_mixpeek_client() + try: + rid = retriever_id + if not rid: + rid = await _discover_iab_retriever(client) + if not rid: + return json.dumps({ + "error": "No IAB retriever found in this namespace." + }) + + result = await client.check_brand_safety( + retriever_id=rid, text=text, threshold=threshold, + ) + return json.dumps(result, indent=2) + + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +# ----------------------------------------------------------------------- +# 3. Contextual Search (inventory enrichment) +# ----------------------------------------------------------------------- + +class ContextualSearchInput(BaseModel): + """Input for contextual inventory search.""" + + query: str = Field( + description="Natural-language search query (e.g. 'sports news articles')", + ) + retriever_id: str = Field( + description="Mixpeek retriever pipeline ID to execute", + ) + limit: int = Field( + default=10, + description="Max results to return", + ge=1, + le=100, + ) + + +class ContextualSearchTool(BaseTool): + """Search indexed inventory via a Mixpeek retriever pipeline. + + Retriever pipelines can chain feature search, brand-safety + filtering, taxonomy enrichment, and reranking in a single call. + Use this to find contextually relevant inventory during the + research phase. + """ + + name: str = "contextual_search" + description: str = ( + "Search indexed ad inventory using a Mixpeek retriever pipeline. " + "Pipelines can combine multimodal search, brand-safety filtering, " + "IAB taxonomy enrichment, and reranking. Returns matching inventory " + "with relevance scores and enriched metadata." + ) + args_schema: type[BaseModel] = ContextualSearchInput + + def _run( + self, + query: str = "", + retriever_id: str = "", + limit: int = 10, + ) -> str: + return run_async( + self._arun(query=query, retriever_id=retriever_id, limit=limit) + ) + + async def _arun( + self, + query: str = "", + retriever_id: str = "", + limit: int = 10, + ) -> str: + client = _get_mixpeek_client() + try: + result = await client.search_content( + retriever_id=retriever_id, + query=query, + limit=limit, + ) + return json.dumps(result, indent=2) + except MixpeekError as exc: + return json.dumps({"error": str(exc)}) + finally: + await client.close() + + +# ----------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------- + +async def _discover_iab_retriever(client: MixpeekClient) -> str | None: + """Find an IAB text search retriever in the current namespace.""" + retrievers = await client.list_retrievers() + # Prefer a retriever with "iab" and "text" in the name + for r in retrievers: + name = r.get("retriever_name", "").lower() + if "iab" in name and "text" in name: + return r["retriever_id"] + # Fall back to any retriever with "iab" in the name + for r in retrievers: + if "iab" in r.get("retriever_name", "").lower(): + return r["retriever_id"] + return None diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/test_mixpeek_production.py b/tests/e2e/test_mixpeek_production.py new file mode 100644 index 0000000..84099bc --- /dev/null +++ b/tests/e2e/test_mixpeek_production.py @@ -0,0 +1,331 @@ +# End-to-end test against the production Mixpeek API. +# +# Exercises the full buyer-agent contextual enrichment flow: +# 1. Health check (public endpoint) +# 2. MCP tools discovery (public endpoint) +# 3. IAB taxonomy classification of real ad content +# 4. Brand-safety scoring of safe and sensitive content +# 5. Contextual inventory search via retriever pipeline +# +# Requires: +# MIXPEEK_API_KEY — a valid Mixpeek API key +# MIXPEEK_NAMESPACE — namespace with IAB data (default: golden_adtech_iab) +# +# Run: +# MIXPEEK_API_KEY=mxp_sk_... pytest tests/e2e/test_mixpeek_production.py -v + +from __future__ import annotations + +import os + +import pytest +import pytest_asyncio + +from ad_buyer.clients.mixpeek_client import MixpeekClient, MixpeekError + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +API_KEY = os.environ.get("MIXPEEK_API_KEY", "") +BASE_URL = os.environ.get("MIXPEEK_BASE_URL", "https://api.mixpeek.com") +NAMESPACE = os.environ.get("MIXPEEK_NAMESPACE", "golden_adtech_iab") + +# Known retriever in golden_adtech_iab namespace +IAB_TEXT_RETRIEVER = os.environ.get( + "MIXPEEK_IAB_RETRIEVER_ID", "ret_f7fbefced358bd" +) + +pytestmark = [ + pytest.mark.e2e, + pytest.mark.skipif(not API_KEY, reason="MIXPEEK_API_KEY not set"), +] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest_asyncio.fixture +async def client(): + c = MixpeekClient( + api_key=API_KEY, + base_url=BASE_URL, + namespace=NAMESPACE, + ) + yield c + await c.close() + + +# --------------------------------------------------------------------------- +# 1. Health & Discovery +# --------------------------------------------------------------------------- + +class TestHealthAndDiscovery: + @pytest.mark.asyncio + async def test_mcp_health(self, client: MixpeekClient): + """MCP server health check returns healthy status.""" + result = await client.health() + assert result["status"] == "healthy" + assert result["tools_count"] >= 40 # currently 48 + + @pytest.mark.asyncio + async def test_mcp_tools_list(self, client: MixpeekClient): + """Public tools endpoint returns the full tool catalog.""" + tools = await client.get_tools() + assert len(tools) >= 40 + names = {t["name"] for t in tools} + # Spot-check a few expected tools + assert "create_namespace" in names + assert "execute_retriever" in names + + @pytest.mark.asyncio + async def test_list_retrievers(self, client: MixpeekClient): + """Namespace has IAB retriever pipelines.""" + retrievers = await client.list_retrievers() + assert len(retrievers) > 0 + names = {r["retriever_name"] for r in retrievers} + assert any("iab" in n.lower() for n in names), ( + f"No IAB retriever found. Available: {names}" + ) + + +# --------------------------------------------------------------------------- +# 2. IAB Content Classification +# --------------------------------------------------------------------------- + +class TestIABClassification: + @pytest.mark.asyncio + async def test_classify_sports_content(self, client: MixpeekClient): + """NFL content classifies as Sports > American Football.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Breaking: NFL playoff scores and highlights from Sunday " + "night football. Tom Brady analysis and Super Bowl predictions." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + assert "Sports" in top["iab_path"] + # Top result should be American Football or Sports + assert top["iab_category_name"] in ( + "American Football", "Sports", "College Football", + ) + + @pytest.mark.asyncio + async def test_classify_automotive_content(self, client: MixpeekClient): + """Luxury car review classifies as Automotive > Luxury Cars.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "The 2026 Mercedes-Benz S-Class review: luxury sedan with " + "cutting-edge autonomous driving features, premium leather " + "interior, and a 496-horsepower twin-turbo V8 engine." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + assert "Automotive" in top["iab_path"] + # Top result should be an Automotive subcategory + assert "Automotive" in top["iab_path"] + + @pytest.mark.asyncio + async def test_classify_food_content(self, client: MixpeekClient): + """Cooking recipe classifies as Food & Drink > Cooking.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Easy homemade pasta recipe: combine fresh eggs, semolina " + "flour, and olive oil. Roll the dough thin, cut into " + "fettuccine, and cook in salted boiling water for 3 minutes." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.80 + # Should be in Food & Drink or Cooking + paths_flat = [ + cat for d in docs[:3] for cat in d.get("iab_path", []) + ] + assert "Food & Drink" in paths_flat or "Cooking" in paths_flat + + @pytest.mark.asyncio + async def test_classify_technology_content(self, client: MixpeekClient): + """AI/ML content classifies as Technology > Artificial Intelligence.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Artificial intelligence and machine learning are " + "transforming enterprise software. New AI startups raised " + "$10B in Q1 2026 for large language model development." + ), + ) + docs = result["documents"] + assert len(docs) > 0 + + top = docs[0] + assert top["score"] > 0.85 + assert top["iab_category_name"] == "Artificial Intelligence" + assert "Technology & Computing" in top["iab_path"] + + @pytest.mark.asyncio + async def test_classify_returns_hierarchical_paths(self, client: MixpeekClient): + """Results include full IAB hierarchy: tier, path, category name.""" + result = await client.classify_content( + retriever_id=IAB_TEXT_RETRIEVER, + text="Professional basketball NBA playoffs Lakers vs Celtics", + ) + doc = result["documents"][0] + assert "iab_category_name" in doc + assert "iab_path" in doc + assert "iab_tier" in doc + assert isinstance(doc["iab_path"], list) + assert doc["iab_tier"] in (1, 2, 3, 4) + + +# --------------------------------------------------------------------------- +# 3. Brand Safety +# --------------------------------------------------------------------------- + +class TestBrandSafety: + @pytest.mark.asyncio + async def test_safe_content(self, client: MixpeekClient): + """Benign sports content is flagged as safe.""" + result = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Local high school basketball team wins state championship " + "in an exciting overtime game at the civic center." + ), + ) + assert result["safe"] is True + assert result["risk_level"] == "low" + assert len(result["flagged_categories"]) == 0 + + @pytest.mark.asyncio + async def test_gambling_content_flagged(self, client: MixpeekClient): + """Gambling content is flagged as brand-unsafe.""" + result = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text=( + "Online poker tournament with $1M prize pool. Texas Hold'em " + "strategy guide for casino gambling. Bet on sports with our " + "new odds calculator." + ), + ) + assert result["safe"] is False + assert result["risk_level"] in ("medium", "high") + flagged_names = [c["category"] for c in result["flagged_categories"]] + assert any( + cat in flagged_names + for cat in ( + "Poker and Professional Gambling", + "Casinos & Gambling", + "Casino Games", + ) + ), f"Expected gambling categories, got: {flagged_names}" + + @pytest.mark.asyncio + async def test_brand_safety_threshold(self, client: MixpeekClient): + """Higher threshold filters out lower-confidence matches.""" + result_low = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text="Online poker tournament guide", + threshold=0.70, + ) + result_high = await client.check_brand_safety( + retriever_id=IAB_TEXT_RETRIEVER, + text="Online poker tournament guide", + threshold=0.95, + ) + # More categories pass at lower threshold + assert len(result_low["categories"]) >= len(result_high["categories"]) + + +# --------------------------------------------------------------------------- +# 4. Contextual Search +# --------------------------------------------------------------------------- + +class TestContextualSearch: + @pytest.mark.asyncio + async def test_search_returns_results(self, client: MixpeekClient): + """Contextual search returns ranked results with scores.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="sports and athletics", + limit=5, + ) + assert "documents" in result + docs = result["documents"] + assert len(docs) > 0 + + # Each result has a score + for d in docs: + assert "score" in d + assert d["score"] > 0 + + @pytest.mark.asyncio + async def test_search_results_have_iab_metadata(self, client: MixpeekClient): + """Search results include IAB category metadata.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="technology computing artificial intelligence", + limit=3, + ) + doc = result["documents"][0] + assert "iab_category_name" in doc + assert "iab_path" in doc + assert "score" in doc + + @pytest.mark.asyncio + async def test_search_returns_iab_enriched_results(self, client: MixpeekClient): + """Search results contain IAB enrichment from the retriever.""" + result = await client.search_content( + retriever_id=IAB_TEXT_RETRIEVER, + query="food cooking recipes", + ) + docs = result["documents"] + assert len(docs) > 0 + # Results from IAB retriever have category metadata + for d in docs[:3]: + assert "iab_category_name" in d + assert "iab_path" in d + + +# --------------------------------------------------------------------------- +# 5. Error Handling +# --------------------------------------------------------------------------- + +class TestErrorHandling: + @pytest.mark.asyncio + async def test_invalid_retriever_id(self, client: MixpeekClient): + """Invalid retriever ID raises MixpeekError.""" + with pytest.raises(MixpeekError) as exc_info: + await client.classify_content( + retriever_id="ret_nonexistent", + text="test content", + ) + assert exc_info.value.status_code in (404, 400, 422) + + @pytest.mark.asyncio + async def test_invalid_api_key(self): + """Invalid API key raises MixpeekError.""" + bad_client = MixpeekClient( + api_key="invalid_key", + namespace=NAMESPACE, + ) + try: + with pytest.raises(MixpeekError) as exc_info: + await bad_client.list_retrievers() + assert exc_info.value.status_code in (401, 403, 404) + finally: + await bad_client.close() diff --git a/tests/unit/test_contextual_enrichment.py b/tests/unit/test_contextual_enrichment.py new file mode 100644 index 0000000..e9008ff --- /dev/null +++ b/tests/unit/test_contextual_enrichment.py @@ -0,0 +1,187 @@ +# Author: Green Mountain Systems AI Inc. +# Donated to IAB Tech Lab + +"""Unit tests for contextual enrichment CrewAI tools.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, patch + +import pytest + +from ad_buyer.tools.research.contextual_enrichment import ( + BrandSafetyTool, + ClassifyContentTool, + ContextualSearchTool, +) + + +@pytest.fixture +def classify_tool(): + return ClassifyContentTool() + + +@pytest.fixture +def brand_safety_tool(): + return BrandSafetyTool() + + +@pytest.fixture +def search_tool(): + return ContextualSearchTool() + + +class TestClassifyContentTool: + def test_tool_name(self, classify_tool): + assert classify_tool.name == "classify_content" + + @pytest.mark.asyncio + async def test_returns_error_without_input(self, classify_tool): + result = await classify_tool._arun() + data = json.loads(result) + assert "error" in data + + @pytest.mark.asyncio + async def test_classify_with_retriever_id(self, classify_tool): + mock_client = AsyncMock() + mock_client.classify_content.return_value = { + "documents": [ + { + "iab_category_name": "Sports", + "iab_path": ["Sports"], + "iab_tier": 1, + "score": 0.9, + } + ] + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun( + text="NFL scores", retriever_id="ret-123" + ) + + data = json.loads(result) + assert data["categories"][0]["category"] == "Sports" + assert data["categories"][0]["score"] == 0.9 + mock_client.close.assert_called_once() + + @pytest.mark.asyncio + async def test_classify_auto_discovers_retriever(self, classify_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} + ] + mock_client.classify_content.return_value = {"documents": []} + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun(text="test content") + + mock_client.classify_content.assert_called_once_with( + retriever_id="ret-1", text="test content", limit=10, + ) + + @pytest.mark.asyncio + async def test_classify_no_retriever(self, classify_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [] + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await classify_tool._arun(text="test") + + data = json.loads(result) + assert "No IAB retriever found" in data["error"] + + +class TestBrandSafetyTool: + def test_tool_name(self, brand_safety_tool): + assert brand_safety_tool.name == "check_brand_safety" + + @pytest.mark.asyncio + async def test_safe_content(self, brand_safety_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} + ] + mock_client.check_brand_safety.return_value = { + "safe": True, + "risk_level": "low", + "flagged_categories": [], + "categories": [{"category": "Sports", "score": 0.9}], + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await brand_safety_tool._arun(text="basketball game") + + data = json.loads(result) + assert data["safe"] is True + + @pytest.mark.asyncio + async def test_unsafe_content(self, brand_safety_tool): + mock_client = AsyncMock() + mock_client.list_retrievers.return_value = [ + {"retriever_id": "ret-1", "retriever_name": "iab_text_search"} + ] + mock_client.check_brand_safety.return_value = { + "safe": False, + "risk_level": "high", + "flagged_categories": [ + {"category": "Casinos & Gambling", "score": 0.88} + ], + "categories": [], + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await brand_safety_tool._arun(text="casino gambling") + + data = json.loads(result) + assert data["safe"] is False + assert data["risk_level"] == "high" + + +class TestContextualSearchTool: + def test_tool_name(self, search_tool): + assert search_tool.name == "contextual_search" + + @pytest.mark.asyncio + async def test_search(self, search_tool): + mock_client = AsyncMock() + mock_client.search_content.return_value = { + "documents": [{"document_id": "d1", "score": 0.85}] + } + mock_client.close = AsyncMock() + + with patch( + "ad_buyer.tools.research.contextual_enrichment._get_mixpeek_client", + return_value=mock_client, + ): + result = await search_tool._arun( + query="sports news", retriever_id="ret-1", limit=5 + ) + + data = json.loads(result) + assert data["documents"][0]["score"] == 0.85 + mock_client.search_content.assert_called_once_with( + retriever_id="ret-1", query="sports news", limit=5, + ) + mock_client.close.assert_called_once() diff --git a/tests/unit/test_mixpeek_client.py b/tests/unit/test_mixpeek_client.py new file mode 100644 index 0000000..5443e45 --- /dev/null +++ b/tests/unit/test_mixpeek_client.py @@ -0,0 +1,242 @@ +# Author: Green Mountain Systems AI Inc. +# Donated to IAB Tech Lab + +"""Unit tests for the Mixpeek contextual enrichment client.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +from ad_buyer.clients.mixpeek_client import ( + BRAND_UNSAFE_CATEGORIES, + MixpeekClient, + MixpeekError, +) + + +@pytest.fixture +def client(): + return MixpeekClient( + api_key="test-key", + base_url="https://api.mixpeek.com", + namespace="test-ns", + ) + + +class TestMixpeekClientInit: + def test_defaults(self): + c = MixpeekClient(api_key="k") + assert c.api_key == "k" + assert c.base_url == "https://api.mixpeek.com" + assert c.namespace is None + + def test_custom_base_url_strips_trailing_slash(self): + c = MixpeekClient(api_key="k", base_url="https://example.com/") + assert c.base_url == "https://example.com" + + def test_headers_include_namespace(self, client): + h = client._headers() + assert h["Authorization"] == "Bearer test-key" + assert h["X-Namespace"] == "test-ns" + + def test_headers_without_namespace(self): + c = MixpeekClient(api_key="k") + h = c._headers() + assert "X-Namespace" not in h + + +class TestClassifyContent: + @pytest.mark.asyncio + async def test_classify_with_text(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "American Football", + "iab_path": ["Sports", "American Football"], + "iab_tier": 2, + "score": 0.87, + } + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.classify_content( + retriever_id="ret-123", text="NFL football scores" + ) + + assert result["documents"][0]["iab_category_name"] == "American Football" + + @pytest.mark.asyncio + async def test_classify_api_error_raises(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 401 + mock_resp.text = "Unauthorized" + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + with pytest.raises(MixpeekError, match="401"): + await client.classify_content( + retriever_id="ret-123", text="test" + ) + + +class TestBrandSafety: + @pytest.mark.asyncio + async def test_safe_content(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Sports", + "iab_path": ["Sports"], + "iab_tier": 1, + "score": 0.90, + } + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", text="local basketball game" + ) + + assert result["safe"] is True + assert result["risk_level"] == "low" + assert len(result["flagged_categories"]) == 0 + + @pytest.mark.asyncio + async def test_unsafe_gambling_content(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Poker and Professional Gambling", + "iab_path": ["Sports", "Poker and Professional Gambling"], + "iab_tier": 2, + "score": 0.88, + }, + { + "iab_category_name": "Casinos & Gambling", + "iab_path": ["Attractions", "Casinos & Gambling"], + "iab_tier": 2, + "score": 0.85, + }, + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", text="poker casino betting" + ) + + assert result["safe"] is False + assert result["risk_level"] == "high" + assert len(result["flagged_categories"]) == 2 + + @pytest.mark.asyncio + async def test_threshold_filters_low_scores(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "documents": [ + { + "iab_category_name": "Casinos & Gambling", + "iab_path": ["Attractions", "Casinos & Gambling"], + "iab_tier": 2, + "score": 0.75, # Below threshold + }, + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.check_brand_safety( + retriever_id="ret-123", + text="card games", + threshold=0.80, + ) + + assert result["safe"] is True + assert len(result["categories"]) == 0 # Filtered out + + def test_brand_unsafe_categories_is_frozenset(self): + assert isinstance(BRAND_UNSAFE_CATEGORIES, frozenset) + assert "Casinos & Gambling" in BRAND_UNSAFE_CATEGORIES + + +class TestSearchContent: + @pytest.mark.asyncio + async def test_search_builds_correct_body(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"documents": []} + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp) as mock_req: + await client.search_content( + retriever_id="ret-456", query="sports news", limit=5 + ) + + call_args = mock_req.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body["inputs"]["query"] == "sports news" + assert body["page_size"] == 5 + + +class TestListRetrievers: + @pytest.mark.asyncio + async def test_list_returns_results(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "results": [ + {"retriever_id": "r1", "retriever_name": "iab_text_search"} + ] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.list_retrievers() + + assert len(result) == 1 + assert result[0]["retriever_name"] == "iab_text_search" + + +class TestListTaxonomies: + @pytest.mark.asyncio + async def test_list_returns_results(self, client): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "results": [{"taxonomy_id": "t1", "taxonomy_name": "IAB v3.0"}] + } + + with patch.object(client._client, "request", new_callable=AsyncMock, return_value=mock_resp): + result = await client.list_taxonomies() + + assert len(result) == 1 + assert result[0]["taxonomy_name"] == "IAB v3.0" + + +class TestGetTools: + @pytest.mark.asyncio + async def test_get_tools_no_auth(self, client): + mock_resp = MagicMock() + mock_resp.json.return_value = {"tools": [{"name": "tool1"}]} + + with patch.object(client._client, "get", new_callable=AsyncMock, return_value=mock_resp): + tools = await client.get_tools() + + assert len(tools) == 1 + + +class TestClose: + @pytest.mark.asyncio + async def test_close(self, client): + with patch.object(client._client, "aclose", new_callable=AsyncMock) as mock_close: + await client.close() + mock_close.assert_called_once()