#!/usr/bin/env python3
"""
Autonomous Legal War Game — CLI Entry Point

Usage examples:

    # Anthropic (cloud) — full simulation, 3 rounds per segment
    python main.py agreement.pdf

    # Ollama (local, free) — default model llama3.1:8b
    python main.py agreement.pdf --provider ollama

    # Ollama with a specific model + more rounds
    python main.py agreement.pdf --provider ollama --model qwen2.5:14b --rounds 4

    # Mix providers: Plaintiff on Ollama, Defense on Anthropic
    python main.py agreement.pdf \\
        --plaintiff-provider ollama --plaintiff-model qwen2.5:14b \\
        --defense-provider anthropic

    # Parallel + HTML report
    python main.py agreement.pdf --parallel --html

    # Dry run: only first 2 segments
    python main.py agreement.pdf --max-segments 2

Ollama setup:
    1. Install Ollama: https://ollama.com
    2. Pull a model:   ollama pull qwen2.5:14b
    3. Run:            python main.py agreement.pdf --provider ollama --model qwen2.5:14b

Recommended Ollama models (best → fastest):
    qwen2.5:14b — best local quality for legal reasoning (~9GB)
    qwen2.5:7b  — good quality (~5GB)
    llama3.1:8b — solid baseline (~5GB)
    llama3.2:3b — fastest, lower quality (~2GB)
"""

from __future__ import annotations

from pathlib import Path
from typing import Optional

import typer
from dotenv import load_dotenv
from rich.console import Console

from warroom import orchestrator
from warroom.providers.base import make_provider
from warroom.report import generator

load_dotenv()
app = typer.Typer(add_completion=False, rich_markup_mode="rich")
console = Console()


@app.command()
def main(
    document: str = typer.Argument(
        ..., help="Path to the legal document (.pdf or .txt)"
    ),
    # ── Provider shortcuts (same provider for both agents) ─────────────────
    provider: str = typer.Option(
        "anthropic",
        "--provider", "-p",
        help="Backend for both agents: 'anthropic' (cloud) or 'ollama' (local).",
    ),
    model: Optional[str] = typer.Option(
        None,
        "--model", "-m",
        help=(
            "Model to use. Defaults: anthropic=claude-opus-4-6, ollama=llama3.1:8b. "
            "Override: --model qwen2.5:14b"
        ),
    ),
    # ── Fine-grained per-agent provider overrides ──────────────────────────
    plaintiff_provider: Optional[str] = typer.Option(
        None, "--plaintiff-provider",
        help="Provider override for the Red Team agent.",
    ),
    plaintiff_model: Optional[str] = typer.Option(
        None, "--plaintiff-model",
        help="Model override for the Red Team agent.",
    ),
    defense_provider: Optional[str] = typer.Option(
        None, "--defense-provider",
        help="Provider override for the Blue Team agent.",
    ),
    defense_model: Optional[str] = typer.Option(
        None, "--defense-model",
        help="Model override for the Blue Team agent.",
    ),
    # ── Ollama config ──────────────────────────────────────────────────────
    ollama_url: str = typer.Option(
        "http://localhost:11434/v1",
        "--ollama-url",
        help="Ollama API base URL.",
    ),
    # ── Simulation config ──────────────────────────────────────────────────
    rounds: int = typer.Option(
        3, "--rounds", "-r",
        help="Max adversarial rounds per segment (default 3).",
    ),
    convergence: int = typer.Option(
        2, "--convergence",
        help=(
            "Stop iterating when max severity drops to this level or below "
            "(after ≥2 rounds). Default 2."
        ),
    ),
    words: int = typer.Option(
        800, "--words", "-w",
        help="Soft word-count cap per segment (default 800).",
    ),
    parallel: bool = typer.Option(
        False, "--parallel",
        help="Process segments concurrently (faster, higher API concurrency).",
    ),
    workers: int = typer.Option(
        3, "--workers",
        help="Max parallel threads when --parallel is set.",
    ),
    # ── Output config ──────────────────────────────────────────────────────
    output_dir: str = typer.Option(
        "output", "--output", "-o",
        help="Directory for report files.",
    ),
    html: bool = typer.Option(
        False, "--html",
        help="Also generate a self-contained HTML report.",
    ),
    max_segments: Optional[int] = typer.Option(
        None, "--max-segments",
        help="Limit to the first N segments (useful for dry-runs).",
    ),
) -> None:
    """
    [bold cyan]Autonomous Legal War Game[/bold cyan] — M&A contract stress-testing.

    Runs a [bold red]Plaintiff Agent (Red Team)[/bold red] against a
    [bold green]Defense Agent (Blue Team)[/bold green] across multiple rounds
    per clause. Each round, the Plaintiff re-attacks the Defense's latest
    hardened rewrite until convergence or max rounds is reached.

    Supports [bold]Anthropic (cloud)[/bold] and [bold]Ollama (local, free)[/bold].
    """
    doc_path = Path(document)
    if not doc_path.exists():
        console.print(f"[bold red]Error:[/bold red] File not found: {doc_path}")
        raise typer.Exit(code=1)

    # ── Build providers ─────────────────────────────────────────────────
    # Per-agent overrides take precedence; fall back to --provider / --model.
    p_provider = plaintiff_provider or provider
    p_model = plaintiff_model or model
    d_provider = defense_provider or provider
    d_model = defense_model or model

    try:
        pp = _build_provider(p_provider, p_model, ollama_url)
        dp = _build_provider(d_provider, d_model, ollama_url)
    except (ValueError, ImportError) as exc:
        console.print(f"[bold red]Provider error:[/bold red] {exc}")
        # FIX: chain the original exception (PEP 3134 / B904) so the real
        # cause is preserved in the traceback instead of being masked by Exit.
        raise typer.Exit(code=1) from exc

    # ── Run simulation ──────────────────────────────────────────────────
    reports = orchestrator.run_simulation(
        document_path=str(doc_path),
        plaintiff_provider=pp,
        defense_provider=dp,
        words_per_segment=words,
        max_rounds=rounds,
        convergence_threshold=convergence,
        parallel=parallel,
        max_workers=workers,
    )

    # NOTE(review): this trims the report list AFTER the whole simulation has
    # already run, so --max-segments does not save any API calls despite the
    # "dry-run" wording in its help text. run_simulation does not currently
    # accept a segment cap — TODO: push the cap down into run_simulation.
    if max_segments is not None:
        reports = reports[:max_segments]

    if not reports:
        console.print("[yellow]No segments produced. Check your document.[/yellow]")
        raise typer.Exit(code=1)

    # ── Output ──────────────────────────────────────────────────────────
    generator.print_terminal_summary(reports)

    out_dir = Path(output_dir)
    stem = doc_path.stem
    generator.save_json(reports, out_dir / f"{stem}_warroom_report.json")

    if html:
        generator.save_html(reports, out_dir / f"{stem}_warroom_report.html")

    # Exit code 2 signals CRITICAL findings (useful in CI / review pipelines).
    has_critical = any(r.status == "CRITICAL" for r in reports)
    if has_critical:
        console.print(
            "\n[bold red]⚠ CRITICAL vulnerabilities remain.[/bold red] "
            "Manual legal review required before proceeding."
        )
        raise typer.Exit(code=2)

    console.print(
        "\n[bold green]✓ Simulation complete.[/bold green] "
        f"Reports in [cyan]{out_dir}/[/cyan]"
    )


def _build_provider(name: str, model: Optional[str], ollama_url: str):
    """Build a provider, injecting the Ollama URL when needed.

    Args:
        name:       Provider name; 'ollama' is handled locally, anything else
                    is delegated to make_provider.
        model:      Optional model override (provider-specific default if None).
        ollama_url: Base URL passed to the Ollama provider only.
    """
    if name.lower() == "ollama":
        # Imported lazily so the Ollama dependency is only needed when used.
        from warroom.providers.ollama_p import OllamaProvider
        return OllamaProvider(
            model=model or "llama3.1:8b",
            base_url=ollama_url,
        )
    return make_provider(name, model)


if __name__ == "__main__":
    app()
+ +OBJECTIVE: Fortify the contract against every vulnerability identified by the \ +Plaintiff Agent while preserving the original business intent of the deal. + +EXECUTION DIRECTIVES: +1. PRECISION REDRAFTING — Rewrite exploited clauses with absolute semantic \ + precision. Close every loophole in the Plaintiff's attack report. +2. RISK MITIGATION — Inject: + • Exact numeric definitions (no vague qualifiers without explicit anchors) + • Explicit liability caps with stated carve-outs + • Severability and savings clauses where appropriate + • Clear governing law and exclusive jurisdiction provisions + • Knowledge qualifiers only where commercially necessary, with defined \ + Knowledge Persons + • No "and/or" — use "and" or "or" explicitly +3. INTENT PRESERVATION — Do NOT alter the underlying financial or operational \ + agreement. Only alter the legal execution of that agreement. If a business \ + term cannot be hardened without changing its substance, flag it in \ + residual_risk. +4. DRAFTING STANDARDS — Formal contract English, active voice preferred, \ + sequential sub-clause numbering, all new terms defined inline. + +If this is a re-hardening in round 2+: also address any new vulnerabilities \ +the Plaintiff found in your previous rewrite. +""" + + +def run( + provider: LLMProvider, + clause_text: str, + plaintiff_analysis: PlaintiffAnalysis, + segment_id: str, + round_number: int = 1, +) -> DefenseAnalysis: + """ + Harden a clause against the Plaintiff's attack and return DefenseAnalysis. + + Args: + provider: Any LLMProvider (Anthropic, Ollama, …). + clause_text: The clause being defended (may be a prior hardened rewrite). + plaintiff_analysis: Output from the Plaintiff Agent this round. + segment_id: Identifier used for logging. + round_number: Current round number. 
+ """ + attack_summary = _format_attack_vectors(plaintiff_analysis) + + user_message = ( + f"[SEGMENT: {segment_id} | ROUND: {round_number}]\n\n" + f"━━━ CLAUSE TO HARDEN ━━━\n" + f"{clause_text}\n\n" + f"━━━ PLAINTIFF ATTACK REPORT ━━━\n" + f"{attack_summary}\n\n" + "Produce your defense report with fully hardened clause language." + ) + + return provider.complete_structured( + system=DEFENSE_SYSTEM, + messages=[{"role": "user", "content": user_message}], + schema=DefenseAnalysis, + max_tokens=12288, + ) + + +def _format_attack_vectors(analysis: PlaintiffAnalysis) -> str: + lines = [ + f"Executive Summary: {analysis.executive_summary}", + f"Highest Severity: {analysis.highest_severity}/5", + "", + "Attack Vectors (highest severity first):", + ] + for i, v in enumerate(analysis.attack_vectors, 1): + lines += [ + f"\n[{i}] {v.title}", + f" Severity: {v.severity}/5 ({v.vulnerability_type})", + f" Clause Ref: {v.clause_reference}", + f" Description: {v.description}", + f" Legal Theory:{v.legal_theory}", + f" Scenario: {v.exploitation_scenario}", + f" Exposure: {v.estimated_exposure}", + ] + return "\n".join(lines) diff --git a/legal_warroom/warroom/agents/plaintiff.py b/legal_warroom/warroom/agents/plaintiff.py new file mode 100644 index 00000000..26db3660 --- /dev/null +++ b/legal_warroom/warroom/agents/plaintiff.py @@ -0,0 +1,82 @@ +""" +Plaintiff Agent — Red Team + +Attacks a given clause (original OR a previously hardened rewrite) and +returns a structured PlaintiffAnalysis. Accepts any LLMProvider so it +works identically with Anthropic or Ollama. +""" + +from __future__ import annotations + +from ..models.schemas import PlaintiffAnalysis +from ..providers.base import LLMProvider + +PLAINTIFF_SYSTEM = """\ +You are the Plaintiff Counsel Agent (Red Team) in the Autonomous Legal War Game. 
# ── warroom/agents/plaintiff.py — Red Team agent (module body) ───────────────

PLAINTIFF_SYSTEM = """\
You are the Plaintiff Counsel Agent (Red Team) in the Autonomous Legal War Game.
You represent a hostile, highly litigious entity — a predatory acquirer, a \
disgruntled shareholder, or a regulator with unlimited resources — seeking to \
exploit, break, or extract punitive damages from the provided contract language.

JURISDICTION: Standard US corporate law, contract law precedents, and Delaware \
Court of Chancery standards.

OBJECTIVE: Perform a ruthless, exhaustive analysis of the provided clause.

EXECUTION DIRECTIVES:
1. HUNT FOR AMBIGUITY — Identify poorly defined terms, vague timelines, \
   contradictory obligations, and undefined conditions precedent.
2. EXPLOIT INDEMNIFICATION & LIABILITY — Find scenarios where the drafting \
   party is exposed to uncapped financial risk, breach of warranty, or \
   third-party liabilities.
3. STRESS-TEST EDGE CASES — Formulate highly improbable but legally plausible \
   "black swan" scenarios the current language fails to protect against.
4. ATTACK DEFINITIONS — Challenge every defined term. If it is absent, \
   over-broad, or internally inconsistent, flag it.

If this is a re-attack on an already-hardened clause: look for NEW \
vulnerabilities introduced by the rewrite, and re-evaluate whether previously \
identified vulnerabilities were truly closed.

Do not fabricate specific case citations. Reference legal doctrines only.

SEVERITY SCALE:
  1 = Minor ambiguity, negligible consequence
  2 = Moderate risk, localised financial exposure
  3 = Significant exposure, likely litigation target
  4 = Severe vulnerability, deal-threatening if exploited
  5 = Catastrophic structural failure — renders clause unenforceable or \
      exposes party to unlimited liability
"""


def run(
    provider: LLMProvider,
    clause_text: str,
    segment_id: str,
    round_number: int = 1,
) -> PlaintiffAnalysis:
    """
    Attack one clause and return the structured PlaintiffAnalysis.

    Round 1 attacks the original clause text; every later round attacks the
    Defense Agent's most recent hardened rewrite.

    Args:
        provider: Any LLMProvider (Anthropic, Ollama, …).
        segment_id: Identifier used for logging.
        round_number: Current round number (1 = first attack).
    """
    # Tell the model explicitly whether it is re-attacking a Defense rewrite.
    if round_number == 1:
        banner = "ORIGINAL CLAUSE"
    else:
        banner = f"HARDENED CLAUSE (round {round_number - 1} output)"

    prompt = (
        f"[SEGMENT: {segment_id} | ROUND: {round_number}]\n\n"
        f"━━━ {banner} (to be attacked) ━━━\n"
        f"{clause_text}\n\n"
        "Analyse the above clause and produce your attack report."
    )

    return provider.complete_structured(
        system=PLAINTIFF_SYSTEM,
        messages=[{"role": "user", "content": prompt}],
        schema=PlaintiffAnalysis,
        max_tokens=8192,
    )
"Pages 12-15" or "Chunk 3" + text: str # Raw clause / section text + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def load_and_segment( + file_path: str | Path, + words_per_segment: int = 800, +) -> List[DocumentSegment]: + """ + Load a legal document and return a list of DocumentSegments. + + Strategy: + 1. Try to split on legal section headers first + (e.g. "Section 4.", "ARTICLE V", "4.3 Representations"). + 2. Fall back to fixed-word-count chunks if no headers are detected. + + Args: + file_path: Path to a .pdf or .txt file. + words_per_segment: Soft maximum words per segment (default 800). + The model will receive this plus the agent system + prompts, so keep this well below 5 000 words. + """ + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Document not found: {path}") + + if path.suffix.lower() == ".pdf": + raw_text = _extract_pdf(path) + elif path.suffix.lower() in (".txt", ".md"): + raw_text = path.read_text(encoding="utf-8") + else: + raise ValueError(f"Unsupported file type: {path.suffix}. 
Use .pdf or .txt") + + # Clean up whitespace artefacts from PDF extraction + raw_text = _clean_text(raw_text) + + # Attempt section-aware splitting + sections = _split_by_section_headers(raw_text) + if len(sections) >= 3: + segments = _merge_short_sections(sections, words_per_segment) + else: + # No clear headers — fall back to word-count chunks + segments = _chunk_by_words(raw_text, words_per_segment) + + return [ + DocumentSegment( + segment_id=f"seg_{i + 1:03d}", + page_hint=f"Segment {i + 1} of {len(segments)}", + text=seg.strip(), + ) + for i, seg in enumerate(segments) + if seg.strip() + ] + + +def load_raw_text(file_path: str | Path) -> str: + """Return the full, cleaned document text (no segmentation).""" + path = Path(file_path) + if path.suffix.lower() == ".pdf": + return _clean_text(_extract_pdf(path)) + return _clean_text(path.read_text(encoding="utf-8")) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _extract_pdf(path: Path) -> str: + """Extract text from a PDF using pdfplumber (preferred) or pypdf.""" + try: + import pdfplumber + + pages: List[str] = [] + with pdfplumber.open(path) as pdf: + for page in pdf.pages: + text = page.extract_text() or "" + pages.append(text) + return "\n\n".join(pages) + except ImportError: + pass + + try: + from pypdf import PdfReader + + reader = PdfReader(str(path)) + pages = [page.extract_text() or "" for page in reader.pages] + return "\n\n".join(pages) + except ImportError: + raise ImportError( + "No PDF library found. Install pdfplumber or pypdf:\n" + " pip install pdfplumber" + ) + + +# Matches common legal section headers: +# "Section 4.", "4.3", "ARTICLE V", "ARTICLE 5", "4.", "(a)", etc. +_SECTION_HEADER_RE = re.compile( + r"(?m)^(?:" + r"(?:Section|SECTION|Article|ARTICLE)\s+[\dA-Z]+[\.\s]" # Section 4. 
/ ARTICLE V + r"|(?:\d+\.){1,3}\s" # 4.3 or 4.3.1 + r"|\d+\.\s+[A-Z]" # 4. Representations + r")" +) + + +def _split_by_section_headers(text: str) -> List[str]: + """Split text at detected legal section header boundaries.""" + boundaries = [m.start() for m in _SECTION_HEADER_RE.finditer(text)] + if not boundaries: + return [] + + chunks: List[str] = [] + for i, start in enumerate(boundaries): + end = boundaries[i + 1] if i + 1 < len(boundaries) else len(text) + chunks.append(text[start:end]) + return chunks + + +def _merge_short_sections(sections: List[str], max_words: int) -> List[str]: + """ + Merge consecutive short sections so every returned chunk is roughly + max_words in size. This avoids sending dozens of 20-word blurbs. + """ + merged: List[str] = [] + buffer = "" + for section in sections: + candidate = (buffer + "\n\n" + section).strip() + if len(candidate.split()) <= max_words: + buffer = candidate + else: + if buffer: + merged.append(buffer) + buffer = section.strip() + if buffer: + merged.append(buffer) + return merged + + +def _chunk_by_words(text: str, max_words: int) -> List[str]: + """Naive fixed-size word-count chunking with a 10% overlap.""" + words = text.split() + overlap = max(1, max_words // 10) + chunks: List[str] = [] + start = 0 + while start < len(words): + end = min(start + max_words, len(words)) + chunks.append(" ".join(words[start:end])) + if end == len(words): + break + start = end - overlap + return chunks + + +def _clean_text(text: str) -> str: + """Normalise whitespace and remove common PDF extraction artefacts.""" + # Collapse runs of spaces/tabs to a single space + text = re.sub(r"[ \t]{2,}", " ", text) + # Collapse 3+ consecutive blank lines to 2 + text = re.sub(r"\n{3,}", "\n\n", text) + # Remove form-feed characters + text = text.replace("\f", "\n") + return text.strip() diff --git a/legal_warroom/warroom/loop/__init__.py b/legal_warroom/warroom/loop/__init__.py new file mode 100644 index 00000000..e69de29b diff --git 
# ── warroom/loop/adversarial.py — multi-round Red→Blue loop ──────────────────

def run_adversarial_loop(
    plaintiff_provider: LLMProvider,
    defense_provider: LLMProvider,
    segment: DocumentSegment,
    max_rounds: int = 3,
    convergence_threshold: int = 2,
) -> IterativeSegmentReport:
    """
    Run the full multi-round adversarial simulation for one document segment.

    Each round the Plaintiff attacks the CURRENT clause text (round 1 = the
    original, round 2+ = the previous Defense rewrite), then the Defense
    hardens it against those specific attacks; the hardened clause becomes
    the input for the next round.

    Args:
        plaintiff_provider: Provider for the Red Team agent.
        defense_provider: Provider for the Blue Team agent (may be the same).
        segment: The document segment to stress-test.
        max_rounds: Maximum number of Red→Blue exchanges (default 3).
        convergence_threshold: Stop early if max severity drops to this level
            or below after at least 2 rounds (default 2).

    Returns:
        IterativeSegmentReport containing all rounds and the final hardened text.
    """
    rounds: list[AdversarialRound] = []
    current_clause = segment.text

    for round_num in range(1, max_rounds + 1):
        _log_round_start(segment.segment_id, round_num, max_rounds)

        # ── Red Team attacks the current text ───────────────────────────
        console.print(
            f"  [red]●[/red] [bold]Plaintiff Agent[/bold] attacking "
            f"{'original clause' if round_num == 1 else 'hardened clause'}…"
        )
        attack = plaintiff.run(
            provider=plaintiff_provider,
            clause_text=current_clause,
            segment_id=segment.segment_id,
            round_number=round_num,
        )
        console.print(
            f"    Found [bold red]{len(attack.attack_vectors)}[/bold red] attack "
            f"vector(s) — max severity [bold]{attack.highest_severity}[/bold]/5"
        )

        # ── Blue Team hardens against exactly those attacks ─────────────
        console.print(
            f"  [green]●[/green] [bold]Defense Agent[/bold] hardening clause…"
        )
        hardened = defense.run(
            provider=defense_provider,
            clause_text=current_clause,
            plaintiff_analysis=attack,
            segment_id=segment.segment_id,
            round_number=round_num,
        )
        console.print(
            f"    Hardened — confidence: [bold]{hardened.confidence_level}[/bold]"
        )

        # Record this round.
        rounds.append(
            AdversarialRound(
                round_number=round_num,
                clause_text=current_clause,
                attack=attack,
                defense=hardened,
            )
        )

        # The next round attacks the freshly hardened clause.
        current_clause = hardened.fully_hardened_clause

        # ── Convergence checks ──────────────────────────────────────────
        # NOTE(review): both early-exit checks are nested under
        # `round_num >= 2`, so an all-severity-1 first round still triggers
        # a second round — the module doc reads as if that case should stop
        # immediately; confirm intended behavior before changing.
        if round_num >= 2:
            if attack.highest_severity <= convergence_threshold:
                console.print(
                    f"\n  [cyan]✓ Converged[/cyan] — severity dropped to "
                    f"{attack.highest_severity} ≤ threshold {convergence_threshold}. "
                    f"Stopping after {round_num} round(s).\n"
                )
                break

            if attack.highest_severity == 1:
                console.print(
                    f"\n  [cyan]✓ No meaningful vulnerabilities remain[/cyan] "
                    f"(all severity 1). Stopping after {round_num} round(s).\n"
                )
                break

    report = IterativeSegmentReport(
        segment_id=segment.segment_id,
        original_text=segment.text,
        final_hardened_text=current_clause,
        rounds=rounds,
    )

    _log_round_summary(report)
    return report


def _log_round_start(segment_id: str, round_num: int, max_rounds: int) -> None:
    """Print the per-round banner."""
    console.print(
        f"\n  [bold cyan]Round {round_num}/{max_rounds}[/bold cyan] "
        f"[dim]({segment_id})[/dim]"
    )


def _log_round_summary(report: IterativeSegmentReport) -> None:
    """Print the per-segment wrap-up line with the severity trajectory."""
    traj = report.severity_trajectory
    arrow = " → ".join(str(s) for s in traj)
    status_style = {
        "HARDENED": "bold green",
        "REQUIRES_REVIEW": "bold yellow",
        "CRITICAL": "bold red",
    }.get(report.status, "white")

    console.print(
        f"\n  [{status_style}]■ {report.segment_id} complete[/]  "
        f"Severity trajectory: {arrow} | "
        f"Risk: {report.initial_risk_score} → {report.net_risk_score} "
        f"(-{report.risk_reduction} pts) | "
        f"Status: [{status_style}]{report.status}[/]\n"
    )
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List + +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# Plaintiff Agent (Red Team) — structured output +# --------------------------------------------------------------------------- + +class AttackVector(BaseModel): + clause_reference: str = Field( + description="The specific clause, sub-clause, or defined term being attacked " + "(e.g., 'Section 7.3(b)', 'Definition of Material Adverse Effect')." + ) + vulnerability_type: str = Field( + description=( + "Category of vulnerability. One of: AMBIGUITY | INDEMNIFICATION_GAP | " + "LIABILITY_EXPOSURE | DEFINITION_FAILURE | EDGE_CASE | " + "JURISDICTIONAL_CONFLICT | WAIVER_TRAP | REPRESENTATION_BREACH" + ) + ) + severity: int = Field( + description=( + "Integer 1-5. " + "1=Minor ambiguity, negligible consequence. " + "2=Moderate risk, localised financial exposure. " + "3=Significant exposure, likely litigation target. " + "4=Severe vulnerability, deal-threatening if exploited. " + "5=Catastrophic structural failure — unenforceable or unlimited liability." + ) + ) + title: str = Field(description="Short, descriptive title for this attack vector.") + description: str = Field( + description="Precise legal description of why this language is vulnerable." + ) + legal_theory: str = Field( + description=( + "The legal doctrine or principle enabling this attack " + "(e.g., contra proferentem, implied duty of good faith, Delaware MAE standards). " + "Do not fabricate specific case citations." + ) + ) + exploitation_scenario: str = Field( + description=( + "A concrete scenario — including black-swan edge cases — showing how a " + "hostile party would exploit this vulnerability in litigation." 
+ ) + ) + estimated_exposure: str = Field( + description=( + "Estimated financial exposure or legal consequence if successfully exploited " + "(e.g., 'uncapped indemnification liability', 'rescission of entire transaction')." + ) + ) + + +class PlaintiffAnalysis(BaseModel): + attack_vectors: List[AttackVector] = Field( + description="All identified attack vectors, ordered highest-severity first." + ) + highest_severity: int = Field( + description="The highest severity integer (1-5) among all attack vectors." + ) + executive_summary: str = Field( + description=( + "2-4 sentence executive summary of the clause's overall vulnerability " + "profile from the plaintiff's perspective." + ) + ) + + +# --------------------------------------------------------------------------- +# Defense Agent (Blue Team) — structured output +# --------------------------------------------------------------------------- + +class DefenseRemedy(BaseModel): + attack_vector_title: str = Field( + description="Exact title of the attack vector being neutralised." + ) + hardened_language: str = Field( + description=( + "The rewritten clause language that closes this specific vulnerability. " + "Precise, legally sound, formal contract English." + ) + ) + rationale: str = Field( + description=( + "Explanation of how the rewritten language neutralises the plaintiff's " + "attack, referencing the specific legal theory." + ) + ) + + +class DefenseAnalysis(BaseModel): + fully_hardened_clause: str = Field( + description=( + "The complete, integrated rewritten clause incorporating all remediations. " + "Must preserve the original business intent of the agreement." + ) + ) + remedies: List[DefenseRemedy] = Field( + description="Per-attack-vector remediation details." + ) + residual_risk: str = Field( + description=( + "Any remaining risk that cannot be fully mitigated without altering " + "the business terms. 
# ── warroom/models/schemas.py — loop-tracking dataclasses ────────────────────
# Plain dataclasses (not pydantic): these are never sent to an API; they only
# record the Red→Blue exchanges for reporting.

@dataclass
class AdversarialRound:
    """One complete Red → Blue exchange within the multi-round loop."""
    round_number: int
    clause_text: str             # the clause that was attacked THIS round
    attack: PlaintiffAnalysis
    defense: DefenseAnalysis


@dataclass
class IterativeSegmentReport:
    """
    Full report for one document segment after all adversarial rounds.

    `rounds` records every Red→Blue exchange so reviewers can see how the
    clause evolved across iterations.
    """
    segment_id: str
    original_text: str
    final_hardened_text: str     # the clause text after the last Defense pass
    rounds: List[AdversarialRound] = field(default_factory=list)

    # ── Computed properties ─────────────────────────────────────────────

    @property
    def total_rounds(self) -> int:
        return len(self.rounds)

    @property
    def severity_trajectory(self) -> list[int]:
        """Max severity per round — shows convergence over time."""
        return [r.attack.highest_severity for r in self.rounds]

    @property
    def converged(self) -> bool:
        """True if severity dropped at least 2 points across the simulation."""
        trajectory = self.severity_trajectory
        return len(trajectory) >= 2 and (trajectory[0] - trajectory[-1]) >= 2

    @property
    def final_attack(self) -> PlaintiffAnalysis | None:
        return self.rounds[-1].attack if self.rounds else None

    @property
    def final_defense(self) -> DefenseAnalysis | None:
        return self.rounds[-1].defense if self.rounds else None

    def _risk_score(self, index: int) -> int:
        """0-100 score from the mean attack severity of rounds[index].

        Shared by net_risk_score (index -1) and initial_risk_score (index 0);
        returns 0 when there are no rounds or no attack vectors.
        """
        if not self.rounds:
            return 0
        vectors = self.rounds[index].attack.attack_vectors
        if not vectors:
            return 0
        mean_severity = sum(v.severity for v in vectors) / len(vectors)
        # Severity 1-5 maps linearly onto 20-100, capped at 100.
        return min(100, round(mean_severity * 20))

    @property
    def net_risk_score(self) -> int:
        """Risk score (0-100) based on the FINAL round's attack severity."""
        return self._risk_score(-1)

    @property
    def initial_risk_score(self) -> int:
        """Risk score of the FIRST round (before any hardening)."""
        return self._risk_score(0)

    @property
    def risk_reduction(self) -> int:
        """Points reduced: initial_risk_score - net_risk_score (floored at 0)."""
        return max(0, self.initial_risk_score - self.net_risk_score)

    @property
    def status(self) -> str:
        """Overall verdict bucket derived from the final risk score."""
        score = self.net_risk_score
        if score >= 80:
            return "CRITICAL"
        if score >= 50:
            return "REQUIRES_REVIEW"
        return "HARDENED"
+""" + +from __future__ import annotations + +import concurrent.futures +from typing import Callable, List, Optional + +from rich.console import Console +from rich.table import Table + +from .document.processor import DocumentSegment, load_and_segment +from .loop.adversarial import run_adversarial_loop +from .models.schemas import IterativeSegmentReport +from .providers.base import LLMProvider + +console = Console() + + +def run_simulation( + document_path: str, + plaintiff_provider: LLMProvider, + defense_provider: LLMProvider, + words_per_segment: int = 800, + max_rounds: int = 3, + convergence_threshold: int = 2, + parallel: bool = False, + max_workers: int = 3, + on_segment_complete: Optional[Callable[[IterativeSegmentReport], None]] = None, +) -> List[IterativeSegmentReport]: + """ + Run the full Legal War Game simulation. + + Args: + document_path: Path to .pdf or .txt file. + plaintiff_provider: LLMProvider for the Red Team. + defense_provider: LLMProvider for the Blue Team (can be same). + words_per_segment: Soft word-count cap per chunk (default 800). + max_rounds: Max adversarial rounds per segment (default 3). + convergence_threshold: Stop early when severity ≤ this (default 2). + parallel: Process segments concurrently via thread pool. + max_workers: Thread-pool size when parallel=True. + on_segment_complete: Optional callback after each segment. + + Returns: + List[IterativeSegmentReport], one per segment, in document order. 
+ """ + console.rule("[bold cyan]AUTONOMOUS LEGAL WAR GAME — SIMULATION ALPHA[/bold cyan]") + console.print( + f"\n[bold]Document:[/bold] {document_path}\n" + f"[bold]Plaintiff model:[/bold] {plaintiff_provider.model}\n" + f"[bold]Defense model:[/bold] {defense_provider.model}\n" + f"[bold]Max rounds/segment:[/bold] {max_rounds}\n" + f"[bold]Parallel:[/bold] {parallel}\n" + ) + + # ── Segment ───────────────────────────────────────────────────────────── + with console.status("[yellow]Ingesting and segmenting document…"): + segments = load_and_segment(document_path, words_per_segment) + + console.print( + f"[green]✓[/green] Segmented into [bold]{len(segments)}[/bold] clause blocks\n" + ) + _print_segment_table(segments) + + # ── Run ───────────────────────────────────────────────────────────────── + if parallel and len(segments) > 1: + return _run_parallel( + plaintiff_provider, defense_provider, + segments, max_rounds, convergence_threshold, + max_workers, on_segment_complete, + ) + return _run_sequential( + plaintiff_provider, defense_provider, + segments, max_rounds, convergence_threshold, + on_segment_complete, + ) + + +# --------------------------------------------------------------------------- +# Sequential +# --------------------------------------------------------------------------- + +def _run_sequential( + pp: LLMProvider, + dp: LLMProvider, + segments: List[DocumentSegment], + max_rounds: int, + threshold: int, + on_complete: Optional[Callable], +) -> List[IterativeSegmentReport]: + reports: List[IterativeSegmentReport] = [] + for i, seg in enumerate(segments, 1): + console.rule( + f"[cyan]Segment {i}/{len(segments)} — {seg.segment_id}[/cyan]", + style="dim", + ) + report = run_adversarial_loop(pp, dp, seg, max_rounds, threshold) + reports.append(report) + if on_complete: + on_complete(report) + return reports + + +# --------------------------------------------------------------------------- +# Parallel +# 
--------------------------------------------------------------------------- + +def _run_parallel( + pp: LLMProvider, + dp: LLMProvider, + segments: List[DocumentSegment], + max_rounds: int, + threshold: int, + max_workers: int, + on_complete: Optional[Callable], +) -> List[IterativeSegmentReport]: + console.print( + f"[bold yellow]Parallel mode:[/bold yellow] " + f"up to {max_workers} concurrent segments.\n" + ) + results: dict[str, IterativeSegmentReport] = {} + + def _process(seg: DocumentSegment) -> IterativeSegmentReport: + return run_adversarial_loop(pp, dp, seg, max_rounds, threshold) + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_map = {executor.submit(_process, seg): seg for seg in segments} + for future in concurrent.futures.as_completed(future_map): + seg = future_map[future] + try: + report = future.result() + results[seg.segment_id] = report + if on_complete: + on_complete(report) + except Exception as exc: + console.print(f"[red]ERROR[/red] {seg.segment_id}: {exc}") + + return [results[seg.segment_id] for seg in segments if seg.segment_id in results] + + +# --------------------------------------------------------------------------- +# Display +# --------------------------------------------------------------------------- + +def _print_segment_table(segments: List[DocumentSegment]) -> None: + table = Table(title="Document Segments", show_lines=True) + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Words", justify="right") + table.add_column("Preview", max_width=80) + for seg in segments: + preview = seg.text[:120].replace("\n", " ") + ("…" if len(seg.text) > 120 else "") + table.add_row(seg.segment_id, str(len(seg.text.split())), preview) + console.print(table) + console.print() diff --git a/legal_warroom/warroom/providers/__init__.py b/legal_warroom/warroom/providers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/providers/anthropic_p.py 
b/legal_warroom/warroom/providers/anthropic_p.py new file mode 100644 index 00000000..76e0d0f8 --- /dev/null +++ b/legal_warroom/warroom/providers/anthropic_p.py @@ -0,0 +1,62 @@ +""" +Anthropic provider — uses the official anthropic SDK. + +Features used: + • claude-opus-4-6 with adaptive thinking for deep legal reasoning + • client.messages.parse() for schema-validated structured outputs + • Streaming-safe: max_tokens capped at 12 288 (no streaming needed at this size) +""" + +from __future__ import annotations + +from typing import Type, TypeVar + +import anthropic +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class AnthropicProvider: + def __init__(self, model: str = "claude-opus-4-6") -> None: + self._client = anthropic.Anthropic() + self._model = model + + @property + def model(self) -> str: + return self._model + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + response = self._client.messages.create( + model=self._model, + max_tokens=max_tokens, + thinking={"type": "adaptive"}, + system=system, + messages=messages, + ) + for block in response.content: + if block.type == "text": + return block.text + return "" + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + response = self._client.messages.parse( + model=self._model, + max_tokens=max_tokens, + thinking={"type": "adaptive"}, + system=system, + messages=messages, + output_format=schema, + ) + return response.parsed_output diff --git a/legal_warroom/warroom/providers/base.py b/legal_warroom/warroom/providers/base.py new file mode 100644 index 00000000..18093c2d --- /dev/null +++ b/legal_warroom/warroom/providers/base.py @@ -0,0 +1,81 @@ +""" +Provider abstraction layer. + +Every backend (Anthropic, Ollama, …) implements LLMProvider. 
+The rest of the codebase talks only to this interface, so swapping +models or endpoints requires zero changes to agent logic. + +Internal message format is identical to the OpenAI chat API: + {"role": "user"|"assistant"|"system", "content": "..."} +This is the most portable format and maps cleanly to both APIs. +""" + +from __future__ import annotations + +from typing import Protocol, Type, TypeVar +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class LLMProvider(Protocol): + """ + Minimal interface every backend must implement. + + Two methods: + complete() — free-form text response + complete_structured() — guaranteed Pydantic-model-shaped response + """ + + @property + def model(self) -> str: + """Human-readable model identifier (used for logging).""" + ... + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + """ + Send system + messages and return the assistant's text reply. + """ + ... + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + """ + Send system + messages and return a validated Pydantic instance. + The provider is responsible for enforcing the schema. + """ + ... + + +def make_provider(provider_name: str, model: str | None = None) -> LLMProvider: + """ + Factory — returns the right provider based on name string. + + Args: + provider_name: "anthropic" or "ollama" + model: Optional model override. Defaults vary per provider. + + Usage: + provider = make_provider("anthropic") + provider = make_provider("ollama", model="qwen2.5:14b") + """ + name = provider_name.lower().strip() + if name == "anthropic": + from .anthropic_p import AnthropicProvider + return AnthropicProvider(model=model or "claude-opus-4-6") + if name == "ollama": + from .ollama_p import OllamaProvider + return OllamaProvider(model=model or "llama3.1:8b") + raise ValueError( + f"Unknown provider '{provider_name}'. 
Choose 'anthropic' or 'ollama'." + ) diff --git a/legal_warroom/warroom/providers/ollama_p.py b/legal_warroom/warroom/providers/ollama_p.py new file mode 100644 index 00000000..cbaa94c6 --- /dev/null +++ b/legal_warroom/warroom/providers/ollama_p.py @@ -0,0 +1,109 @@ +""" +Ollama provider — run the entire war game 100% locally, no API costs. + +Uses Ollama's OpenAI-compatible REST endpoint (default: localhost:11434). + +Recommended models (support JSON mode and produce coherent legal output): + • qwen2.5:14b — best quality for legal reasoning locally + • qwen2.5:7b — good quality, fits on most consumer GPUs + • llama3.1:8b — solid, widely available + • llama3.2:3b — fast, lower quality + • mistral:7b — good instruction following + +Install a model: + ollama pull qwen2.5:14b + +Structured outputs: + Ollama supports response_format={"type": "json_object"} for JSON mode. + The schema is injected into the system prompt so the model knows the shape. + The response is then validated via Pydantic — if parsing fails, a clear + error is raised telling the user to try a larger/better model. +""" + +from __future__ import annotations + +import json +from typing import Type, TypeVar + +from pydantic import BaseModel, ValidationError + +T = TypeVar("T", bound=BaseModel) + +# Injected at the end of every structured-output system prompt +_SCHEMA_INSTRUCTION = ( + "\n\n━━━ OUTPUT FORMAT (STRICT) ━━━\n" + "You MUST respond with a single valid JSON object that exactly matches " + "the schema below. 
Do NOT include markdown fences, prose, or any text " + "outside the JSON object.\n\n" + "Schema:\n{schema}" +) + + +class OllamaProvider: + def __init__( + self, + model: str = "llama3.1:8b", + base_url: str = "http://localhost:11434/v1", + ) -> None: + try: + from openai import OpenAI + except ImportError as exc: + raise ImportError( + "The 'openai' package is required for the Ollama provider.\n" + "Install it with: pip install openai" + ) from exc + + self._client = OpenAI(base_url=base_url, api_key="ollama") + self._model = model + + @property + def model(self) -> str: + return self._model + + def complete( + self, + system: str, + messages: list[dict], + max_tokens: int = 4096, + ) -> str: + all_msgs = ([{"role": "system", "content": system}] if system else []) + messages + response = self._client.chat.completions.create( + model=self._model, + messages=all_msgs, + max_tokens=max_tokens, + temperature=0.2, + ) + return response.choices[0].message.content or "" + + def complete_structured( + self, + system: str, + messages: list[dict], + schema: Type[T], + max_tokens: int = 8192, + ) -> T: + schema_json = json.dumps(schema.model_json_schema(), indent=2) + enhanced_system = system + _SCHEMA_INSTRUCTION.format(schema=schema_json) + + all_msgs = ( + [{"role": "system", "content": enhanced_system}] + messages + ) + + response = self._client.chat.completions.create( + model=self._model, + messages=all_msgs, + max_tokens=max_tokens, + temperature=0.1, + response_format={"type": "json_object"}, + ) + raw = response.choices[0].message.content or "{}" + + try: + return schema.model_validate_json(raw) + except ValidationError as exc: + raise ValueError( + f"Ollama model '{self._model}' returned JSON that does not match " + f"the expected schema.\n" + f"Try a larger model (e.g. 
qwen2.5:14b) for better compliance.\n" + f"Validation errors:\n{exc}" + ) from exc diff --git a/legal_warroom/warroom/report/__init__.py b/legal_warroom/warroom/report/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/legal_warroom/warroom/report/generator.py b/legal_warroom/warroom/report/generator.py new file mode 100644 index 00000000..ab286e45 --- /dev/null +++ b/legal_warroom/warroom/report/generator.py @@ -0,0 +1,336 @@ +""" +Report Generator + +Produces: + 1. Rich terminal summary with multi-round progression tables. + 2. JSON file — full structured data for every round of every segment. + 3. HTML file — dark-themed, self-contained, shows severity trajectory. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import List + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from ..models.schemas import IterativeSegmentReport + +console = Console() + +_SEVERITY_STYLE = {1: "green", 2: "yellow", 3: "orange3", 4: "red", 5: "bold red"} +_STATUS_STYLE = { + "HARDENED": "bold green", + "REQUIRES_REVIEW": "bold yellow", + "CRITICAL": "bold red", +} +_SEVERITY_HEX = {1: "#22c55e", 2: "#eab308", 3: "#f97316", 4: "#ef4444", 5: "#991b1b"} +_STATUS_HEX = {"HARDENED": "#22c55e", "REQUIRES_REVIEW": "#eab308", "CRITICAL": "#dc2626"} + + +# --------------------------------------------------------------------------- +# Terminal +# --------------------------------------------------------------------------- + +def print_terminal_summary(reports: List[IterativeSegmentReport]) -> None: + console.rule("\n[bold cyan]WAR GAME RESULTS — FINAL REPORT[/bold cyan]") + _print_overview_table(reports) + for report in reports: + _print_segment_detail(report) + _print_global_stats(reports) + + +def _print_overview_table(reports: List[IterativeSegmentReport]) -> None: + table = Table(title="Segment Overview", 
show_lines=True, expand=True)
    table.add_column("Segment", style="cyan", no_wrap=True)
    table.add_column("Rounds", justify="center")
    table.add_column("Severity Trajectory", justify="center")
    table.add_column("Risk (start→end)", justify="center")
    table.add_column("Reduction", justify="right")
    table.add_column("Status", justify="center")
    table.add_column("Converged?", justify="center")

    for r in reports:
        # e.g. "5 → 3 → 2": max severity of each round, in order.
        traj = " → ".join(str(s) for s in r.severity_trajectory)
        status_style = _STATUS_STYLE.get(r.status, "white")
        conv = "[green]Yes[/green]" if r.converged else "[yellow]No[/yellow]"
        table.add_row(
            r.segment_id,
            str(r.total_rounds),
            traj,
            f"{r.initial_risk_score} → {r.net_risk_score}",
            f"-{r.risk_reduction}",
            Text(r.status, style=status_style),
            conv,
        )
    console.print(table)


def _print_segment_detail(report: IterativeSegmentReport) -> None:
    # Per-segment drill-down: header panel, original excerpt, every round's
    # Red/Blue exchange, then the final hardened clause.
    status_style = _STATUS_STYLE.get(report.status, "white")
    initial_sev = report.rounds[0].attack.highest_severity if report.rounds else 1
    console.print(
        Panel(
            f"[bold]Segment:[/bold] {report.segment_id} "
            f"[bold]Rounds:[/bold] {report.total_rounds} "
            f"[bold]Risk:[/bold] {report.initial_risk_score} → {report.net_risk_score} "
            f"[bold]Status:[/bold] [{status_style}]{report.status}[/]",
            title=f"[bold cyan]─── {report.segment_id} ───[/bold cyan]",
            border_style=_SEVERITY_STYLE.get(initial_sev, "white"),
        )
    )

    excerpt = report.original_text[:300].replace("\n", " ")
    if len(report.original_text) > 300:
        excerpt += "…"
    console.print(f"[dim]Original:[/dim] {excerpt}\n")

    for rnd in report.rounds:
        console.print(
            f"[bold]Round {rnd.round_number}[/bold] "
            f"[red]Red Team — {len(rnd.attack.attack_vectors)} vector(s) "
            f"(max sev {rnd.attack.highest_severity}/5)[/red]"
        )
        console.print(f"  [dim]{rnd.attack.executive_summary}[/dim]")
        for v in rnd.attack.attack_vectors:
            s = _SEVERITY_STYLE.get(v.severity, "white")
            console.print(f"  [{s}][{v.severity}][/] {v.title} — {v.estimated_exposure}")
        console.print(
            f"\n  [green]Blue Team — Confidence: {rnd.defense.confidence_level}[/green]"
        )
        console.print(f"  [dim]Residual risk: {rnd.defense.residual_risk}[/dim]\n")

    console.print("[bold green]Final Hardened Clause:[/bold green]")
    console.print(report.final_hardened_text)
    console.rule(style="dim")


def _print_global_stats(reports: List[IterativeSegmentReport]) -> None:
    # NOTE(review): _global_summary([]) returns {}, so the lookups below raise
    # KeyError for an empty report list — callers appear to always pass ≥1
    # report; confirm before exposing this publicly.
    s = _global_summary(reports)
    console.rule("[bold cyan]GLOBAL STATISTICS[/bold cyan]")
    console.print(
        f"  Segments processed:   {s['total_segments']}\n"
        f"  Total rounds run:     {s['total_rounds']}\n"
        f"  Total attack vectors: {s['total_attack_vectors']}\n"
        f"  Segments converged:   {s['converged_segments']}\n"
        f"  Critical remaining:   {s['critical_segments']}\n"
        f"  Requires review:      {s['requires_review_segments']}\n"
        f"  Fully hardened:       {s['hardened_segments']}\n"
        f"  Avg risk reduction:   {s['avg_risk_reduction']:.1f} pts\n"
        f"  Peak initial severity: {s['peak_initial_severity']}/5\n"
        f"  Peak final severity:   {s['peak_final_severity']}/5\n"
    )


# ---------------------------------------------------------------------------
# JSON
# ---------------------------------------------------------------------------

def save_json(reports: List[IterativeSegmentReport], output_path: str | Path) -> Path:
    # Serialise the full simulation (every round of every segment) to disk.
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "simulation": "Autonomous Legal War Game — Simulation Alpha",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "segment_count": len(reports),
        "segments": [_segment_to_dict(r) for r in reports],
        "summary": _global_summary(reports),
    }
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    console.print(f"\n[green]✓ JSON report saved:[/green] {path}")
    return path


def _segment_to_dict(r: IterativeSegmentReport) -> dict:
    # Flatten one segment report (dataclass + nested Pydantic models) to
    # plain dicts for json.dumps.
    return {
        "segment_id": r.segment_id,
        "total_rounds": r.total_rounds,
        "severity_trajectory": r.severity_trajectory,
        "initial_risk_score": r.initial_risk_score,
        "final_risk_score": r.net_risk_score,
        "risk_reduction": r.risk_reduction,
        "converged": r.converged,
        "status": r.status,
        "original_text": r.original_text,
        "final_hardened_text": r.final_hardened_text,
        "rounds": [
            {
                "round_number": rnd.round_number,
                "clause_text_attacked": rnd.clause_text,
                "red_team": {
                    "highest_severity": rnd.attack.highest_severity,
                    "executive_summary": rnd.attack.executive_summary,
                    "attack_vectors": [
                        {
                            "title": v.title,
                            "severity": v.severity,
                            "vulnerability_type": v.vulnerability_type,
                            "clause_reference": v.clause_reference,
                            "description": v.description,
                            "legal_theory": v.legal_theory,
                            "exploitation_scenario": v.exploitation_scenario,
                            "estimated_exposure": v.estimated_exposure,
                        }
                        for v in rnd.attack.attack_vectors
                    ],
                },
                "blue_team": {
                    "fully_hardened_clause": rnd.defense.fully_hardened_clause,
                    "confidence_level": rnd.defense.confidence_level,
                    "residual_risk": rnd.defense.residual_risk,
                    "remedies": [
                        {
                            "attack_vector_title": rem.attack_vector_title,
                            "hardened_language": rem.hardened_language,
                            "rationale": rem.rationale,
                        }
                        for rem in rnd.defense.remedies
                    ],
                },
            }
            for rnd in r.rounds
        ],
    }


def _global_summary(reports: List[IterativeSegmentReport]) -> dict:
    # Aggregate statistics across every segment; empty dict for an empty run.
    if not reports:
        return {}
    all_initial = [
        v for r in reports for v in (r.rounds[0].attack.attack_vectors if r.rounds else [])
    ]
    all_final = [
        v for r in reports for v in (r.rounds[-1].attack.attack_vectors if r.rounds else [])
    ]
    return {
        "total_segments": len(reports),
        "total_rounds": sum(r.total_rounds for r in reports),
        "total_attack_vectors": sum(
            sum(len(rnd.attack.attack_vectors) for rnd in r.rounds) for r in reports
        ),
        "converged_segments": sum(1 for r in reports if r.converged),
        "critical_segments": sum(1 for r in reports if r.status == "CRITICAL"),
        "requires_review_segments": sum(1 for r in reports if r.status == "REQUIRES_REVIEW"),
        "hardened_segments": sum(1 for r in reports if r.status == "HARDENED"),
        "avg_risk_reduction": sum(r.risk_reduction for r in reports) / len(reports),
        "peak_initial_severity": max((v.severity for v in all_initial), default=0),
        "peak_final_severity": max((v.severity for v in all_final), default=0),
    }


# ---------------------------------------------------------------------------
# HTML
# ---------------------------------------------------------------------------

def save_html(reports: List[IterativeSegmentReport], output_path: str | Path) -> Path:
    # Write the self-contained HTML report next to the JSON output.
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(_render_html(reports), encoding="utf-8")
    console.print(f"[green]✓ HTML report saved:[/green] {path}")
    return path


def _render_html(reports: List[IterativeSegmentReport]) -> str:
    s = _global_summary(reports)
    segments_html = "\n".join(_segment_html(r) for r in reports)
    generated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    # NOTE(review): the HTML template below was mangled by text extraction
    # (tags and CSS stripped); it is reproduced as found.
    return f"""


Legal War Game Report


⚖️ Autonomous Legal War Game — Simulation Alpha

+

Generated: {generated}

+

Global Statistics

+
+
{s['total_segments']}
Segments
+
{s['total_rounds']}
Total Rounds
+
{s['total_attack_vectors']}
Attack Vectors
+
{s['converged_segments']}
Converged
+
{s['critical_segments']}
Critical
+
{s['avg_risk_reduction']:.0f} pts
Avg Risk Reduction
+
{s['peak_initial_severity']}→{s['peak_final_severity']}
Peak Severity
+
+

Segment Reports

+{segments_html} + +""" + + +def _segment_html(r: IterativeSegmentReport) -> str: + color = _STATUS_HEX.get(r.status, "#fff") + traj = "".join( + f'
{s}
' + + ('' if i < len(r.severity_trajectory) - 1 else "") + for i, s in enumerate(r.severity_trajectory) + ) + rounds_html = "\n".join(_round_html(rnd) for rnd in r.rounds) + conv_badge = "Converged" if r.converged else "" + return f""" +
+

{r.segment_id} + {r.status} + Risk {r.initial_risk_score}→{r.net_risk_score} (-{r.risk_reduction}) + {r.total_rounds} round(s) + {conv_badge} +

+
{traj}
+
Original clause
{_esc(r.original_text)}
+ {rounds_html} +

Final Hardened Clause

+
{_esc(r.final_hardened_text)}
+
""" + + +def _round_html(rnd) -> str: + attacks = "".join( + f'
' + f'[{v.severity}] {_esc(v.title)}' + f' — {_esc(v.vulnerability_type)}' + f'

{_esc(v.description)}

' + f'

Exposure: {_esc(v.estimated_exposure)}

' + for v in rnd.attack.attack_vectors + ) + return f""" +
+

Round {rnd.round_number} + 🔴 {len(rnd.attack.attack_vectors)} vector(s) · max sev {rnd.attack.highest_severity}/5 +  |  + 🔵 Confidence: {rnd.defense.confidence_level} +

+

{_esc(rnd.attack.executive_summary)}

+ {attacks} +
+ Defense rewrite (round {rnd.round_number}) +
{_esc(rnd.defense.fully_hardened_clause)}
+

Residual risk: {_esc(rnd.defense.residual_risk)}

+
+
""" + + +def _esc(t: str) -> str: + return t.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """)