From 051f26998f8e87c1bd26b410b61873a6a24794c4 Mon Sep 17 00:00:00 2001 From: Farhan Malik Date: Tue, 10 Mar 2026 17:17:53 -0400 Subject: [PATCH 1/2] =?UTF-8?q?Add=20Autonomous=20Legal=20War=20Game=20?= =?UTF-8?q?=E2=80=94=20adversarial=20multi-agent=20legal=20stress-tester?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a full Red Team / Blue Team adversarial pipeline for M&A contract stress-testing using claude-opus-4-6 with adaptive thinking and Pydantic structured outputs. - warroom/models/schemas.py: Pydantic schemas for AttackVector, PlaintiffAnalysis, DefenseAnalysis, and SegmentReport with computed risk scores - warroom/document/processor.py: PDF/TXT ingestion with section-header-aware segmentation, falling back to word-count chunking - warroom/agents/plaintiff.py: Red Team agent — hunts ambiguity, indemnification gaps, liability exposure, and black-swan edge cases (severity 1-5) - warroom/agents/defense.py: Blue Team agent — precision redrafting to neutralise each attack vector while preserving business intent - warroom/orchestrator.py: Drives the pipeline sequentially or in parallel via a thread pool; rich progress display - warroom/report/generator.py: Terminal summary, JSON, and self-contained HTML report generation - main.py: Typer CLI with --parallel, --html, --max-segments, and --words flags Co-Authored-By: Claude Sonnet 4.6 --- legal_warroom/.env.example | 1 + legal_warroom/main.py | 134 ++++++++ legal_warroom/requirements.txt | 7 + legal_warroom/warroom/__init__.py | 0 legal_warroom/warroom/agents/__init__.py | 0 legal_warroom/warroom/agents/defense.py | 131 ++++++++ legal_warroom/warroom/agents/plaintiff.py | 95 ++++++ legal_warroom/warroom/document/__init__.py | 0 legal_warroom/warroom/document/processor.py | 191 +++++++++++ legal_warroom/warroom/models/__init__.py | 0 legal_warroom/warroom/models/schemas.py | 154 +++++++++ legal_warroom/warroom/orchestrator.py | 226 +++++++++++++ 
legal_warroom/warroom/report/__init__.py | 0 legal_warroom/warroom/report/generator.py | 347 ++++++++++++++++++++ 14 files changed, 1286 insertions(+) create mode 100644 legal_warroom/.env.example create mode 100644 legal_warroom/main.py create mode 100644 legal_warroom/requirements.txt create mode 100644 legal_warroom/warroom/__init__.py create mode 100644 legal_warroom/warroom/agents/__init__.py create mode 100644 legal_warroom/warroom/agents/defense.py create mode 100644 legal_warroom/warroom/agents/plaintiff.py create mode 100644 legal_warroom/warroom/document/__init__.py create mode 100644 legal_warroom/warroom/document/processor.py create mode 100644 legal_warroom/warroom/models/__init__.py create mode 100644 legal_warroom/warroom/models/schemas.py create mode 100644 legal_warroom/warroom/orchestrator.py create mode 100644 legal_warroom/warroom/report/__init__.py create mode 100644 legal_warroom/warroom/report/generator.py diff --git a/legal_warroom/.env.example b/legal_warroom/.env.example new file mode 100644 index 00000000..53c10307 --- /dev/null +++ b/legal_warroom/.env.example @@ -0,0 +1 @@ +ANTHROPIC_API_KEY=sk-ant-... 
diff --git a/legal_warroom/main.py b/legal_warroom/main.py new file mode 100644 index 00000000..edc56269 --- /dev/null +++ b/legal_warroom/main.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +""" +Autonomous Legal War Game — CLI Entry Point + +Usage examples: + + # Run on a PDF merger agreement, output to ./output/ + python main.py agreement.pdf + + # Text file, custom segment size, parallel mode, HTML report + python main.py nda.txt --words 600 --parallel --html + + # Run only first 3 segments (useful for testing) + python main.py big_agreement.pdf --max-segments 3 + + # Full help + python main.py --help +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Optional + +import typer +from dotenv import load_dotenv +from rich.console import Console + +from warroom import orchestrator +from warroom.report import generator + +load_dotenv() +app = typer.Typer(add_completion=False, rich_markup_mode="rich") +console = Console() + + +@app.command() +def main( + document: str = typer.Argument( + ..., + help="Path to the legal document (.pdf or .txt)", + ), + words: int = typer.Option( + 800, + "--words", + "-w", + help="Soft word-count cap per segment (default 800 ≈ ~1½ contract pages).", + ), + parallel: bool = typer.Option( + False, + "--parallel", + "-p", + help="Process segments concurrently (faster, higher API concurrency).", + ), + workers: int = typer.Option( + 3, + "--workers", + help="Max parallel threads when --parallel is set.", + ), + output_dir: str = typer.Option( + "output", + "--output", + "-o", + help="Directory for JSON (and optional HTML) reports.", + ), + html: bool = typer.Option( + False, + "--html", + help="Also generate a self-contained HTML report.", + ), + max_segments: Optional[int] = typer.Option( + None, + "--max-segments", + help="Limit to the first N segments (useful for dry-runs).", + ), +) -> None: + """ + [bold cyan]Autonomous Legal War Game[/bold cyan] — M&A stress-testing simulation. 
+ + Pits a [bold red]Plaintiff Agent (Red Team)[/bold red] against a + [bold green]Defense Agent (Blue Team)[/bold green] on every clause of + your document, then outputs a structured vulnerability and remediation report. + """ + doc_path = Path(document) + if not doc_path.exists(): + console.print(f"[bold red]Error:[/bold red] File not found: {doc_path}") + raise typer.Exit(code=1) + + # Run the simulation + reports = orchestrator.run_simulation( + document_path=str(doc_path), + words_per_segment=words, + parallel=parallel, + max_workers=workers, + ) + + # Optionally truncate (for dry-runs) + if max_segments is not None: + reports = reports[:max_segments] + + if not reports: + console.print("[yellow]No segments produced. Check your document.[/yellow]") + raise typer.Exit(code=1) + + # Print terminal report + generator.print_terminal_summary(reports) + + # Save JSON + out_dir = Path(output_dir) + stem = doc_path.stem + json_path = generator.save_json(reports, out_dir / f"{stem}_warroom_report.json") + + # Save HTML (optional) + if html: + generator.save_html(reports, out_dir / f"{stem}_warroom_report.html") + + # Exit with non-zero code if any segment is CRITICAL + has_critical = any(r.status == "CRITICAL" for r in reports) + if has_critical: + console.print( + "\n[bold red]⚠ CRITICAL vulnerabilities detected.[/bold red] " + "Review the report before proceeding." 
+ ) + raise typer.Exit(code=2) + + console.print( + "\n[bold green]✓ Simulation complete.[/bold green] " + f"All outputs written to [cyan]{out_dir}/[/cyan]" + ) + + +if __name__ == "__main__": + app() diff --git a/legal_warroom/requirements.txt b/legal_warroom/requirements.txt new file mode 100644 index 00000000..8cfbe76d --- /dev/null +++ b/legal_warroom/requirements.txt @@ -0,0 +1,7 @@ +anthropic>=0.40.0 +pydantic>=2.0.0 +pdfplumber>=0.10.0 +pypdf>=4.0.0 +rich>=13.0.0 +typer>=0.12.0 +python-dotenv>=1.0.0 diff --git a/legal_warroom/warroom/__init__.py b/legal_warroom/warroom/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/agents/__init__.py b/legal_warroom/warroom/agents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/agents/defense.py b/legal_warroom/warroom/agents/defense.py new file mode 100644 index 00000000..9a709f0e --- /dev/null +++ b/legal_warroom/warroom/agents/defense.py @@ -0,0 +1,131 @@ +""" +Defense Agent — Blue Team + +Receives the original clause text plus the Plaintiff Agent's attack +report, then returns a DefenseAnalysis containing: + - A fully hardened rewrite of the clause. + - Per-attack-vector remediation detail. + - Residual risk and confidence assessment. + +Uses claude-opus-4-6 with adaptive thinking and structured outputs. +""" + +from __future__ import annotations + +import json +import anthropic +from ..models.schemas import PlaintiffAnalysis, DefenseAnalysis + +# --------------------------------------------------------------------------- +# System prompt (your exact prompt, hardened for structured output) +# --------------------------------------------------------------------------- + +DEFENSE_SYSTEM = """\ +You are the Defense Counsel Agent (Blue Team) and lead drafter for the \ +acquiring party in a high-stakes Mergers & Acquisitions transaction. 
+ +JURISDICTION: Standard US corporate law, contract law precedents, and Delaware \ +Court of Chancery standards. + +OBJECTIVE: Fortify the contract against every vulnerability identified by the \ +Plaintiff Agent. Rewrite, patch, and secure the language to neutralise all \ +attack vectors while preserving the original business intent of the deal. + +EXECUTION DIRECTIVES: +1. PRECISION REDRAFTING — Rewrite exploited clauses with absolute semantic \ + precision. Every defined term must be exact and internally consistent. \ + Close all loopholes identified by the Plaintiff Agent. +2. RISK MITIGATION — Inject necessary legal shields: + • Exact numeric definitions (no vague qualifiers like "material" or \ + "reasonable" without explicit anchors). + • Explicit liability caps with carve-outs stated positively. + • Severability and savings clauses where appropriate. + • Clear, unambiguous governing law and exclusive jurisdiction provisions. + • Representations qualified by knowledge only where commercially necessary, \ + with defined Knowledge Persons. + • No "and/or" constructions. Use "and" or "or" explicitly. +3. INTENT PRESERVATION — Do NOT alter the underlying financial or operational \ + agreement between the parties. Only alter the legal execution of that \ + agreement. If a business term cannot be hardened without changing its \ + substance, identify it in residual_risk. +4. DRAFTING STANDARDS — Use formal contract English. Avoid passive voice \ + where active voice is clearer. Define all new terms introduced. \ + Number sub-clauses sequentially. + +OUTPUT: Respond in the exact JSON structure specified. Include one remedy \ +entry for each attack vector you address. If a vector cannot be addressed \ +without altering business terms, note it in residual_risk. 
+""" + + +# --------------------------------------------------------------------------- +# Agent call +# --------------------------------------------------------------------------- + +def run( + client: anthropic.Anthropic, + clause_text: str, + plaintiff_analysis: PlaintiffAnalysis, + segment_id: str, +) -> DefenseAnalysis: + """ + Send the clause and the Plaintiff's attack report to the Defense Agent. + + Args: + client: Initialised Anthropic client. + clause_text: The original, un-hardened contract text. + plaintiff_analysis: Validated output from the Plaintiff Agent. + segment_id: Identifier used for logging/reporting. + + Returns: + DefenseAnalysis — schema-validated Pydantic model. + """ + # Serialise the plaintiff report so the Defense Agent can read it cleanly + attack_summary = _format_attack_vectors(plaintiff_analysis) + + user_message = ( + f"[DOCUMENT SEGMENT: {segment_id}]\n\n" + "═══ ORIGINAL CLAUSE (to be hardened) ═══\n" + f"{clause_text}\n\n" + "═══ PLAINTIFF AGENT ATTACK REPORT ═══\n" + f"{attack_summary}\n\n" + "═══ TASK ═══\n" + "Analyse the attack vectors above and produce your defense report with " + "fully hardened clause language." 
+ ) + + response = client.messages.parse( + model="claude-opus-4-6", + max_tokens=12288, # Defense rewrites can be lengthy + thinking={"type": "adaptive"}, + system=DEFENSE_SYSTEM, + messages=[{"role": "user", "content": user_message}], + output_format=DefenseAnalysis, + ) + + return response.parsed_output + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _format_attack_vectors(analysis: PlaintiffAnalysis) -> str: + """Render the PlaintiffAnalysis as readable text for the Defense Agent.""" + lines = [ + f"Executive Summary: {analysis.executive_summary}", + f"Highest Severity: {analysis.highest_severity}/5", + "", + "Attack Vectors (highest severity first):", + ] + for i, v in enumerate(analysis.attack_vectors, 1): + lines += [ + f"\n[{i}] {v.title}", + f" Severity: {v.severity}/5 ({v.vulnerability_type})", + f" Clause Ref: {v.clause_reference}", + f" Description: {v.description}", + f" Legal Theory:{v.legal_theory}", + f" Scenario: {v.exploitation_scenario}", + f" Exposure: {v.estimated_exposure}", + ] + return "\n".join(lines) diff --git a/legal_warroom/warroom/agents/plaintiff.py b/legal_warroom/warroom/agents/plaintiff.py new file mode 100644 index 00000000..51f98a0e --- /dev/null +++ b/legal_warroom/warroom/agents/plaintiff.py @@ -0,0 +1,95 @@ +""" +Plaintiff Agent — Red Team + +Receives a document segment and returns a PlaintiffAnalysis with +prioritised attack vectors graded 1-5. + +Uses claude-opus-4-6 with adaptive thinking and structured outputs +so the Orchestrator receives machine-readable, schema-validated data. 
+""" + +from __future__ import annotations + +import anthropic +from ..models.schemas import PlaintiffAnalysis + +# --------------------------------------------------------------------------- +# System prompt (your exact prompt, hardened for structured output) +# --------------------------------------------------------------------------- + +PLAINTIFF_SYSTEM = """\ +You are the Plaintiff Counsel Agent (Red Team) in the Autonomous Legal War Game. +You represent a hostile, highly litigious entity — a predatory acquirer, a \ +disgruntled shareholder, or a regulator with unlimited resources — seeking to \ +exploit, break, or extract punitive damages from the provided contract language. + +JURISDICTION: Standard US corporate law, contract law precedents, and Delaware \ +Court of Chancery standards. + +OBJECTIVE: Perform a ruthless, exhaustive analysis of the provided clause or \ +section. + +EXECUTION DIRECTIVES: +1. HUNT FOR AMBIGUITY — Identify poorly defined terms, vague timelines, \ + contradictory obligations, and undefined conditions precedent. +2. EXPLOIT INDEMNIFICATION & LIABILITY — Find scenarios where the drafting \ + party is exposed to uncapped financial risk, breach of warranty, or \ + third-party liabilities. +3. STRESS-TEST EDGE CASES — Formulate highly improbable but legally plausible \ + "black swan" scenarios the current language fails to protect against. \ + Think regulatory intervention, force majeure, insolvency events, \ + jurisdictional conflicts, and successor liability. +4. ATTACK DEFINITIONS — Challenge every defined term. If it is absent, \ + over-broad, or inconsistent with usage elsewhere, flag it. + +OUTPUT: You MUST respond in the exact JSON structure specified. Do not add \ +prose outside the JSON. Do not fabricate specific case citations or docket \ +numbers. Reference legal doctrines and principles only. 
def run(
    client: anthropic.Anthropic,
    clause_text: str,
    segment_id: str,
) -> PlaintiffAnalysis:
    """Attack a single document segment with the Plaintiff (Red Team) agent.

    Args:
        client: Initialised Anthropic client.
        clause_text: The raw contract text to attack.
        segment_id: Identifier used for logging/reporting.

    Returns:
        PlaintiffAnalysis — schema-validated Pydantic model parsed
        directly from the model's structured output.
    """
    # Segment header, clause, and task instruction, separated by blank lines.
    prompt = "\n\n".join(
        (
            f"[DOCUMENT SEGMENT: {segment_id}]",
            clause_text,
            "Analyse the above clause and produce your attack report.",
        )
    )

    result = client.messages.parse(
        model="claude-opus-4-6",
        max_tokens=8192,
        thinking={"type": "adaptive"},
        system=PLAINTIFF_SYSTEM,
        messages=[{"role": "user", "content": prompt}],
        output_format=PlaintiffAnalysis,
    )
    return result.parsed_output
@dataclass
class DocumentSegment:
    """One contiguous slice of the source document fed to the agent pipeline."""

    segment_id: str  # Stable identifier, e.g. "seg_001"
    page_hint: str   # Human-readable locator, e.g. "Pages 12-15" or "Chunk 3"
    text: str        # Raw clause / section text for this slice
Use .pdf or .txt") + + # Clean up whitespace artefacts from PDF extraction + raw_text = _clean_text(raw_text) + + # Attempt section-aware splitting + sections = _split_by_section_headers(raw_text) + if len(sections) >= 3: + segments = _merge_short_sections(sections, words_per_segment) + else: + # No clear headers — fall back to word-count chunks + segments = _chunk_by_words(raw_text, words_per_segment) + + return [ + DocumentSegment( + segment_id=f"seg_{i + 1:03d}", + page_hint=f"Segment {i + 1} of {len(segments)}", + text=seg.strip(), + ) + for i, seg in enumerate(segments) + if seg.strip() + ] + + +def load_raw_text(file_path: str | Path) -> str: + """Return the full, cleaned document text (no segmentation).""" + path = Path(file_path) + if path.suffix.lower() == ".pdf": + return _clean_text(_extract_pdf(path)) + return _clean_text(path.read_text(encoding="utf-8")) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _extract_pdf(path: Path) -> str: + """Extract text from a PDF using pdfplumber (preferred) or pypdf.""" + try: + import pdfplumber + + pages: List[str] = [] + with pdfplumber.open(path) as pdf: + for page in pdf.pages: + text = page.extract_text() or "" + pages.append(text) + return "\n\n".join(pages) + except ImportError: + pass + + try: + from pypdf import PdfReader + + reader = PdfReader(str(path)) + pages = [page.extract_text() or "" for page in reader.pages] + return "\n\n".join(pages) + except ImportError: + raise ImportError( + "No PDF library found. Install pdfplumber or pypdf:\n" + " pip install pdfplumber" + ) + + +# Matches common legal section headers: +# "Section 4.", "4.3", "ARTICLE V", "ARTICLE 5", "4.", "(a)", etc. +_SECTION_HEADER_RE = re.compile( + r"(?m)^(?:" + r"(?:Section|SECTION|Article|ARTICLE)\s+[\dA-Z]+[\.\s]" # Section 4. 
/ ARTICLE V + r"|(?:\d+\.){1,3}\s" # 4.3 or 4.3.1 + r"|\d+\.\s+[A-Z]" # 4. Representations + r")" +) + + +def _split_by_section_headers(text: str) -> List[str]: + """Split text at detected legal section header boundaries.""" + boundaries = [m.start() for m in _SECTION_HEADER_RE.finditer(text)] + if not boundaries: + return [] + + chunks: List[str] = [] + for i, start in enumerate(boundaries): + end = boundaries[i + 1] if i + 1 < len(boundaries) else len(text) + chunks.append(text[start:end]) + return chunks + + +def _merge_short_sections(sections: List[str], max_words: int) -> List[str]: + """ + Merge consecutive short sections so every returned chunk is roughly + max_words in size. This avoids sending dozens of 20-word blurbs. + """ + merged: List[str] = [] + buffer = "" + for section in sections: + candidate = (buffer + "\n\n" + section).strip() + if len(candidate.split()) <= max_words: + buffer = candidate + else: + if buffer: + merged.append(buffer) + buffer = section.strip() + if buffer: + merged.append(buffer) + return merged + + +def _chunk_by_words(text: str, max_words: int) -> List[str]: + """Naive fixed-size word-count chunking with a 10% overlap.""" + words = text.split() + overlap = max(1, max_words // 10) + chunks: List[str] = [] + start = 0 + while start < len(words): + end = min(start + max_words, len(words)) + chunks.append(" ".join(words[start:end])) + if end == len(words): + break + start = end - overlap + return chunks + + +def _clean_text(text: str) -> str: + """Normalise whitespace and remove common PDF extraction artefacts.""" + # Collapse runs of spaces/tabs to a single space + text = re.sub(r"[ \t]{2,}", " ", text) + # Collapse 3+ consecutive blank lines to 2 + text = re.sub(r"\n{3,}", "\n\n", text) + # Remove form-feed characters + text = text.replace("\f", "\n") + return text.strip() diff --git a/legal_warroom/warroom/models/__init__.py b/legal_warroom/warroom/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git 
a/legal_warroom/warroom/models/schemas.py b/legal_warroom/warroom/models/schemas.py new file mode 100644 index 00000000..49819a65 --- /dev/null +++ b/legal_warroom/warroom/models/schemas.py @@ -0,0 +1,154 @@ +""" +Pydantic schemas for structured outputs from each agent in the +Autonomous Legal War Game pipeline. +""" + +from __future__ import annotations +from pydantic import BaseModel, Field +from typing import List + + +# --------------------------------------------------------------------------- +# Plaintiff Agent (Red Team) output +# --------------------------------------------------------------------------- + +class AttackVector(BaseModel): + clause_reference: str = Field( + description="The specific clause, sub-clause, or defined term being attacked " + "(e.g., 'Section 7.3(b)', 'Definition of Material Adverse Effect')." + ) + vulnerability_type: str = Field( + description=( + "Category of vulnerability. One of: AMBIGUITY | INDEMNIFICATION_GAP | " + "LIABILITY_EXPOSURE | DEFINITION_FAILURE | EDGE_CASE | " + "JURISDICTIONAL_CONFLICT | WAIVER_TRAP | REPRESENTATION_BREACH" + ) + ) + severity: int = Field( + description=( + "Integer 1-5. " + "1=Minor ambiguity with negligible consequence. " + "2=Moderate risk, localized financial exposure. " + "3=Significant exposure, likely litigation target. " + "4=Severe vulnerability, deal-threatening if exploited. " + "5=Catastrophic structural failure, renders clause unenforceable." + ) + ) + title: str = Field(description="Short, descriptive title for this attack vector.") + description: str = Field( + description="Precise legal description of why this language is vulnerable." + ) + legal_theory: str = Field( + description=( + "The legal doctrine, case law principle, or statutory basis enabling " + "this attack (e.g., contra proferentem, implied duty of good faith, " + "Delaware chancery standards on MAE clauses). " + "Do not fabricate specific case citations." 
+ ) + ) + exploitation_scenario: str = Field( + description=( + "A concrete scenario — including black-swan edge cases — demonstrating " + "how a hostile party would exploit this vulnerability in litigation." + ) + ) + estimated_exposure: str = Field( + description=( + "Estimated financial exposure or legal consequence if this vector is " + "successfully exploited (e.g., 'uncapped indemnification liability', " + "'rescission of the entire transaction', '$X–$Y range')." + ) + ) + + +class PlaintiffAnalysis(BaseModel): + attack_vectors: List[AttackVector] = Field( + description="All identified attack vectors, ordered highest-severity first." + ) + highest_severity: int = Field( + description="The highest severity integer (1-5) among all attack vectors." + ) + executive_summary: str = Field( + description=( + "A 2-4 sentence executive summary of the clause's overall vulnerability " + "profile from the plaintiff's perspective." + ) + ) + + +# --------------------------------------------------------------------------- +# Defense Agent (Blue Team) output +# --------------------------------------------------------------------------- + +class DefenseRemedy(BaseModel): + attack_vector_title: str = Field( + description="Exact title of the attack vector being neutralized." + ) + hardened_language: str = Field( + description=( + "The rewritten clause language that closes this specific vulnerability. " + "Must be precise, legally sound, and written in formal contract English." + ) + ) + rationale: str = Field( + description=( + "Explanation of exactly how the rewritten language neutralizes the " + "plaintiff's attack, referencing the specific legal theory." + ) + ) + + +class DefenseAnalysis(BaseModel): + fully_hardened_clause: str = Field( + description=( + "The complete, integrated rewritten clause incorporating all remediations. " + "Must preserve the original business intent of the agreement." 
class SegmentReport(BaseModel):
    """Final per-segment output of the Red Team → Blue Team pipeline."""

    segment_id: str
    original_text: str
    plaintiff_analysis: PlaintiffAnalysis
    defense_analysis: DefenseAnalysis

    @property
    def net_risk_score(self) -> int:
        """Composite risk score in [0, 100].

        Mean attack-vector severity (1-5) scaled by 20 and capped at 100;
        0 when the Red Team found no vectors. Higher means a more
        dangerous original clause.
        """
        severities = [v.severity for v in self.plaintiff_analysis.attack_vectors]
        if not severities:
            return 0
        mean_severity = sum(severities) / len(severities)
        return min(100, round(mean_severity * 20))

    @property
    def status(self) -> str:
        """Bucket for the risk score: CRITICAL ≥ 80, REQUIRES_REVIEW ≥ 50, else HARDENED."""
        score = self.net_risk_score
        if score >= 80:
            return "CRITICAL"
        return "REQUIRES_REVIEW" if score >= 50 else "HARDENED"
+ +Supports: + - Sequential processing (safe, predictable, lower concurrency cost). + - Parallel processing (faster for large documents; uses concurrent API calls). +""" + +from __future__ import annotations + +import asyncio +import concurrent.futures +from typing import Callable, List, Optional + +import anthropic +from rich.console import Console +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, +) +from rich.table import Table + +from .agents import plaintiff, defense +from .document.processor import DocumentSegment, load_and_segment +from .models.schemas import SegmentReport + +console = Console() + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + +def run_simulation( + document_path: str, + words_per_segment: int = 800, + parallel: bool = False, + max_workers: int = 3, + on_segment_complete: Optional[Callable[[SegmentReport], None]] = None, +) -> List[SegmentReport]: + """ + Run the full Legal War Game simulation on a document. + + Args: + document_path: Path to a .pdf or .txt file. + words_per_segment: Soft word-count cap per segment (default 800). + parallel: If True, process segments concurrently. + max_workers: Max parallel threads when parallel=True. + on_segment_complete: Optional callback invoked after each segment. + + Returns: + List of SegmentReport, one per document segment. + """ + client = anthropic.Anthropic() + + console.rule("[bold cyan]AUTONOMOUS LEGAL WAR GAME — SIMULATION ALPHA[/bold cyan]") + console.print(f"\n[bold]Document:[/bold] {document_path}") + + # ── 1. 
def _run_sequential(
    client: anthropic.Anthropic,
    segments: List[DocumentSegment],
    on_complete: Optional[Callable[[SegmentReport], None]],
) -> List[SegmentReport]:
    """Run the Red → Blue pipeline over *segments* one at a time.

    A rich progress bar shows which segment the Red Team is currently
    attacking; *on_complete* (if provided) fires after each finished
    segment, before the progress bar advances.
    """
    collected: List[SegmentReport] = []

    progress_columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
    )
    with Progress(*progress_columns, console=console) as progress:
        bar = progress.add_task("Processing segments…", total=len(segments))

        for segment in segments:
            progress.update(
                bar,
                description=f"[cyan]{segment.segment_id}[/cyan] — Red Team attacking…",
            )
            result = _process_segment(client, segment)
            collected.append(result)
            if on_complete:
                on_complete(result)
            progress.advance(bar)

    return collected
List[SegmentReport]: + """ + Process segments in parallel using a thread pool. + The Anthropic SDK is thread-safe; each call creates its own HTTP session. + """ + console.print( + f"[bold yellow]Parallel mode:[/bold yellow] up to {max_workers} concurrent API calls.\n" + ) + + results: dict[str, SegmentReport] = {} + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + MofNCompleteColumn(), + TimeElapsedColumn(), + console=console, + ) as progress: + task = progress.add_task("Processing segments (parallel)…", total=len(segments)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_seg = { + executor.submit(_process_segment, client, seg): seg + for seg in segments + } + for future in concurrent.futures.as_completed(future_to_seg): + seg = future_to_seg[future] + try: + report = future.result() + results[seg.segment_id] = report + if on_complete: + on_complete(report) + except Exception as exc: + console.print( + f"[red]ERROR[/red] {seg.segment_id}: {exc}" + ) + finally: + progress.advance(task) + + # Return in original document order + ordered = [results[seg.segment_id] for seg in segments if seg.segment_id in results] + return ordered + + +# --------------------------------------------------------------------------- +# Single-segment pipeline +# --------------------------------------------------------------------------- + +def _process_segment( + client: anthropic.Anthropic, + seg: DocumentSegment, +) -> SegmentReport: + """Run the full Red → Blue pipeline for a single document segment.""" + + # ── Red Team attack ────────────────────────────────────────────────── + plaintiff_analysis = plaintiff.run( + client=client, + clause_text=seg.text, + segment_id=seg.segment_id, + ) + + # ── Blue Team defence ──────────────────────────────────────────────── + defense_analysis = defense.run( + client=client, + clause_text=seg.text, + plaintiff_analysis=plaintiff_analysis, + 
segment_id=seg.segment_id, + ) + + return SegmentReport( + segment_id=seg.segment_id, + original_text=seg.text, + plaintiff_analysis=plaintiff_analysis, + defense_analysis=defense_analysis, + ) + + +# --------------------------------------------------------------------------- +# Display helpers +# --------------------------------------------------------------------------- + +def _print_segment_table(segments: List[DocumentSegment]) -> None: + table = Table(title="Document Segments", show_lines=True) + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Words", justify="right") + table.add_column("Preview", max_width=80) + + for seg in segments: + word_count = len(seg.text.split()) + preview = seg.text[:120].replace("\n", " ") + ("…" if len(seg.text) > 120 else "") + table.add_row(seg.segment_id, str(word_count), preview) + + console.print(table) + console.print() diff --git a/legal_warroom/warroom/report/__init__.py b/legal_warroom/warroom/report/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/report/generator.py b/legal_warroom/warroom/report/generator.py new file mode 100644 index 00000000..878dacb3 --- /dev/null +++ b/legal_warroom/warroom/report/generator.py @@ -0,0 +1,347 @@ +""" +Report Generator + +Transforms a list of SegmentReports into: + 1. A rich terminal summary (always shown). + 2. A JSON file (machine-readable, always saved). + 3. An HTML file (human-readable, optional). + +The JSON structure is designed to be ingested by downstream systems +(dashboards, further LLM analysis, audit trails). 
+""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import List + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from ..models.schemas import AttackVector, SegmentReport + +console = Console() + +# ── Severity colours ──────────────────────────────────────────────────────── +_SEVERITY_STYLE = { + 1: "green", + 2: "yellow", + 3: "orange3", + 4: "red", + 5: "bold red", +} + +_STATUS_STYLE = { + "HARDENED": "bold green", + "REQUIRES_REVIEW": "bold yellow", + "CRITICAL": "bold red", +} + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def print_terminal_summary(reports: List[SegmentReport]) -> None: + """Print a rich summary to the terminal after the simulation completes.""" + console.rule("\n[bold cyan]WAR GAME RESULTS — FINAL REPORT[/bold cyan]") + _print_overview_table(reports) + for report in reports: + _print_segment_detail(report) + _print_global_stats(reports) + + +def save_json(reports: List[SegmentReport], output_path: str | Path) -> Path: + """Serialise all reports to a JSON file. Returns the path written.""" + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + + payload = { + "simulation": "Autonomous Legal War Game — Simulation Alpha", + "generated_at": datetime.now(timezone.utc).isoformat(), + "segment_count": len(reports), + "segments": [_segment_to_dict(r) for r in reports], + "summary": _global_summary(reports), + } + + path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + console.print(f"\n[green]✓ JSON report saved:[/green] {path}") + return path + + +def save_html(reports: List[SegmentReport], output_path: str | Path) -> Path: + """Generate a self-contained HTML report. 
Returns the path written.""" + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(_render_html(reports), encoding="utf-8") + console.print(f"[green]✓ HTML report saved:[/green] {path}") + return path + + +# --------------------------------------------------------------------------- +# Terminal rendering +# --------------------------------------------------------------------------- + +def _print_overview_table(reports: List[SegmentReport]) -> None: + table = Table(title="Segment Overview", show_lines=True, expand=True) + table.add_column("Segment", style="cyan", no_wrap=True) + table.add_column("Vectors", justify="right") + table.add_column("Max Severity", justify="center") + table.add_column("Risk Score", justify="right") + table.add_column("Status", justify="center") + table.add_column("Defense Confidence", justify="center") + + for r in reports: + sev = r.plaintiff_analysis.highest_severity + style = _SEVERITY_STYLE.get(sev, "white") + status_style = _STATUS_STYLE.get(r.status, "white") + + table.add_row( + r.segment_id, + str(len(r.plaintiff_analysis.attack_vectors)), + Text(f"{sev}/5", style=style), + str(r.net_risk_score), + Text(r.status, style=status_style), + r.defense_analysis.confidence_level, + ) + + console.print(table) + + +def _print_segment_detail(report: SegmentReport) -> None: + """Print the full Red/Blue breakdown for one segment.""" + sev = report.plaintiff_analysis.highest_severity + panel_style = _SEVERITY_STYLE.get(sev, "white") + + console.print( + Panel( + f"[bold]Segment:[/bold] {report.segment_id} " + f"[bold]Risk Score:[/bold] {report.net_risk_score}/100 " + f"[bold]Status:[/bold] [{_STATUS_STYLE.get(report.status, 'white')}]{report.status}[/]", + title=f"[bold cyan]─── {report.segment_id} ───[/bold cyan]", + border_style=panel_style, + ) + ) + + # Original clause excerpt + excerpt = report.original_text[:400].replace("\n", " ") + if len(report.original_text) > 400: + excerpt += "…" + 
console.print(f"[dim]Original:[/dim] {excerpt}\n") + + # Attack vectors + console.print("[bold red]🔴 RED TEAM — ATTACK VECTORS[/bold red]") + console.print(f"[dim]{report.plaintiff_analysis.executive_summary}[/dim]\n") + for v in report.plaintiff_analysis.attack_vectors: + _print_attack_vector(v) + + # Defense + console.print("\n[bold green]🔵 BLUE TEAM — HARDENED CLAUSE[/bold green]") + console.print(report.defense_analysis.fully_hardened_clause) + console.print( + f"\n[dim]Residual Risk:[/dim] {report.defense_analysis.residual_risk}" + ) + console.print( + f"[dim]Defense Confidence:[/dim] {report.defense_analysis.confidence_level}\n" + ) + console.rule(style="dim") + + +def _print_attack_vector(v: AttackVector) -> None: + style = _SEVERITY_STYLE.get(v.severity, "white") + console.print( + f" [{style}][SEV {v.severity}][/] [bold]{v.title}[/bold] " + f"[dim]({v.vulnerability_type})[/dim]" + ) + console.print(f" {v.description}") + console.print(f" [italic]Exposure: {v.estimated_exposure}[/italic]\n") + + +def _print_global_stats(reports: List[SegmentReport]) -> None: + s = _global_summary(reports) + console.rule("[bold cyan]GLOBAL STATISTICS[/bold cyan]") + console.print( + f" Total segments: {s['total_segments']}\n" + f" Total attack vectors: {s['total_attack_vectors']}\n" + f" Critical segments: {s['critical_segments']}\n" + f" Requires review: {s['requires_review_segments']}\n" + f" Hardened: {s['hardened_segments']}\n" + f" Average risk score: {s['average_risk_score']:.1f}/100\n" + f" Peak severity: {s['peak_severity']}/5\n" + ) + + +# --------------------------------------------------------------------------- +# JSON serialisation +# --------------------------------------------------------------------------- + +def _segment_to_dict(r: SegmentReport) -> dict: + return { + "segment_id": r.segment_id, + "net_risk_score": r.net_risk_score, + "status": r.status, + "original_text": r.original_text, + "red_team": { + "executive_summary": 
r.plaintiff_analysis.executive_summary, + "highest_severity": r.plaintiff_analysis.highest_severity, + "attack_vectors": [ + { + "title": v.title, + "severity": v.severity, + "vulnerability_type": v.vulnerability_type, + "clause_reference": v.clause_reference, + "description": v.description, + "legal_theory": v.legal_theory, + "exploitation_scenario": v.exploitation_scenario, + "estimated_exposure": v.estimated_exposure, + } + for v in r.plaintiff_analysis.attack_vectors + ], + }, + "blue_team": { + "fully_hardened_clause": r.defense_analysis.fully_hardened_clause, + "confidence_level": r.defense_analysis.confidence_level, + "residual_risk": r.defense_analysis.residual_risk, + "remedies": [ + { + "attack_vector_title": rem.attack_vector_title, + "hardened_language": rem.hardened_language, + "rationale": rem.rationale, + } + for rem in r.defense_analysis.remedies + ], + }, + } + + +def _global_summary(reports: List[SegmentReport]) -> dict: + if not reports: + return {} + scores = [r.net_risk_score for r in reports] + all_vectors = [v for r in reports for v in r.plaintiff_analysis.attack_vectors] + return { + "total_segments": len(reports), + "total_attack_vectors": len(all_vectors), + "critical_segments": sum(1 for r in reports if r.status == "CRITICAL"), + "requires_review_segments": sum(1 for r in reports if r.status == "REQUIRES_REVIEW"), + "hardened_segments": sum(1 for r in reports if r.status == "HARDENED"), + "average_risk_score": sum(scores) / len(scores), + "peak_severity": max((v.severity for v in all_vectors), default=0), + } + + +# --------------------------------------------------------------------------- +# HTML rendering +# --------------------------------------------------------------------------- + +_SEVERITY_HEX = {1: "#22c55e", 2: "#eab308", 3: "#f97316", 4: "#ef4444", 5: "#991b1b"} +_STATUS_HEX = {"HARDENED": "#22c55e", "REQUIRES_REVIEW": "#eab308", "CRITICAL": "#dc2626"} + + +def _render_html(reports: List[SegmentReport]) -> str: + s = 
_global_summary(reports) + segments_html = "\n".join(_segment_html(r) for r in reports) + generated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + + return f""" + + + +Legal War Game Report + + + +

⚖️ Autonomous Legal War Game — Simulation Alpha

+

Generated: {generated}

+ +

Global Statistics

+
+
{s['total_segments']}
Segments
+
{s['total_attack_vectors']}
Attack Vectors
+
{s['critical_segments']}
Critical
+
{s['requires_review_segments']}
Requires Review
+
{s['hardened_segments']}
Hardened
+
{s['average_risk_score']:.0f}
Avg Risk Score
+
{s['peak_severity']}/5
Peak Severity
+
+ +

Segment Reports

+{segments_html} + +
Autonomous Legal War Game — For stress-testing purposes only. Not legal advice.
+ +""" + + +def _segment_html(r: SegmentReport) -> str: + status_color = _STATUS_HEX.get(r.status, "#fff") + css_class = r.status.lower().replace("_", "_") + attacks_html = "\n".join(_attack_html(v) for v in r.plaintiff_analysis.attack_vectors) + + return f""" +
+

{r.segment_id} + {r.status} + Risk {r.net_risk_score}/100 +

+
+ Original Text +
{_esc(r.original_text)}
+
+ +

🔴 Red Team — {len(r.plaintiff_analysis.attack_vectors)} Attack Vector(s)

+

{_esc(r.plaintiff_analysis.executive_summary)}

+ {attacks_html} + +

🔵 Blue Team — Hardened Clause

+
{_esc(r.defense_analysis.fully_hardened_clause)}
+

Residual Risk: {_esc(r.defense_analysis.residual_risk)}

+

Defense Confidence: {r.defense_analysis.confidence_level}

+
""" + + +def _attack_html(v: AttackVector) -> str: + color = _SEVERITY_HEX.get(v.severity, "#fff") + return f""" +
+ [SEV {v.severity}] {_esc(v.title)} + — {_esc(v.vulnerability_type)} +

{_esc(v.description)}

+

Exposure: {_esc(v.estimated_exposure)}

+
""" + + +def _esc(text: str) -> str: + return ( + text + .replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) From 2f8ccc2d38b6c57bf5797e2799133ef68a7dea8c Mon Sep 17 00:00:00 2001 From: Farhan Malik Date: Tue, 10 Mar 2026 17:30:15 -0400 Subject: [PATCH 2/2] Upgrade to full agentic multi-round adversarial loop with Ollama support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agents now automatically interact across multiple rounds: Plaintiff re-attacks the Defense's hardened clause each round, Defense re-patches, until convergence or max rounds is reached. No human intervention required between rounds. Provider abstraction: - warroom/providers/base.py: LLMProvider protocol + make_provider() factory - warroom/providers/anthropic_p.py: Anthropic SDK (claude-opus-4-6, adaptive thinking) - warroom/providers/ollama_p.py: Ollama via OpenAI-compatible endpoint (100% local, free) Multi-round adversarial loop: - warroom/loop/adversarial.py: Plaintiff attacks current clause → Defense hardens → Plaintiff re-attacks hardened clause → repeat until convergence or max_rounds - Convergence detection: stops early when severity drops to threshold (default ≤2) - Severity trajectory tracked per segment (e.g. 
4 → 3 → 2 shows convergence) Schema updates (warroom/models/schemas.py): - AdversarialRound: captures one Red→Blue exchange - IterativeSegmentReport: full round history, severity_trajectory, risk_reduction, initial_risk_score, converged property CLI updates (main.py): - --provider anthropic|ollama (default: anthropic) - --model (default varies by provider) - --plaintiff-provider / --plaintiff-model (per-agent overrides for mixing providers) - --defense-provider / --defense-model - --ollama-url (default: http://localhost:11434/v1) - --rounds (default: 3) - --convergence (severity threshold, default: 2) Report updates: terminal + JSON + HTML now show per-round breakdowns, severity trajectories, risk reduction stats, and convergence status. Co-Authored-By: Claude Sonnet 4.6 --- legal_warroom/main.py | 169 +++++-- legal_warroom/requirements.txt | 1 + legal_warroom/warroom/agents/defense.py | 108 ++--- legal_warroom/warroom/agents/plaintiff.py | 67 ++- legal_warroom/warroom/loop/__init__.py | 0 legal_warroom/warroom/loop/adversarial.py | 161 +++++++ legal_warroom/warroom/models/schemas.py | 128 ++++-- legal_warroom/warroom/orchestrator.py | 242 ++++------ legal_warroom/warroom/providers/__init__.py | 0 .../warroom/providers/anthropic_p.py | 62 +++ legal_warroom/warroom/providers/base.py | 81 ++++ legal_warroom/warroom/providers/ollama_p.py | 109 +++++ legal_warroom/warroom/report/generator.py | 423 +++++++++--------- 13 files changed, 995 insertions(+), 556 deletions(-) create mode 100644 legal_warroom/warroom/loop/__init__.py create mode 100644 legal_warroom/warroom/loop/adversarial.py create mode 100644 legal_warroom/warroom/providers/__init__.py create mode 100644 legal_warroom/warroom/providers/anthropic_p.py create mode 100644 legal_warroom/warroom/providers/base.py create mode 100644 legal_warroom/warroom/providers/ollama_p.py diff --git a/legal_warroom/main.py b/legal_warroom/main.py index edc56269..529aef55 100644 --- a/legal_warroom/main.py +++ 
b/legal_warroom/main.py @@ -4,22 +4,43 @@ Usage examples: - # Run on a PDF merger agreement, output to ./output/ + # Anthropic (cloud) — full simulation, 3 rounds per segment python main.py agreement.pdf - # Text file, custom segment size, parallel mode, HTML report - python main.py nda.txt --words 600 --parallel --html + # Ollama (local, free) — default model llama3.1:8b + python main.py agreement.pdf --provider ollama - # Run only first 3 segments (useful for testing) - python main.py big_agreement.pdf --max-segments 3 + # Ollama with a specific model + more rounds + python main.py agreement.pdf --provider ollama --model qwen2.5:14b --rounds 4 + + # Mix providers: Plaintiff on Ollama, Defense on Anthropic + python main.py agreement.pdf \\ + --plaintiff-provider ollama --plaintiff-model qwen2.5:14b \\ + --defense-provider anthropic + + # Parallel + HTML report + python main.py agreement.pdf --parallel --html + + # Dry run: only first 2 segments + python main.py agreement.pdf --max-segments 2 # Full help python main.py --help + +Ollama setup: + 1. Install Ollama: https://ollama.com + 2. Pull a model: ollama pull qwen2.5:14b + 3. 
Run: python main.py agreement.pdf --provider ollama --model qwen2.5:14b + +Recommended Ollama models (best → fastest): + qwen2.5:14b — best local quality for legal reasoning (~9GB) + qwen2.5:7b — good quality (~5GB) + llama3.1:8b — solid baseline (~5GB) + llama3.2:3b — fastest, lower quality (~2GB) """ from __future__ import annotations -import sys from pathlib import Path from typing import Optional @@ -28,6 +49,7 @@ from rich.console import Console from warroom import orchestrator +from warroom.providers.base import make_provider from warroom.report import generator load_dotenv() @@ -38,64 +60,124 @@ @app.command() def main( document: str = typer.Argument( - ..., - help="Path to the legal document (.pdf or .txt)", + ..., help="Path to the legal document (.pdf or .txt)" + ), + # ── Provider shortcuts (same provider for both agents) ───────────────── + provider: str = typer.Option( + "anthropic", + "--provider", "-p", + help="Backend for both agents: 'anthropic' (cloud) or 'ollama' (local).", + ), + model: Optional[str] = typer.Option( + None, + "--model", "-m", + help=( + "Model to use. Defaults: anthropic=claude-opus-4-6, ollama=llama3.1:8b. 
" + "Override: --model qwen2.5:14b" + ), + ), + # ── Fine-grained per-agent provider overrides ────────────────────────── + plaintiff_provider: Optional[str] = typer.Option( + None, "--plaintiff-provider", + help="Provider override for the Red Team agent.", + ), + plaintiff_model: Optional[str] = typer.Option( + None, "--plaintiff-model", + help="Model override for the Red Team agent.", + ), + defense_provider: Optional[str] = typer.Option( + None, "--defense-provider", + help="Provider override for the Blue Team agent.", + ), + defense_model: Optional[str] = typer.Option( + None, "--defense-model", + help="Model override for the Blue Team agent.", + ), + # ── Ollama config ────────────────────────────────────────────────────── + ollama_url: str = typer.Option( + "http://localhost:11434/v1", + "--ollama-url", + help="Ollama API base URL.", + ), + # ── Simulation config ────────────────────────────────────────────────── + rounds: int = typer.Option( + 3, "--rounds", "-r", + help="Max adversarial rounds per segment (default 3).", + ), + convergence: int = typer.Option( + 2, "--convergence", + help=( + "Stop iterating when max severity drops to this level or below " + "(after ≥2 rounds). Default 2." 
+ ), ), words: int = typer.Option( - 800, - "--words", - "-w", - help="Soft word-count cap per segment (default 800 ≈ ~1½ contract pages).", + 800, "--words", "-w", + help="Soft word-count cap per segment (default 800).", ), parallel: bool = typer.Option( - False, - "--parallel", - "-p", + False, "--parallel", help="Process segments concurrently (faster, higher API concurrency).", ), workers: int = typer.Option( - 3, - "--workers", + 3, "--workers", help="Max parallel threads when --parallel is set.", ), + # ── Output config ────────────────────────────────────────────────────── output_dir: str = typer.Option( - "output", - "--output", - "-o", - help="Directory for JSON (and optional HTML) reports.", + "output", "--output", "-o", + help="Directory for report files.", ), html: bool = typer.Option( - False, - "--html", + False, "--html", help="Also generate a self-contained HTML report.", ), max_segments: Optional[int] = typer.Option( - None, - "--max-segments", + None, "--max-segments", help="Limit to the first N segments (useful for dry-runs).", ), ) -> None: """ - [bold cyan]Autonomous Legal War Game[/bold cyan] — M&A stress-testing simulation. + [bold cyan]Autonomous Legal War Game[/bold cyan] — M&A contract stress-testing. + + Runs a [bold red]Plaintiff Agent (Red Team)[/bold red] against a + [bold green]Defense Agent (Blue Team)[/bold green] across multiple rounds + per clause. Each round, the Plaintiff re-attacks the Defense's latest + hardened rewrite until convergence or max rounds is reached. - Pits a [bold red]Plaintiff Agent (Red Team)[/bold red] against a - [bold green]Defense Agent (Blue Team)[/bold green] on every clause of - your document, then outputs a structured vulnerability and remediation report. + Supports [bold]Anthropic (cloud)[/bold] and [bold]Ollama (local, free)[/bold]. 
""" doc_path = Path(document) if not doc_path.exists(): console.print(f"[bold red]Error:[/bold red] File not found: {doc_path}") raise typer.Exit(code=1) - # Run the simulation + # ── Build providers ────────────────────────────────────────────────── + # Per-agent overrides take precedence; fall back to --provider / --model + p_provider = plaintiff_provider or provider + p_model = plaintiff_model or model + d_provider = defense_provider or provider + d_model = defense_model or model + + try: + pp = _build_provider(p_provider, p_model, ollama_url) + dp = _build_provider(d_provider, d_model, ollama_url) + except (ValueError, ImportError) as exc: + console.print(f"[bold red]Provider error:[/bold red] {exc}") + raise typer.Exit(code=1) + + # ── Run simulation ──────────────────────────────────────────────────── reports = orchestrator.run_simulation( document_path=str(doc_path), + plaintiff_provider=pp, + defense_provider=dp, words_per_segment=words, + max_rounds=rounds, + convergence_threshold=convergence, parallel=parallel, max_workers=workers, ) - # Optionally truncate (for dry-runs) if max_segments is not None: reports = reports[:max_segments] @@ -103,32 +185,41 @@ def main( console.print("[yellow]No segments produced. 
Check your document.[/yellow]") raise typer.Exit(code=1) - # Print terminal report + # ── Output ──────────────────────────────────────────────────────────── generator.print_terminal_summary(reports) - # Save JSON out_dir = Path(output_dir) stem = doc_path.stem - json_path = generator.save_json(reports, out_dir / f"{stem}_warroom_report.json") + generator.save_json(reports, out_dir / f"{stem}_warroom_report.json") - # Save HTML (optional) if html: generator.save_html(reports, out_dir / f"{stem}_warroom_report.html") - # Exit with non-zero code if any segment is CRITICAL + # Exit code 2 signals CRITICAL findings (useful in CI / review pipelines) has_critical = any(r.status == "CRITICAL" for r in reports) if has_critical: console.print( - "\n[bold red]⚠ CRITICAL vulnerabilities detected.[/bold red] " - "Review the report before proceeding." + "\n[bold red]⚠ CRITICAL vulnerabilities remain.[/bold red] " + "Manual legal review required before proceeding." ) raise typer.Exit(code=2) console.print( "\n[bold green]✓ Simulation complete.[/bold green] " - f"All outputs written to [cyan]{out_dir}/[/cyan]" + f"Reports in [cyan]{out_dir}/[/cyan]" ) +def _build_provider(name: str, model: Optional[str], ollama_url: str): + """Build a provider, injecting the Ollama URL when needed.""" + if name.lower() == "ollama": + from warroom.providers.ollama_p import OllamaProvider + return OllamaProvider( + model=model or "llama3.1:8b", + base_url=ollama_url, + ) + return make_provider(name, model) + + if __name__ == "__main__": app() diff --git a/legal_warroom/requirements.txt b/legal_warroom/requirements.txt index 8cfbe76d..3f249777 100644 --- a/legal_warroom/requirements.txt +++ b/legal_warroom/requirements.txt @@ -1,4 +1,5 @@ anthropic>=0.40.0 +openai>=1.0.0 # Required for Ollama provider (OpenAI-compatible API) pydantic>=2.0.0 pdfplumber>=0.10.0 pypdf>=4.0.0 diff --git a/legal_warroom/warroom/agents/defense.py b/legal_warroom/warroom/agents/defense.py index 9a709f0e..7597acdb 100644 
--- a/legal_warroom/warroom/agents/defense.py +++ b/legal_warroom/warroom/agents/defense.py @@ -1,24 +1,14 @@ """ Defense Agent — Blue Team -Receives the original clause text plus the Plaintiff Agent's attack -report, then returns a DefenseAnalysis containing: - - A fully hardened rewrite of the clause. - - Per-attack-vector remediation detail. - - Residual risk and confidence assessment. - -Uses claude-opus-4-6 with adaptive thinking and structured outputs. +Receives the clause under attack and the Plaintiff's analysis, then +returns a hardened rewrite. Accepts any LLMProvider. """ from __future__ import annotations -import json -import anthropic -from ..models.schemas import PlaintiffAnalysis, DefenseAnalysis - -# --------------------------------------------------------------------------- -# System prompt (your exact prompt, hardened for structured output) -# --------------------------------------------------------------------------- +from ..models.schemas import DefenseAnalysis, PlaintiffAnalysis +from ..providers.base import LLMProvider DEFENSE_SYSTEM = """\ You are the Defense Counsel Agent (Blue Team) and lead drafter for the \ @@ -28,93 +18,71 @@ Court of Chancery standards. OBJECTIVE: Fortify the contract against every vulnerability identified by the \ -Plaintiff Agent. Rewrite, patch, and secure the language to neutralise all \ -attack vectors while preserving the original business intent of the deal. +Plaintiff Agent while preserving the original business intent of the deal. EXECUTION DIRECTIVES: 1. PRECISION REDRAFTING — Rewrite exploited clauses with absolute semantic \ - precision. Every defined term must be exact and internally consistent. \ - Close all loopholes identified by the Plaintiff Agent. -2. RISK MITIGATION — Inject necessary legal shields: - • Exact numeric definitions (no vague qualifiers like "material" or \ - "reasonable" without explicit anchors). - • Explicit liability caps with carve-outs stated positively. 
- • Severability and savings clauses where appropriate. - • Clear, unambiguous governing law and exclusive jurisdiction provisions. - • Representations qualified by knowledge only where commercially necessary, \ - with defined Knowledge Persons. - • No "and/or" constructions. Use "and" or "or" explicitly. + precision. Close every loophole in the Plaintiff's attack report. +2. RISK MITIGATION — Inject: + • Exact numeric definitions (no vague qualifiers without explicit anchors) + • Explicit liability caps with stated carve-outs + • Severability and savings clauses where appropriate + • Clear governing law and exclusive jurisdiction provisions + • Knowledge qualifiers only where commercially necessary, with defined \ + Knowledge Persons + • No "and/or" — use "and" or "or" explicitly 3. INTENT PRESERVATION — Do NOT alter the underlying financial or operational \ - agreement between the parties. Only alter the legal execution of that \ - agreement. If a business term cannot be hardened without changing its \ - substance, identify it in residual_risk. -4. DRAFTING STANDARDS — Use formal contract English. Avoid passive voice \ - where active voice is clearer. Define all new terms introduced. \ - Number sub-clauses sequentially. - -OUTPUT: Respond in the exact JSON structure specified. Include one remedy \ -entry for each attack vector you address. If a vector cannot be addressed \ -without altering business terms, note it in residual_risk. + agreement. Only alter the legal execution of that agreement. If a business \ + term cannot be hardened without changing its substance, flag it in \ + residual_risk. +4. DRAFTING STANDARDS — Formal contract English, active voice preferred, \ + sequential sub-clause numbering, all new terms defined inline. + +If this is a re-hardening in round 2+: also address any new vulnerabilities \ +the Plaintiff found in your previous rewrite. 
""" -# --------------------------------------------------------------------------- -# Agent call -# --------------------------------------------------------------------------- - def run( - client: anthropic.Anthropic, + provider: LLMProvider, clause_text: str, plaintiff_analysis: PlaintiffAnalysis, segment_id: str, + round_number: int = 1, ) -> DefenseAnalysis: """ - Send the clause and the Plaintiff's attack report to the Defense Agent. + Harden a clause against the Plaintiff's attack and return DefenseAnalysis. Args: - client: Initialised Anthropic client. - clause_text: The original, un-hardened contract text. - plaintiff_analysis: Validated output from the Plaintiff Agent. - segment_id: Identifier used for logging/reporting. - - Returns: - DefenseAnalysis — schema-validated Pydantic model. + provider: Any LLMProvider (Anthropic, Ollama, …). + clause_text: The clause being defended (may be a prior hardened rewrite). + plaintiff_analysis: Output from the Plaintiff Agent this round. + segment_id: Identifier used for logging. + round_number: Current round number. """ - # Serialise the plaintiff report so the Defense Agent can read it cleanly attack_summary = _format_attack_vectors(plaintiff_analysis) user_message = ( - f"[DOCUMENT SEGMENT: {segment_id}]\n\n" - "═══ ORIGINAL CLAUSE (to be hardened) ═══\n" + f"[SEGMENT: {segment_id} | ROUND: {round_number}]\n\n" + f"━━━ CLAUSE TO HARDEN ━━━\n" f"{clause_text}\n\n" - "═══ PLAINTIFF AGENT ATTACK REPORT ═══\n" + f"━━━ PLAINTIFF ATTACK REPORT ━━━\n" f"{attack_summary}\n\n" - "═══ TASK ═══\n" - "Analyse the attack vectors above and produce your defense report with " - "fully hardened clause language." + "Produce your defense report with fully hardened clause language." 
) - response = client.messages.parse( - model="claude-opus-4-6", - max_tokens=12288, # Defense rewrites can be lengthy - thinking={"type": "adaptive"}, + return provider.complete_structured( system=DEFENSE_SYSTEM, messages=[{"role": "user", "content": user_message}], - output_format=DefenseAnalysis, + schema=DefenseAnalysis, + max_tokens=12288, ) - return response.parsed_output - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- def _format_attack_vectors(analysis: PlaintiffAnalysis) -> str: - """Render the PlaintiffAnalysis as readable text for the Defense Agent.""" lines = [ f"Executive Summary: {analysis.executive_summary}", - f"Highest Severity: {analysis.highest_severity}/5", + f"Highest Severity: {analysis.highest_severity}/5", "", "Attack Vectors (highest severity first):", ] diff --git a/legal_warroom/warroom/agents/plaintiff.py b/legal_warroom/warroom/agents/plaintiff.py index 51f98a0e..26db3660 100644 --- a/legal_warroom/warroom/agents/plaintiff.py +++ b/legal_warroom/warroom/agents/plaintiff.py @@ -1,21 +1,15 @@ """ Plaintiff Agent — Red Team -Receives a document segment and returns a PlaintiffAnalysis with -prioritised attack vectors graded 1-5. - -Uses claude-opus-4-6 with adaptive thinking and structured outputs -so the Orchestrator receives machine-readable, schema-validated data. +Attacks a given clause (original OR a previously hardened rewrite) and +returns a structured PlaintiffAnalysis. Accepts any LLMProvider so it +works identically with Anthropic or Ollama. 
""" from __future__ import annotations -import anthropic from ..models.schemas import PlaintiffAnalysis - -# --------------------------------------------------------------------------- -# System prompt (your exact prompt, hardened for structured output) -# --------------------------------------------------------------------------- +from ..providers.base import LLMProvider PLAINTIFF_SYSTEM = """\ You are the Plaintiff Counsel Agent (Red Team) in the Autonomous Legal War Game. @@ -26,8 +20,7 @@ JURISDICTION: Standard US corporate law, contract law precedents, and Delaware \ Court of Chancery standards. -OBJECTIVE: Perform a ruthless, exhaustive analysis of the provided clause or \ -section. +OBJECTIVE: Perform a ruthless, exhaustive analysis of the provided clause. EXECUTION DIRECTIVES: 1. HUNT FOR AMBIGUITY — Identify poorly defined terms, vague timelines, \ @@ -36,15 +29,15 @@ party is exposed to uncapped financial risk, breach of warranty, or \ third-party liabilities. 3. STRESS-TEST EDGE CASES — Formulate highly improbable but legally plausible \ - "black swan" scenarios the current language fails to protect against. \ - Think regulatory intervention, force majeure, insolvency events, \ - jurisdictional conflicts, and successor liability. + "black swan" scenarios the current language fails to protect against. 4. ATTACK DEFINITIONS — Challenge every defined term. If it is absent, \ - over-broad, or inconsistent with usage elsewhere, flag it. + over-broad, or internally inconsistent, flag it. + +If this is a re-attack on an already-hardened clause: look for NEW \ +vulnerabilities introduced by the rewrite, and re-evaluate whether previously \ +identified vulnerabilities were truly closed. -OUTPUT: You MUST respond in the exact JSON structure specified. Do not add \ -prose outside the JSON. Do not fabricate specific case citations or docket \ -numbers. Reference legal doctrines and principles only. +Do not fabricate specific case citations. 
Reference legal doctrines only. SEVERITY SCALE: 1 = Minor ambiguity, negligible consequence @@ -56,40 +49,34 @@ """ -# --------------------------------------------------------------------------- -# Agent call -# --------------------------------------------------------------------------- - def run( - client: anthropic.Anthropic, + provider: LLMProvider, clause_text: str, segment_id: str, + round_number: int = 1, ) -> PlaintiffAnalysis: """ - Send the clause to the Plaintiff Agent and return a validated - PlaintiffAnalysis. + Attack a clause and return a validated PlaintiffAnalysis. Args: - client: Initialised Anthropic client. - clause_text: The raw contract text to attack. - segment_id: Identifier used for logging/reporting. - - Returns: - PlaintiffAnalysis — schema-validated Pydantic model. + provider: Any LLMProvider (Anthropic, Ollama, …). + clause_text: The contract text to attack. May be the original clause + or a previously hardened rewrite (in round 2+). + segment_id: Identifier used for logging. + round_number: Current round number (1 = first attack on original text). """ + label = "ORIGINAL CLAUSE" if round_number == 1 else f"HARDENED CLAUSE (round {round_number - 1} output)" + user_message = ( - f"[DOCUMENT SEGMENT: {segment_id}]\n\n" + f"[SEGMENT: {segment_id} | ROUND: {round_number}]\n\n" + f"━━━ {label} (to be attacked) ━━━\n" f"{clause_text}\n\n" "Analyse the above clause and produce your attack report." 
) - response = client.messages.parse( - model="claude-opus-4-6", - max_tokens=8192, - thinking={"type": "adaptive"}, + return provider.complete_structured( system=PLAINTIFF_SYSTEM, messages=[{"role": "user", "content": user_message}], - output_format=PlaintiffAnalysis, + schema=PlaintiffAnalysis, + max_tokens=8192, ) - - return response.parsed_output diff --git a/legal_warroom/warroom/loop/__init__.py b/legal_warroom/warroom/loop/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/loop/adversarial.py b/legal_warroom/warroom/loop/adversarial.py new file mode 100644 index 00000000..a7b5dc9a --- /dev/null +++ b/legal_warroom/warroom/loop/adversarial.py @@ -0,0 +1,161 @@ +""" +Multi-Round Adversarial Loop + +This is the core of the automatic agent interaction. + +What happens each round: + 1. Plaintiff Agent attacks the CURRENT clause text + (round 1 = original, round 2+ = the previous Defense rewrite) + 2. Defense Agent hardens the current clause against those specific attacks + 3. 
The hardened clause becomes the input for the next round + +The loop continues until: + • Max rounds reached (configurable, default 3) + • Convergence detected: severity dropped to ≤ convergence_threshold + AND at least 2 rounds have run (so we always do at least one full exchange) + • All attack vectors are severity 1 (nothing meaningful left to harden) + +This means the agents are genuinely reacting to each other's output: + - Plaintiff sees what Defense wrote and looks for NEW vulnerabilities in it + - Defense sees what Plaintiff found in its own previous rewrite and patches again +""" + +from __future__ import annotations + +from rich.console import Console + +from ..agents import plaintiff, defense +from ..document.processor import DocumentSegment +from ..models.schemas import AdversarialRound, IterativeSegmentReport +from ..providers.base import LLMProvider + +console = Console() + + +def run_adversarial_loop( + plaintiff_provider: LLMProvider, + defense_provider: LLMProvider, + segment: DocumentSegment, + max_rounds: int = 3, + convergence_threshold: int = 2, +) -> IterativeSegmentReport: + """ + Run the full multi-round adversarial simulation for one document segment. + + Args: + plaintiff_provider: Provider for the Red Team agent. + defense_provider: Provider for the Blue Team agent. + Can be the same provider as plaintiff_provider. + segment: The document segment to stress-test. + max_rounds: Maximum number of Red→Blue exchanges (default 3). + convergence_threshold: Stop early if max severity drops to this level + or below after at least 2 rounds (default 2). + + Returns: + IterativeSegmentReport containing all rounds and the final hardened text. 
+ """ + rounds: list[AdversarialRound] = [] + current_clause = segment.text + + for round_num in range(1, max_rounds + 1): + _log_round_start(segment.segment_id, round_num, max_rounds) + + # ── Red Team attacks ────────────────────────────────────────────── + console.print( + f" [red]●[/red] [bold]Plaintiff Agent[/bold] attacking " + f"{'original clause' if round_num == 1 else 'hardened clause'}…" + ) + attack = plaintiff.run( + provider=plaintiff_provider, + clause_text=current_clause, + segment_id=segment.segment_id, + round_number=round_num, + ) + console.print( + f" Found [bold red]{len(attack.attack_vectors)}[/bold red] attack " + f"vector(s) — max severity [bold]{attack.highest_severity}[/bold]/5" + ) + + # ── Blue Team hardens ───────────────────────────────────────────── + console.print( + f" [green]●[/green] [bold]Defense Agent[/bold] hardening clause…" + ) + hardened = defense.run( + provider=defense_provider, + clause_text=current_clause, + plaintiff_analysis=attack, + segment_id=segment.segment_id, + round_number=round_num, + ) + console.print( + f" Hardened — confidence: [bold]{hardened.confidence_level}[/bold]" + ) + + # Record this round + rounds.append( + AdversarialRound( + round_number=round_num, + clause_text=current_clause, + attack=attack, + defense=hardened, + ) + ) + + # The next round attacks the freshly hardened clause + current_clause = hardened.fully_hardened_clause + + # ── Convergence check ───────────────────────────────────────────── + if round_num >= 2: + if attack.highest_severity <= convergence_threshold: + console.print( + f"\n [cyan]✓ Converged[/cyan] — severity dropped to " + f"{attack.highest_severity} ≤ threshold {convergence_threshold}. " + f"Stopping after {round_num} round(s).\n" + ) + break + + if attack.highest_severity == 1: + console.print( + f"\n [cyan]✓ No meaningful vulnerabilities remain[/cyan] " + f"(all severity 1). 
Stopping after {round_num} round(s).\n" + ) + break + + report = IterativeSegmentReport( + segment_id=segment.segment_id, + original_text=segment.text, + final_hardened_text=current_clause, + rounds=rounds, + ) + + _log_round_summary(report) + return report + + +# --------------------------------------------------------------------------- +# Logging helpers +# --------------------------------------------------------------------------- + +def _log_round_start(segment_id: str, round_num: int, max_rounds: int) -> None: + console.print( + f"\n [bold cyan]Round {round_num}/{max_rounds}[/bold cyan] " + f"[dim]({segment_id})[/dim]" + ) + + +def _log_round_summary(report: IterativeSegmentReport) -> None: + traj = report.severity_trajectory + arrow = " → ".join(str(s) for s in traj) + status_style = { + "HARDENED": "bold green", + "REQUIRES_REVIEW": "bold yellow", + "CRITICAL": "bold red", + }.get(report.status, "white") + + console.print( + f"\n [{status_style}]■ {report.segment_id} complete[/] " + f"Severity trajectory: {arrow} | " + f"Risk: {report.initial_risk_score} → {report.net_risk_score} " + f"(-{report.risk_reduction} pts) | " + f"Status: [{status_style}]{report.status}[/]\n" + ) diff --git a/legal_warroom/warroom/models/schemas.py b/legal_warroom/warroom/models/schemas.py index 49819a65..cd455b83 100644 --- a/legal_warroom/warroom/models/schemas.py +++ b/legal_warroom/warroom/models/schemas.py @@ -1,15 +1,18 @@ """ -Pydantic schemas for structured outputs from each agent in the -Autonomous Legal War Game pipeline. +Pydantic schemas for structured outputs from each agent, plus +data classes that track the full multi-round adversarial simulation. 
""" from __future__ import annotations -from pydantic import BaseModel, Field + +from dataclasses import dataclass, field from typing import List +from pydantic import BaseModel, Field + # --------------------------------------------------------------------------- -# Plaintiff Agent (Red Team) output +# Plaintiff Agent (Red Team) — structured output # --------------------------------------------------------------------------- class AttackVector(BaseModel): @@ -27,11 +30,11 @@ class AttackVector(BaseModel): severity: int = Field( description=( "Integer 1-5. " - "1=Minor ambiguity with negligible consequence. " - "2=Moderate risk, localized financial exposure. " + "1=Minor ambiguity, negligible consequence. " + "2=Moderate risk, localised financial exposure. " "3=Significant exposure, likely litigation target. " "4=Severe vulnerability, deal-threatening if exploited. " - "5=Catastrophic structural failure, renders clause unenforceable." + "5=Catastrophic structural failure — unenforceable or unlimited liability." ) ) title: str = Field(description="Short, descriptive title for this attack vector.") @@ -40,23 +43,21 @@ class AttackVector(BaseModel): ) legal_theory: str = Field( description=( - "The legal doctrine, case law principle, or statutory basis enabling " - "this attack (e.g., contra proferentem, implied duty of good faith, " - "Delaware chancery standards on MAE clauses). " + "The legal doctrine or principle enabling this attack " + "(e.g., contra proferentem, implied duty of good faith, Delaware MAE standards). " "Do not fabricate specific case citations." ) ) exploitation_scenario: str = Field( description=( - "A concrete scenario — including black-swan edge cases — demonstrating " - "how a hostile party would exploit this vulnerability in litigation." + "A concrete scenario — including black-swan edge cases — showing how a " + "hostile party would exploit this vulnerability in litigation." 
) ) estimated_exposure: str = Field( description=( - "Estimated financial exposure or legal consequence if this vector is " - "successfully exploited (e.g., 'uncapped indemnification liability', " - "'rescission of the entire transaction', '$X–$Y range')." + "Estimated financial exposure or legal consequence if successfully exploited " + "(e.g., 'uncapped indemnification liability', 'rescission of entire transaction')." ) ) @@ -70,30 +71,30 @@ class PlaintiffAnalysis(BaseModel): ) executive_summary: str = Field( description=( - "A 2-4 sentence executive summary of the clause's overall vulnerability " + "2-4 sentence executive summary of the clause's overall vulnerability " "profile from the plaintiff's perspective." ) ) # --------------------------------------------------------------------------- -# Defense Agent (Blue Team) output +# Defense Agent (Blue Team) — structured output # --------------------------------------------------------------------------- class DefenseRemedy(BaseModel): attack_vector_title: str = Field( - description="Exact title of the attack vector being neutralized." + description="Exact title of the attack vector being neutralised." ) hardened_language: str = Field( description=( "The rewritten clause language that closes this specific vulnerability. " - "Must be precise, legally sound, and written in formal contract English." + "Precise, legally sound, formal contract English." ) ) rationale: str = Field( description=( - "Explanation of exactly how the rewritten language neutralizes the " - "plaintiff's attack, referencing the specific legal theory." + "Explanation of how the rewritten language neutralises the plaintiff's " + "attack, referencing the specific legal theory." ) ) @@ -106,44 +107,97 @@ class DefenseAnalysis(BaseModel): ) ) remedies: List[DefenseRemedy] = Field( - description="Per-attack-vector remediation details, one entry per attack vector addressed." + description="Per-attack-vector remediation details." 
) residual_risk: str = Field( description=( - "Any remaining risk that cannot be fully mitigated without fundamentally " - "altering the business terms of the deal. If none, state 'None identified.'" + "Any remaining risk that cannot be fully mitigated without altering " + "the business terms. If none, state 'None identified.'" ) ) confidence_level: str = Field( - description=( - "Defense counsel's confidence in the hardened clause. " - "One of: HIGH | MEDIUM | LOW" - ) + description="Defense counsel's confidence: HIGH | MEDIUM | LOW" ) # --------------------------------------------------------------------------- -# Final segment report (output of the full pipeline per document segment) +# Adversarial loop tracking — Python dataclasses (not Pydantic, not sent to API) # --------------------------------------------------------------------------- -class SegmentReport(BaseModel): +@dataclass +class AdversarialRound: + """One complete Red → Blue exchange within the multi-round loop.""" + round_number: int + clause_text: str # The clause that was attacked THIS round + attack: PlaintiffAnalysis + defense: DefenseAnalysis + + +@dataclass +class IterativeSegmentReport: + """ + Full report for one document segment after all adversarial rounds. + + The 'rounds' list records every Red→Blue exchange so reviewers can + see how the clause evolved across iterations. 
+ """ segment_id: str original_text: str - plaintiff_analysis: PlaintiffAnalysis - defense_analysis: DefenseAnalysis + final_hardened_text: str # The clause text after the last Defense pass + rounds: List[AdversarialRound] = field(default_factory=list) + + # ── Computed properties ───────────────────────────────────────────── + + @property + def total_rounds(self) -> int: + return len(self.rounds) + + @property + def severity_trajectory(self) -> list[int]: + """Max severity per round — shows convergence over time.""" + return [r.attack.highest_severity for r in self.rounds] + + @property + def converged(self) -> bool: + """True if severity dropped at least 2 points across the simulation.""" + traj = self.severity_trajectory + return len(traj) >= 2 and (traj[0] - traj[-1]) >= 2 + + @property + def final_attack(self) -> PlaintiffAnalysis | None: + return self.rounds[-1].attack if self.rounds else None + + @property + def final_defense(self) -> DefenseAnalysis | None: + return self.rounds[-1].defense if self.rounds else None @property def net_risk_score(self) -> int: - """ - Simple composite score: average severity * 20, capped at 100. - Higher = more dangerous original clause. 
- """ - vectors = self.plaintiff_analysis.attack_vectors + """Risk score (0-100) based on the FINAL round's attack severity.""" + if not self.rounds: + return 0 + vectors = self.rounds[-1].attack.attack_vectors if not vectors: return 0 avg = sum(v.severity for v in vectors) / len(vectors) return min(100, round(avg * 20)) + @property + def initial_risk_score(self) -> int: + """Risk score of the FIRST round (before any hardening).""" + if not self.rounds: + return 0 + vectors = self.rounds[0].attack.attack_vectors + if not vectors: + return 0 + avg = sum(v.severity for v in vectors) / len(vectors) + return min(100, round(avg * 20)) + + @property + def risk_reduction(self) -> int: + """Points reduced: initial_risk_score - net_risk_score.""" + return max(0, self.initial_risk_score - self.net_risk_score) + @property def status(self) -> str: score = self.net_risk_score diff --git a/legal_warroom/warroom/orchestrator.py b/legal_warroom/warroom/orchestrator.py index 3f8d683e..2af89d9f 100644 --- a/legal_warroom/warroom/orchestrator.py +++ b/legal_warroom/warroom/orchestrator.py @@ -1,214 +1,153 @@ """ -Orchestrator — The Autonomous Legal War Game +Orchestrator — entry point for the simulation. -Drives the full adversarial pipeline: - 1. Ingest document segments. - 2. Route each segment to the Plaintiff Agent (Red Team) for attack. - 3. Route the original text + attack report to the Defense Agent (Blue Team). - 4. Collect SegmentReports for the final output. - -Supports: - - Sequential processing (safe, predictable, lower concurrency cost). - - Parallel processing (faster for large documents; uses concurrent API calls). +Responsibilities: + • Load and segment the document. + • For each segment, run the multi-round adversarial loop. + • Support sequential and parallel (thread-pool) processing. + • Accept any LLMProvider(s) — Anthropic, Ollama, or mixed. 
""" from __future__ import annotations -import asyncio import concurrent.futures from typing import Callable, List, Optional -import anthropic from rich.console import Console -from rich.progress import ( - BarColumn, - MofNCompleteColumn, - Progress, - SpinnerColumn, - TextColumn, - TimeElapsedColumn, -) from rich.table import Table -from .agents import plaintiff, defense from .document.processor import DocumentSegment, load_and_segment -from .models.schemas import SegmentReport +from .loop.adversarial import run_adversarial_loop +from .models.schemas import IterativeSegmentReport +from .providers.base import LLMProvider console = Console() -# --------------------------------------------------------------------------- -# Public entry point -# --------------------------------------------------------------------------- - def run_simulation( document_path: str, + plaintiff_provider: LLMProvider, + defense_provider: LLMProvider, words_per_segment: int = 800, + max_rounds: int = 3, + convergence_threshold: int = 2, parallel: bool = False, max_workers: int = 3, - on_segment_complete: Optional[Callable[[SegmentReport], None]] = None, -) -> List[SegmentReport]: + on_segment_complete: Optional[Callable[[IterativeSegmentReport], None]] = None, +) -> List[IterativeSegmentReport]: """ - Run the full Legal War Game simulation on a document. + Run the full Legal War Game simulation. Args: - document_path: Path to a .pdf or .txt file. - words_per_segment: Soft word-count cap per segment (default 800). - parallel: If True, process segments concurrently. - max_workers: Max parallel threads when parallel=True. - on_segment_complete: Optional callback invoked after each segment. + document_path: Path to .pdf or .txt file. + plaintiff_provider: LLMProvider for the Red Team. + defense_provider: LLMProvider for the Blue Team (can be same). + words_per_segment: Soft word-count cap per chunk (default 800). + max_rounds: Max adversarial rounds per segment (default 3). 
+ convergence_threshold: Stop early when severity ≤ this (default 2). + parallel: Process segments concurrently via thread pool. + max_workers: Thread-pool size when parallel=True. + on_segment_complete: Optional callback after each segment. Returns: - List of SegmentReport, one per document segment. + List[IterativeSegmentReport], one per segment, in document order. """ - client = anthropic.Anthropic() - console.rule("[bold cyan]AUTONOMOUS LEGAL WAR GAME — SIMULATION ALPHA[/bold cyan]") - console.print(f"\n[bold]Document:[/bold] {document_path}") + console.print( + f"\n[bold]Document:[/bold] {document_path}\n" + f"[bold]Plaintiff model:[/bold] {plaintiff_provider.model}\n" + f"[bold]Defense model:[/bold] {defense_provider.model}\n" + f"[bold]Max rounds/segment:[/bold] {max_rounds}\n" + f"[bold]Parallel:[/bold] {parallel}\n" + ) - # ── 1. Ingest ────────────────────────────────────────────────────────── + # ── Segment ───────────────────────────────────────────────────────────── with console.status("[yellow]Ingesting and segmenting document…"): segments = load_and_segment(document_path, words_per_segment) console.print( - f"[green]✓[/green] Segmented into [bold]{len(segments)}[/bold] clause blocks " - f"(~{words_per_segment} words each)\n" + f"[green]✓[/green] Segmented into [bold]{len(segments)}[/bold] clause blocks\n" ) - _print_segment_table(segments) - # ── 2. 
Run adversarial pipeline ──────────────────────────────────────── - reports: List[SegmentReport] = [] - + # ── Run ───────────────────────────────────────────────────────────────── if parallel and len(segments) > 1: - reports = _run_parallel(client, segments, max_workers, on_segment_complete) - else: - reports = _run_sequential(client, segments, on_segment_complete) - - return reports + return _run_parallel( + plaintiff_provider, defense_provider, + segments, max_rounds, convergence_threshold, + max_workers, on_segment_complete, + ) + return _run_sequential( + plaintiff_provider, defense_provider, + segments, max_rounds, convergence_threshold, + on_segment_complete, + ) # --------------------------------------------------------------------------- -# Sequential execution +# Sequential # --------------------------------------------------------------------------- def _run_sequential( - client: anthropic.Anthropic, + pp: LLMProvider, + dp: LLMProvider, segments: List[DocumentSegment], - on_complete: Optional[Callable[[SegmentReport], None]], -) -> List[SegmentReport]: - reports: List[SegmentReport] = [] - - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - MofNCompleteColumn(), - TimeElapsedColumn(), - console=console, - ) as progress: - task = progress.add_task("Processing segments…", total=len(segments)) - - for seg in segments: - progress.update(task, description=f"[cyan]{seg.segment_id}[/cyan] — Red Team attacking…") - report = _process_segment(client, seg) - reports.append(report) - if on_complete: - on_complete(report) - progress.advance(task) - + max_rounds: int, + threshold: int, + on_complete: Optional[Callable], +) -> List[IterativeSegmentReport]: + reports: List[IterativeSegmentReport] = [] + for i, seg in enumerate(segments, 1): + console.rule( + f"[cyan]Segment {i}/{len(segments)} — {seg.segment_id}[/cyan]", + style="dim", + ) + report = run_adversarial_loop(pp, dp, seg, max_rounds, threshold) + 
reports.append(report) + if on_complete: + on_complete(report) return reports # --------------------------------------------------------------------------- -# Parallel execution +# Parallel # --------------------------------------------------------------------------- def _run_parallel( - client: anthropic.Anthropic, + pp: LLMProvider, + dp: LLMProvider, segments: List[DocumentSegment], + max_rounds: int, + threshold: int, max_workers: int, - on_complete: Optional[Callable[[SegmentReport], None]], -) -> List[SegmentReport]: - """ - Process segments in parallel using a thread pool. - The Anthropic SDK is thread-safe; each call creates its own HTTP session. - """ + on_complete: Optional[Callable], +) -> List[IterativeSegmentReport]: console.print( - f"[bold yellow]Parallel mode:[/bold yellow] up to {max_workers} concurrent API calls.\n" + f"[bold yellow]Parallel mode:[/bold yellow] " + f"up to {max_workers} concurrent segments.\n" ) + results: dict[str, IterativeSegmentReport] = {} - results: dict[str, SegmentReport] = {} - - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - MofNCompleteColumn(), - TimeElapsedColumn(), - console=console, - ) as progress: - task = progress.add_task("Processing segments (parallel)…", total=len(segments)) - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_seg = { - executor.submit(_process_segment, client, seg): seg - for seg in segments - } - for future in concurrent.futures.as_completed(future_to_seg): - seg = future_to_seg[future] - try: - report = future.result() - results[seg.segment_id] = report - if on_complete: - on_complete(report) - except Exception as exc: - console.print( - f"[red]ERROR[/red] {seg.segment_id}: {exc}" - ) - finally: - progress.advance(task) - - # Return in original document order - ordered = [results[seg.segment_id] for seg in segments if seg.segment_id in results] - return ordered - - -# 
--------------------------------------------------------------------------- -# Single-segment pipeline -# --------------------------------------------------------------------------- - -def _process_segment( - client: anthropic.Anthropic, - seg: DocumentSegment, -) -> SegmentReport: - """Run the full Red → Blue pipeline for a single document segment.""" - - # ── Red Team attack ────────────────────────────────────────────────── - plaintiff_analysis = plaintiff.run( - client=client, - clause_text=seg.text, - segment_id=seg.segment_id, - ) + def _process(seg: DocumentSegment) -> IterativeSegmentReport: + return run_adversarial_loop(pp, dp, seg, max_rounds, threshold) - # ── Blue Team defence ──────────────────────────────────────────────── - defense_analysis = defense.run( - client=client, - clause_text=seg.text, - plaintiff_analysis=plaintiff_analysis, - segment_id=seg.segment_id, - ) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_map = {executor.submit(_process, seg): seg for seg in segments} + for future in concurrent.futures.as_completed(future_map): + seg = future_map[future] + try: + report = future.result() + results[seg.segment_id] = report + if on_complete: + on_complete(report) + except Exception as exc: + console.print(f"[red]ERROR[/red] {seg.segment_id}: {exc}") - return SegmentReport( - segment_id=seg.segment_id, - original_text=seg.text, - plaintiff_analysis=plaintiff_analysis, - defense_analysis=defense_analysis, - ) + return [results[seg.segment_id] for seg in segments if seg.segment_id in results] # --------------------------------------------------------------------------- -# Display helpers +# Display # --------------------------------------------------------------------------- def _print_segment_table(segments: List[DocumentSegment]) -> None: @@ -216,11 +155,8 @@ def _print_segment_table(segments: List[DocumentSegment]) -> None: table.add_column("ID", style="cyan", no_wrap=True) table.add_column("Words", 
justify="right") table.add_column("Preview", max_width=80) - for seg in segments: - word_count = len(seg.text.split()) preview = seg.text[:120].replace("\n", " ") + ("…" if len(seg.text) > 120 else "") - table.add_row(seg.segment_id, str(word_count), preview) - + table.add_row(seg.segment_id, str(len(seg.text.split())), preview) console.print(table) console.print() diff --git a/legal_warroom/warroom/providers/__init__.py b/legal_warroom/warroom/providers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/providers/anthropic_p.py b/legal_warroom/warroom/providers/anthropic_p.py new file mode 100644 index 00000000..76e0d0f8 --- /dev/null +++ b/legal_warroom/warroom/providers/anthropic_p.py @@ -0,0 +1,62 @@ +""" +Anthropic provider — uses the official anthropic SDK. + +Features used: + • claude-opus-4-6 with adaptive thinking for deep legal reasoning + • client.messages.parse() for schema-validated structured outputs + • Streaming-safe: max_tokens capped at 12 288 (no streaming needed at this size) +""" + +from __future__ import annotations + +from typing import Type, TypeVar + +import anthropic +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class AnthropicProvider: + def __init__(self, model: str = "claude-opus-4-6") -> None: + self._client = anthropic.Anthropic() + self._model = model + + @property + def model(self) -> str: + return self._model + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + response = self._client.messages.create( + model=self._model, + max_tokens=max_tokens, + thinking={"type": "adaptive"}, + system=system, + messages=messages, + ) + for block in response.content: + if block.type == "text": + return block.text + return "" + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + response = self._client.messages.parse( + model=self._model, + 
max_tokens=max_tokens, + thinking={"type": "adaptive"}, + system=system, + messages=messages, + output_format=schema, + ) + return response.parsed_output diff --git a/legal_warroom/warroom/providers/base.py b/legal_warroom/warroom/providers/base.py new file mode 100644 index 00000000..18093c2d --- /dev/null +++ b/legal_warroom/warroom/providers/base.py @@ -0,0 +1,81 @@ +""" +Provider abstraction layer. + +Every backend (Anthropic, Ollama, …) implements LLMProvider. +The rest of the codebase talks only to this interface, so swapping +models or endpoints requires zero changes to agent logic. + +Internal message format is identical to the OpenAI chat API: + {"role": "user"|"assistant"|"system", "content": "..."} +This is the most portable format and maps cleanly to both APIs. +""" + +from __future__ import annotations + +from typing import Protocol, Type, TypeVar +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class LLMProvider(Protocol): + """ + Minimal interface every backend must implement. + + Two methods: + complete() — free-form text response + complete_structured() — guaranteed Pydantic-model-shaped response + """ + + @property + def model(self) -> str: + """Human-readable model identifier (used for logging).""" + ... + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + """ + Send system + messages and return the assistant's text reply. + """ + ... + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + """ + Send system + messages and return a validated Pydantic instance. + The provider is responsible for enforcing the schema. + """ + ... + + +def make_provider(provider_name: str, model: str | None = None) -> LLMProvider: + """ + Factory — returns the right provider based on name string. + + Args: + provider_name: "anthropic" or "ollama" + model: Optional model override. Defaults vary per provider. 
+ + Usage: + provider = make_provider("anthropic") + provider = make_provider("ollama", model="qwen2.5:14b") + """ + name = provider_name.lower().strip() + if name == "anthropic": + from .anthropic_p import AnthropicProvider + return AnthropicProvider(model=model or "claude-opus-4-6") + if name == "ollama": + from .ollama_p import OllamaProvider + return OllamaProvider(model=model or "llama3.1:8b") + raise ValueError( + f"Unknown provider '{provider_name}'. Choose 'anthropic' or 'ollama'." + ) diff --git a/legal_warroom/warroom/providers/ollama_p.py b/legal_warroom/warroom/providers/ollama_p.py new file mode 100644 index 00000000..cbaa94c6 --- /dev/null +++ b/legal_warroom/warroom/providers/ollama_p.py @@ -0,0 +1,109 @@ +""" +Ollama provider — run the entire war game 100% locally, no API costs. + +Uses Ollama's OpenAI-compatible REST endpoint (default: localhost:11434). + +Recommended models (support JSON mode and produce coherent legal output): + • qwen2.5:14b — best quality for legal reasoning locally + • qwen2.5:7b — good quality, fits on most consumer GPUs + • llama3.1:8b — solid, widely available + • llama3.2:3b — fast, lower quality + • mistral:7b — good instruction following + +Install a model: + ollama pull qwen2.5:14b + +Structured outputs: + Ollama supports response_format={"type": "json_object"} for JSON mode. + The schema is injected into the system prompt so the model knows the shape. + The response is then validated via Pydantic — if parsing fails, a clear + error is raised telling the user to try a larger/better model. +""" + +from __future__ import annotations + +import json +from typing import Type, TypeVar + +from pydantic import BaseModel, ValidationError + +T = TypeVar("T", bound=BaseModel) + +# Injected at the end of every structured-output system prompt +_SCHEMA_INSTRUCTION = ( + "\n\n━━━ OUTPUT FORMAT (STRICT) ━━━\n" + "You MUST respond with a single valid JSON object that exactly matches " + "the schema below. 
Do NOT include markdown fences, prose, or any text " + "outside the JSON object.\n\n" + "Schema:\n{schema}" +) + + +class OllamaProvider: + def __init__( + self, + model: str = "llama3.1:8b", + base_url: str = "http://localhost:11434/v1", + ) -> None: + try: + from openai import OpenAI + except ImportError as exc: + raise ImportError( + "The 'openai' package is required for the Ollama provider.\n" + "Install it with: pip install openai" + ) from exc + + self._client = OpenAI(base_url=base_url, api_key="ollama") + self._model = model + + @property + def model(self) -> str: + return self._model + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + all_msgs = ([{"role": "system", "content": system}] if system else []) + messages + response = self._client.chat.completions.create( + model=self._model, + messages=all_msgs, + max_tokens=max_tokens, + temperature=0.2, + ) + return response.choices[0].message.content or "" + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + schema_json = json.dumps(schema.model_json_schema(), indent=2) + enhanced_system = system + _SCHEMA_INSTRUCTION.format(schema=schema_json) + + all_msgs = ( + [{"role": "system", "content": enhanced_system}] + messages + ) + + response = self._client.chat.completions.create( + model=self._model, + messages=all_msgs, + max_tokens=max_tokens, + temperature=0.1, + response_format={"type": "json_object"}, + ) + raw = response.choices[0].message.content or "{}" + + try: + return schema.model_validate_json(raw) + except ValidationError as exc: + raise ValueError( + f"Ollama model '{self._model}' returned JSON that does not match " + f"the expected schema.\n" + f"Try a larger model (e.g. 
qwen2.5:14b) for better compliance.\n" + f"Validation errors:\n{exc}" + ) from exc diff --git a/legal_warroom/warroom/report/generator.py b/legal_warroom/warroom/report/generator.py index 878dacb3..ab286e45 100644 --- a/legal_warroom/warroom/report/generator.py +++ b/legal_warroom/warroom/report/generator.py @@ -1,13 +1,10 @@ """ Report Generator -Transforms a list of SegmentReports into: - 1. A rich terminal summary (always shown). - 2. A JSON file (machine-readable, always saved). - 3. An HTML file (human-readable, optional). - -The JSON structure is designed to be ingested by downstream systems -(dashboards, further LLM analysis, audit trails). +Produces: + 1. Rich terminal summary with multi-round progression tables. + 2. JSON file — full structured data for every round of every segment. + 3. HTML file — dark-themed, self-contained, shows severity trajectory. """ from __future__ import annotations @@ -22,32 +19,25 @@ from rich.table import Table from rich.text import Text -from ..models.schemas import AttackVector, SegmentReport +from ..models.schemas import IterativeSegmentReport console = Console() -# ── Severity colours ──────────────────────────────────────────────────────── -_SEVERITY_STYLE = { - 1: "green", - 2: "yellow", - 3: "orange3", - 4: "red", - 5: "bold red", -} - +_SEVERITY_STYLE = {1: "green", 2: "yellow", 3: "orange3", 4: "red", 5: "bold red"} _STATUS_STYLE = { "HARDENED": "bold green", "REQUIRES_REVIEW": "bold yellow", "CRITICAL": "bold red", } +_SEVERITY_HEX = {1: "#22c55e", 2: "#eab308", 3: "#f97316", 4: "#ef4444", 5: "#991b1b"} +_STATUS_HEX = {"HARDENED": "#22c55e", "REQUIRES_REVIEW": "#eab308", "CRITICAL": "#dc2626"} # --------------------------------------------------------------------------- -# Public API +# Terminal # --------------------------------------------------------------------------- -def print_terminal_summary(reports: List[SegmentReport]) -> None: - """Print a rich summary to the terminal after the simulation completes.""" 
+def print_terminal_summary(reports: List[IterativeSegmentReport]) -> None: console.rule("\n[bold cyan]WAR GAME RESULTS — FINAL REPORT[/bold cyan]") _print_overview_table(reports) for report in reports: @@ -55,194 +45,197 @@ def print_terminal_summary(reports: List[SegmentReport]) -> None: _print_global_stats(reports) -def save_json(reports: List[SegmentReport], output_path: str | Path) -> Path: - """Serialise all reports to a JSON file. Returns the path written.""" - path = Path(output_path) - path.parent.mkdir(parents=True, exist_ok=True) - - payload = { - "simulation": "Autonomous Legal War Game — Simulation Alpha", - "generated_at": datetime.now(timezone.utc).isoformat(), - "segment_count": len(reports), - "segments": [_segment_to_dict(r) for r in reports], - "summary": _global_summary(reports), - } - - path.write_text(json.dumps(payload, indent=2), encoding="utf-8") - console.print(f"\n[green]✓ JSON report saved:[/green] {path}") - return path - - -def save_html(reports: List[SegmentReport], output_path: str | Path) -> Path: - """Generate a self-contained HTML report. 
Returns the path written.""" - path = Path(output_path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(_render_html(reports), encoding="utf-8") - console.print(f"[green]✓ HTML report saved:[/green] {path}") - return path - - -# --------------------------------------------------------------------------- -# Terminal rendering -# --------------------------------------------------------------------------- - -def _print_overview_table(reports: List[SegmentReport]) -> None: +def _print_overview_table(reports: List[IterativeSegmentReport]) -> None: table = Table(title="Segment Overview", show_lines=True, expand=True) table.add_column("Segment", style="cyan", no_wrap=True) - table.add_column("Vectors", justify="right") - table.add_column("Max Severity", justify="center") - table.add_column("Risk Score", justify="right") + table.add_column("Rounds", justify="center") + table.add_column("Severity Trajectory", justify="center") + table.add_column("Risk (start→end)", justify="center") + table.add_column("Reduction", justify="right") table.add_column("Status", justify="center") - table.add_column("Defense Confidence", justify="center") + table.add_column("Converged?", justify="center") for r in reports: - sev = r.plaintiff_analysis.highest_severity - style = _SEVERITY_STYLE.get(sev, "white") + traj = " → ".join(str(s) for s in r.severity_trajectory) status_style = _STATUS_STYLE.get(r.status, "white") - + conv = "[green]Yes[/green]" if r.converged else "[yellow]No[/yellow]" table.add_row( r.segment_id, - str(len(r.plaintiff_analysis.attack_vectors)), - Text(f"{sev}/5", style=style), - str(r.net_risk_score), + str(r.total_rounds), + traj, + f"{r.initial_risk_score} → {r.net_risk_score}", + f"-{r.risk_reduction}", Text(r.status, style=status_style), - r.defense_analysis.confidence_level, + conv, ) - console.print(table) -def _print_segment_detail(report: SegmentReport) -> None: - """Print the full Red/Blue breakdown for one segment.""" - sev = 
report.plaintiff_analysis.highest_severity - panel_style = _SEVERITY_STYLE.get(sev, "white") - +def _print_segment_detail(report: IterativeSegmentReport) -> None: + status_style = _STATUS_STYLE.get(report.status, "white") + initial_sev = report.rounds[0].attack.highest_severity if report.rounds else 1 console.print( Panel( f"[bold]Segment:[/bold] {report.segment_id} " - f"[bold]Risk Score:[/bold] {report.net_risk_score}/100 " - f"[bold]Status:[/bold] [{_STATUS_STYLE.get(report.status, 'white')}]{report.status}[/]", + f"[bold]Rounds:[/bold] {report.total_rounds} " + f"[bold]Risk:[/bold] {report.initial_risk_score} → {report.net_risk_score} " + f"[bold]Status:[/bold] [{status_style}]{report.status}[/]", title=f"[bold cyan]─── {report.segment_id} ───[/bold cyan]", - border_style=panel_style, + border_style=_SEVERITY_STYLE.get(initial_sev, "white"), ) ) - # Original clause excerpt - excerpt = report.original_text[:400].replace("\n", " ") - if len(report.original_text) > 400: + excerpt = report.original_text[:300].replace("\n", " ") + if len(report.original_text) > 300: excerpt += "…" console.print(f"[dim]Original:[/dim] {excerpt}\n") - # Attack vectors - console.print("[bold red]🔴 RED TEAM — ATTACK VECTORS[/bold red]") - console.print(f"[dim]{report.plaintiff_analysis.executive_summary}[/dim]\n") - for v in report.plaintiff_analysis.attack_vectors: - _print_attack_vector(v) + for rnd in report.rounds: + console.print( + f"[bold]Round {rnd.round_number}[/bold] " + f"[red]Red Team — {len(rnd.attack.attack_vectors)} vector(s) " + f"(max sev {rnd.attack.highest_severity}/5)[/red]" + ) + console.print(f" [dim]{rnd.attack.executive_summary}[/dim]") + for v in rnd.attack.attack_vectors: + s = _SEVERITY_STYLE.get(v.severity, "white") + console.print(f" [{s}][{v.severity}][/] {v.title} — {v.estimated_exposure}") + console.print( + f"\n [green]Blue Team — Confidence: {rnd.defense.confidence_level}[/green]" + ) + console.print(f" [dim]Residual risk: 
{rnd.defense.residual_risk}[/dim]\n") - # Defense - console.print("\n[bold green]🔵 BLUE TEAM — HARDENED CLAUSE[/bold green]") - console.print(report.defense_analysis.fully_hardened_clause) - console.print( - f"\n[dim]Residual Risk:[/dim] {report.defense_analysis.residual_risk}" - ) - console.print( - f"[dim]Defense Confidence:[/dim] {report.defense_analysis.confidence_level}\n" - ) + console.print("[bold green]Final Hardened Clause:[/bold green]") + console.print(report.final_hardened_text) console.rule(style="dim") -def _print_attack_vector(v: AttackVector) -> None: - style = _SEVERITY_STYLE.get(v.severity, "white") - console.print( - f" [{style}][SEV {v.severity}][/] [bold]{v.title}[/bold] " - f"[dim]({v.vulnerability_type})[/dim]" - ) - console.print(f" {v.description}") - console.print(f" [italic]Exposure: {v.estimated_exposure}[/italic]\n") - - -def _print_global_stats(reports: List[SegmentReport]) -> None: +def _print_global_stats(reports: List[IterativeSegmentReport]) -> None: s = _global_summary(reports) console.rule("[bold cyan]GLOBAL STATISTICS[/bold cyan]") console.print( - f" Total segments: {s['total_segments']}\n" + f" Segments processed: {s['total_segments']}\n" + f" Total rounds run: {s['total_rounds']}\n" f" Total attack vectors: {s['total_attack_vectors']}\n" - f" Critical segments: {s['critical_segments']}\n" + f" Segments converged: {s['converged_segments']}\n" + f" Critical remaining: {s['critical_segments']}\n" f" Requires review: {s['requires_review_segments']}\n" - f" Hardened: {s['hardened_segments']}\n" - f" Average risk score: {s['average_risk_score']:.1f}/100\n" - f" Peak severity: {s['peak_severity']}/5\n" + f" Fully hardened: {s['hardened_segments']}\n" + f" Avg risk reduction: {s['avg_risk_reduction']:.1f} pts\n" + f" Peak initial severity: {s['peak_initial_severity']}/5\n" + f" Peak final severity: {s['peak_final_severity']}/5\n" ) # --------------------------------------------------------------------------- -# JSON serialisation +# 
JSON # --------------------------------------------------------------------------- -def _segment_to_dict(r: SegmentReport) -> dict: +def save_json(reports: List[IterativeSegmentReport], output_path: str | Path) -> Path: + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "simulation": "Autonomous Legal War Game — Simulation Alpha", + "generated_at": datetime.now(timezone.utc).isoformat(), + "segment_count": len(reports), + "segments": [_segment_to_dict(r) for r in reports], + "summary": _global_summary(reports), + } + path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + console.print(f"\n[green]✓ JSON report saved:[/green] {path}") + return path + + +def _segment_to_dict(r: IterativeSegmentReport) -> dict: return { "segment_id": r.segment_id, - "net_risk_score": r.net_risk_score, + "total_rounds": r.total_rounds, + "severity_trajectory": r.severity_trajectory, + "initial_risk_score": r.initial_risk_score, + "final_risk_score": r.net_risk_score, + "risk_reduction": r.risk_reduction, + "converged": r.converged, "status": r.status, "original_text": r.original_text, - "red_team": { - "executive_summary": r.plaintiff_analysis.executive_summary, - "highest_severity": r.plaintiff_analysis.highest_severity, - "attack_vectors": [ - { - "title": v.title, - "severity": v.severity, - "vulnerability_type": v.vulnerability_type, - "clause_reference": v.clause_reference, - "description": v.description, - "legal_theory": v.legal_theory, - "exploitation_scenario": v.exploitation_scenario, - "estimated_exposure": v.estimated_exposure, - } - for v in r.plaintiff_analysis.attack_vectors - ], - }, - "blue_team": { - "fully_hardened_clause": r.defense_analysis.fully_hardened_clause, - "confidence_level": r.defense_analysis.confidence_level, - "residual_risk": r.defense_analysis.residual_risk, - "remedies": [ - { - "attack_vector_title": rem.attack_vector_title, - "hardened_language": rem.hardened_language, - "rationale": 
rem.rationale, - } - for rem in r.defense_analysis.remedies - ], - }, + "final_hardened_text": r.final_hardened_text, + "rounds": [ + { + "round_number": rnd.round_number, + "clause_text_attacked": rnd.clause_text, + "red_team": { + "highest_severity": rnd.attack.highest_severity, + "executive_summary": rnd.attack.executive_summary, + "attack_vectors": [ + { + "title": v.title, + "severity": v.severity, + "vulnerability_type": v.vulnerability_type, + "clause_reference": v.clause_reference, + "description": v.description, + "legal_theory": v.legal_theory, + "exploitation_scenario": v.exploitation_scenario, + "estimated_exposure": v.estimated_exposure, + } + for v in rnd.attack.attack_vectors + ], + }, + "blue_team": { + "fully_hardened_clause": rnd.defense.fully_hardened_clause, + "confidence_level": rnd.defense.confidence_level, + "residual_risk": rnd.defense.residual_risk, + "remedies": [ + { + "attack_vector_title": rem.attack_vector_title, + "hardened_language": rem.hardened_language, + "rationale": rem.rationale, + } + for rem in rnd.defense.remedies + ], + }, + } + for rnd in r.rounds + ], } -def _global_summary(reports: List[SegmentReport]) -> dict: +def _global_summary(reports: List[IterativeSegmentReport]) -> dict: if not reports: return {} - scores = [r.net_risk_score for r in reports] - all_vectors = [v for r in reports for v in r.plaintiff_analysis.attack_vectors] + all_initial = [ + v for r in reports for v in (r.rounds[0].attack.attack_vectors if r.rounds else []) + ] + all_final = [ + v for r in reports for v in (r.rounds[-1].attack.attack_vectors if r.rounds else []) + ] return { "total_segments": len(reports), - "total_attack_vectors": len(all_vectors), + "total_rounds": sum(r.total_rounds for r in reports), + "total_attack_vectors": sum( + sum(len(rnd.attack.attack_vectors) for rnd in r.rounds) for r in reports + ), + "converged_segments": sum(1 for r in reports if r.converged), "critical_segments": sum(1 for r in reports if r.status == 
"CRITICAL"), "requires_review_segments": sum(1 for r in reports if r.status == "REQUIRES_REVIEW"), "hardened_segments": sum(1 for r in reports if r.status == "HARDENED"), - "average_risk_score": sum(scores) / len(scores), - "peak_severity": max((v.severity for v in all_vectors), default=0), + "avg_risk_reduction": sum(r.risk_reduction for r in reports) / len(reports), + "peak_initial_severity": max((v.severity for v in all_initial), default=0), + "peak_final_severity": max((v.severity for v in all_final), default=0), } # --------------------------------------------------------------------------- -# HTML rendering +# HTML # --------------------------------------------------------------------------- -_SEVERITY_HEX = {1: "#22c55e", 2: "#eab308", 3: "#f97316", 4: "#ef4444", 5: "#991b1b"} -_STATUS_HEX = {"HARDENED": "#22c55e", "REQUIRES_REVIEW": "#eab308", "CRITICAL": "#dc2626"} +def save_html(reports: List[IterativeSegmentReport], output_path: str | Path) -> Path: + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(_render_html(reports), encoding="utf-8") + console.print(f"[green]✓ HTML report saved:[/green] {path}") + return path -def _render_html(reports: List[SegmentReport]) -> str: +def _render_html(reports: List[IterativeSegmentReport]) -> str: s = _global_summary(reports) segments_html = "\n".join(_segment_html(r) for r in reports) generated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") @@ -250,98 +243,94 @@ def _render_html(reports: List[SegmentReport]) -> str: return f""" - -Legal War Game Report +Legal War Game Report

⚖️ Autonomous Legal War Game — Simulation Alpha

Generated: {generated}

-

Global Statistics

{s['total_segments']}
Segments
+
{s['total_rounds']}
Total Rounds
{s['total_attack_vectors']}
Attack Vectors
+
{s['converged_segments']}
Converged
{s['critical_segments']}
Critical
-
{s['requires_review_segments']}
Requires Review
-
{s['hardened_segments']}
Hardened
-
{s['average_risk_score']:.0f}
Avg Risk Score
-
{s['peak_severity']}/5
Peak Severity
+
{s['avg_risk_reduction']:.0f} pts
Avg Risk Reduction
+
{s['peak_initial_severity']}→{s['peak_final_severity']}
Peak Severity
-

Segment Reports

{segments_html} +
Autonomous Legal War Game — For stress-testing purposes only. Not legal advice.
+""" -
Autonomous Legal War Game — For stress-testing purposes only. Not legal advice.
- -""" - - -def _segment_html(r: SegmentReport) -> str: - status_color = _STATUS_HEX.get(r.status, "#fff") - css_class = r.status.lower().replace("_", "_") - attacks_html = "\n".join(_attack_html(v) for v in r.plaintiff_analysis.attack_vectors) +def _segment_html(r: IterativeSegmentReport) -> str: + color = _STATUS_HEX.get(r.status, "#fff") + traj = "".join( + f'
{s}
' + + ('' if i < len(r.severity_trajectory) - 1 else "") + for i, s in enumerate(r.severity_trajectory) + ) + rounds_html = "\n".join(_round_html(rnd) for rnd in r.rounds) + conv_badge = "Converged" if r.converged else "" return f""" -
+

{r.segment_id} - {r.status} - Risk {r.net_risk_score}/100 + {r.status} + Risk {r.initial_risk_score}→{r.net_risk_score} (-{r.risk_reduction}) + {r.total_rounds} round(s) + {conv_badge}

-
- Original Text -
{_esc(r.original_text)}
-
- -

🔴 Red Team — {len(r.plaintiff_analysis.attack_vectors)} Attack Vector(s)

-

{_esc(r.plaintiff_analysis.executive_summary)}

- {attacks_html} - -

🔵 Blue Team — Hardened Clause

-
{_esc(r.defense_analysis.fully_hardened_clause)}
-

Residual Risk: {_esc(r.defense_analysis.residual_risk)}

-

Defense Confidence: {r.defense_analysis.confidence_level}

+
{traj}
+
Original clause
{_esc(r.original_text)}
+ {rounds_html} +

Final Hardened Clause

+
{_esc(r.final_hardened_text)}
""" -def _attack_html(v: AttackVector) -> str: - color = _SEVERITY_HEX.get(v.severity, "#fff") +def _round_html(rnd) -> str: + attacks = "".join( + f'
' + f'[{v.severity}] {_esc(v.title)}' + f' — {_esc(v.vulnerability_type)}' + f'

{_esc(v.description)}

' + f'

Exposure: {_esc(v.estimated_exposure)}

' + for v in rnd.attack.attack_vectors + ) return f""" -
- [SEV {v.severity}] {_esc(v.title)} - — {_esc(v.vulnerability_type)} -

{_esc(v.description)}

-

Exposure: {_esc(v.estimated_exposure)}

+
+

Round {rnd.round_number} + 🔴 {len(rnd.attack.attack_vectors)} vector(s) · max sev {rnd.attack.highest_severity}/5 +  |  + 🔵 Confidence: {rnd.defense.confidence_level} +

+

{_esc(rnd.attack.executive_summary)}

+ {attacks} +
+ Defense rewrite (round {rnd.round_number}) +
{_esc(rnd.defense.fully_hardened_clause)}
+

Residual risk: {_esc(rnd.defense.residual_risk)}

+
""" -def _esc(text: str) -> str: - return ( - text - .replace("&", "&") - .replace("<", "<") - .replace(">", ">") - .replace('"', """) - ) +def _esc(t: str) -> str: + return t.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """)