Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
requests
pdfrw
flask
commonforms
fastapi
uvicorn
pydantic
sqlmodel
pytest
httpx
numpy<2
ollama
# Web Framework
fastapi==0.111.0
uvicorn[standard]==0.29.0
python-multipart==0.0.9

# Data Validation & Serialization
pydantic==2.7.1
pydantic-settings==2.2.1

# LLM & Extraction
instructor==1.2.6
openai==1.30.1

# Vector DB / Embeddings
chromadb==0.5.0
sentence-transformers==3.0.1
numpy==1.26.4
aiohttp==3.9.5

# PDF Processing
pdfrw==0.4
PyMuPDF==1.24.4

# Security / Crypto
cryptography==42.0.7
bcrypt==4.1.2

# DB & Caching
SQLAlchemy==2.0.30
psycopg2-binary==2.9.9
redis==5.0.4

# Testing
pytest==8.2.0
pytest-asyncio==0.23.6
httpx==0.27.0
145 changes: 145 additions & 0 deletions src/api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import logging
import os
import secrets
import zipfile
from io import BytesIO
from typing import List, Optional

from fastapi import FastAPI, Depends, File, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response

from src.schemas import IncidentReport
from src.llm.constrained_extractor import sanitize_input
from src.llm.semantic_router import SemanticRouter
from src.pdf_filler.filler import VectorSemanticMapper
from src.llm.few_shot_rag import get_few_shot_prompt, populate_examples
from src.llm.self_correction import self_correction_loop

# Bootstrapping examples for RAG: embed the bundled example narratives into
# the vector store at import time so retrieval works from the first request.
populate_examples(os.path.join(os.path.dirname(__file__), "..", "..", "data", "examples.json"))

app = FastAPI(title="FireForm Core SecAPI", version="1.0.0")

# Setup CORS (in production bind strictly to the UI's domain).
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests and is unsafe in production —
# confirm deployment config overrides this.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared audit logger; handlers/level are configured where the logger is first
# set up in src.llm.constrained_extractor.
audit_logger = logging.getLogger("fireform_audit")

# Simple Security Dependency
def verify_api_key(request: Request) -> bool:
    """FastAPI dependency enforcing bearer-token authentication.

    Reads the ``Authorization: Bearer <token>`` header and compares it against
    the ``API_AUTH_TOKEN`` environment variable.

    Raises:
        HTTPException: 401 when the header is missing or malformed,
            403 when the token does not match.
    """
    auth_header = request.headers.get("Authorization")
    expected_token = os.environ.get("API_AUTH_TOKEN", "default_dev_token")
    # NOTE(review): the "default_dev_token" fallback means an unset env var
    # accepts a well-known token — fail closed in production.
    # request.client is None for some test clients / unix-socket transports.
    client_host = request.client.host if request.client else "unknown"
    if not auth_header or not auth_header.startswith("Bearer "):
        audit_logger.warning("Unauthenticated request attempted from %s", client_host)
        raise HTTPException(status_code=401, detail="Missing or invalid authentication token")

    # Everything after the "Bearer " prefix is the token (avoids dropping
    # content after an accidental extra space, unlike split(" ")[1]).
    token = auth_header[len("Bearer "):]
    # Constant-time comparison so the token cannot be recovered via timing.
    if not secrets.compare_digest(token, expected_token):
        audit_logger.warning("Invalid token used from %s", client_host)
        raise HTTPException(status_code=403, detail="Forbidden")
    return True

def verify_admin(request: Request):
    """Fictitious RBAC check for admins: valid API key plus an admin role header."""
    verify_api_key(request)
    # NOTE: the role comes from a client-controlled header, so this is a
    # demonstration gate only, not real access control.
    declared_role = request.headers.get("X-User-Role", "operator")
    if declared_role == "admin":
        return True
    audit_logger.warning("Operator attempted admin action from %s", request.client.host)
    raise HTTPException(status_code=403, detail="Admin privileges required")

@app.middleware("http")
async def audit_logging_middleware(request: Request, call_next):
    """
    Log every request's method, path and client host to the audit trail.

    This middleware is purely an access log; input sanitization happens
    per-endpoint via ``sanitize_input``, not here.
    """
    # request.client is None for some test clients / unix-socket transports.
    client_host = request.client.host if request.client else "unknown"
    # Lazy %-style args so formatting cost is skipped when the level is disabled.
    audit_logger.info("AUDIT - %s %s - Host: %s", request.method, request.url.path, client_host)
    return await call_next(request)

@app.post("/api/v1/report", dependencies=[Depends(verify_api_key)])
async def generate_report(narrative: str):
    """
    Main extraction pipeline endpoint.

    Expects a free-text incident narrative. Returns structured JSON for the
    UI: ``status: complete`` with the full report, or ``status: incomplete``
    with a follow-up prompt plus the partial report when required fields are
    missing.
    """
    # Strip known prompt-injection tokens before the text reaches any LLM.
    sanitized = sanitize_input(narrative)

    # 1. RAG Retrieve Top-K Context
    # NOTE(review): `context` is computed but never passed to the extraction
    # call below — either wire it into the extractor or drop this step.
    context = get_few_shot_prompt(sanitized)

    # 2. Extract structure using the concurrent semantic router.
    router = SemanticRouter()
    report = await router.pareto_extraction(sanitized)

    # 3. Validation / self-correction check over the extracted report.
    correction_result = self_correction_loop(sanitized, report)
    if not correction_result["success"]:
        # Required fields are missing; the UI needs to ask a follow-up.
        return {
            "status": "incomplete",
            "prompt": correction_result["prompt"],
            "partial_report": report.model_dump(mode="json")
        }

    # 4. Success state — record the generated incident id in the audit trail.
    audit_logger.info(f"Report Generated and validated: {report.incident_id}")

    return {
        "status": "complete",
        "report": report.model_dump(mode="json")
    }

@app.get("/api/v1/templates", dependencies=[Depends(verify_api_key)])
async def list_templates():
    """Return the identifiers of the PDF form templates currently available."""
    available = ["NFIRS_v1", "LOCAL_DEPT_STANDARD"]
    return {"templates": available}

@app.post("/api/v1/templates", dependencies=[Depends(verify_admin)])
async def upload_template(file: UploadFile = File(...)):
    """Admin-only endpoint to add a new PDF form mapping."""
    # Logic to securely save the template and store it in the database.
    message = f"Admin uploaded new template: {file.filename}"
    audit_logger.info(message)
    return {"message": "Template mapped and secured successfully"}

@app.post("/api/v1/poc/generate_and_fill")
async def generate_and_fill(narrative: str, template_path: str):
    """
    PoC endpoint tying together SemanticRouter and VectorSemanticMapper.
    Runs concurrently, then structurally aligns the resulting JSON to a PDF template.

    NOTE(review): unlike the other endpoints, this one has no auth dependency,
    accepts a caller-supplied filesystem path (``template_path`` — path
    traversal risk), and skips ``sanitize_input`` on the narrative. Confirm it
    is not exposed outside a trusted boundary.
    """
    # 1. Pareto-optimal concurrent extraction.
    router = SemanticRouter()
    report = await router.pareto_extraction(narrative)

    # Convert the Pydantic model to a flat dict so keys can be aligned.
    # NOTE(review): assumes IncidentReport nests spatial/medical/operational
    # sections — a schema change raises KeyError here; verify against schemas.
    data_dict = report.model_dump(mode="json")
    flat_data = {
        "incident_id": str(data_dict["incident_id"]),
        "timestamp": data_dict["timestamp"],
        "narrative": data_dict["narrative"],
        "address": data_dict["spatial"]["address"],
        "coordinates": data_dict["spatial"]["coordinates"],
        "injuries": data_dict["medical"]["injuries"],
        "severity": data_dict["medical"]["severity"],
        "units_responding": data_dict["operational"]["units_responding"],
        "incident_type": data_dict["operational"]["incident_type"]
    }

    # 2. Zero-config PDF alignment.
    mapper = VectorSemanticMapper()

    try:
        # Align keys dynamically based on cosine similarity of field names.
        filled_pdf = mapper.fill_pdf(template_path, flat_data)
        return Response(content=filled_pdf, media_type="application/pdf")
    except Exception as e:
        # NOTE(review): str(e) is returned to the client — may leak internals.
        return {"error": str(e), "message": "Failed to map PDF layout"}
80 changes: 80 additions & 0 deletions src/llm/constrained_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
import os
from contextlib import contextmanager
from typing import Optional
from openai import OpenAI
import instructor

from src.schemas import IncidentReport

# Configure the shared audit logger.
# In production, this logger should target an append-only file or a remote
# logging service so audit records cannot be tampered with locally.
audit_logger = logging.getLogger("fireform_audit")
audit_logger.setLevel(logging.INFO)
# Guard against attaching duplicate handlers if this module is imported twice.
if not audit_logger.handlers:
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    audit_logger.addHandler(ch)

def get_instructor_client() -> instructor.Instructor:
    """
    Build an OpenAI-compatible client pointed at the local Ollama instance
    and patch it with instructor for constrained (schema-validated) generation.
    """
    base_url = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
    # Ollama's OpenAI API compatibility layer lives under the /v1 prefix.
    if not base_url.endswith("/v1"):
        base_url = base_url.rstrip("/") + "/v1"

    # Ollama ignores the API key locally, but the SDK requires a non-empty one.
    raw_client = OpenAI(base_url=base_url, api_key="ollama")
    return instructor.from_openai(raw_client, mode=instructor.Mode.JSON)

def sanitize_input(text: str) -> str:
    """
    Basic sanitization to neutralize prompt-injection tokens where possible.

    Removes the ChatML control tokens and trims surrounding whitespace; more
    aggressive filtering could be added here.
    """
    cleaned = text
    for control_token in ("<|im_start|>", "<|im_end|>"):
        cleaned = cleaned.replace(control_token, "")
    return cleaned.strip()

def extract_incident(text: str, context: Optional[str] = None, model: Optional[str] = None) -> IncidentReport:
    """
    Extract an IncidentReport from unstructured text using the local LLM.

    Uses strict JSON response mode and validates against the Pydantic schema,
    retrying automatically when the generated structure violates its rules.

    Args:
        text: Raw incident narrative; sanitized before use.
        context: Optional few-shot examples appended to the system prompt.
        model: Ollama model name; defaults to the OLLAMA_MODEL env var,
            then "llama3" (backward-compatible with the previous hard-coding).

    Returns:
        A validated IncidentReport.

    Raises:
        ValueError: If extraction fails after all retries; the original
            exception is chained as the cause.
    """
    text = sanitize_input(text)

    # Audit trail: record only the narrative length — the text itself may
    # contain PII and must not be logged.
    audit_logger.info("Initializing extraction sequence for narrative (length: %d)", len(text))

    client = get_instructor_client()
    model = model or os.environ.get("OLLAMA_MODEL", "llama3")

    system_prompt = (
        "You are an expert fire department data extraction AI. "
        "Your task is to extract an IncidentReport from the provided narrative strictly following "
        "the provided schema. Ensure accuracy, don't invent details."
    )
    if context:
        system_prompt += f"\n\nHere are some successful examples for reference:\n{context}"

    # instructor re-prompts automatically when Pydantic validation fails.
    try:
        report = client.chat.completions.create(
            model=model,
            response_model=IncidentReport,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Extract the IncidentReport from this narrative:\n\n{text}"}
            ],
            max_retries=3,
        )
    except Exception as e:
        audit_logger.error("Extraction failed: %s", str(e))
        # Chain the original cause so callers can inspect the root failure.
        raise ValueError(f"Failed to extract structured data: {e}") from e

    # Success logging happens outside the try so a post-extraction bug is not
    # masked as an extraction failure.
    audit_logger.info("Extraction completed successfully for incident type: %s", report.incident_type.value)
    return report
97 changes: 97 additions & 0 deletions src/llm/few_shot_rag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import json
import logging
import os
import chromadb
from chromadb.utils.embedding_functions import OllamaEmbeddingFunction

logger = logging.getLogger("fireform_rag")
logger.setLevel(logging.INFO)

# Use local Ollama embeddings for privacy: narrative data never leaves the
# local Docker network.
ollama_host = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
embedding_function = OllamaEmbeddingFunction(
    url=f"{ollama_host}/api/embeddings",
    model_name="nomic-embed-text"
)

# Persistent Chroma store; CHROMA_DATA_DIR is mounted at /app/data by
# docker-compose in deployment (falls back to ephemeral below if it fails).
CHROMA_DATA_DIR = os.environ.get("CHROMA_DATA_DIR", "/app/data/chroma")
os.makedirs(CHROMA_DATA_DIR, exist_ok=True)

try:
    chroma_client = chromadb.PersistentClient(path=CHROMA_DATA_DIR)

    # Initialize (or reattach to) the few-shot example collection.
    collection = chroma_client.get_or_create_collection(
        name="few_shot_examples",
        embedding_function=embedding_function
    )
except Exception as e:
    logger.error("Failed to initialize ChromaDB: %s", str(e))
    # Fall back to an ephemeral in-memory client so the app can still boot.
    # NOTE(review): create_collection raises if the name already exists on
    # this client — get_or_create_collection would be safer here too.
    chroma_client = chromadb.Client()
    collection = chroma_client.create_collection("few_shot_examples", embedding_function=embedding_function)

def populate_examples(json_path: str = "data/examples.json"):
    """
    Load few-shot examples from a JSON file and embed them into Chroma.

    Expects an array of ``{"narrative": "...", "report": {...}}`` objects.
    Best-effort: a missing or malformed file is logged and skipped so that
    application startup (which calls this at import time) cannot be broken
    by bad example data.
    """
    if not os.path.exists(json_path):
        logger.warning("No examples file found at %s. Few-shot RAG will be empty.", json_path)
        return

    try:
        with open(json_path, "r", encoding="utf-8") as f:
            examples = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        # Previously an invalid file crashed bootstrap; degrade gracefully
        # the same way the missing-file case does.
        logger.error("Could not load examples from %s: %s", json_path, e)
        return

    if not isinstance(examples, list) or len(examples) == 0:
        return

    ids = []
    documents = []
    metadatas = []

    for idx, ex in enumerate(examples):
        ids.append(f"example_{idx}")
        documents.append(ex.get("narrative", ""))
        metadatas.append({"report": json.dumps(ex.get("report", {}))})

    # Upsert only when some ids are missing; upsert itself is idempotent, so
    # this check merely avoids re-embedding on every startup.
    existing = collection.get(ids=ids)
    if not existing or len(existing.get('ids', [])) < len(ids):
        collection.upsert(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )
        logger.info("Upserted %d training examples to Chroma vector store.", len(ids))

def get_few_shot_prompt(query: str, top_k: int = 3) -> str:
    """
    Retrieve the top-k most similar stored examples and format them into a
    context prompt. Returns an empty string when the store is empty or the
    query fails.
    """
    try:
        results = collection.query(
            query_texts=[query],
            n_results=top_k
        )
    except Exception as e:
        logger.error("Chroma query failed: %s", str(e))
        return ""

    if not results or not results.get("documents") or len(results["documents"][0]) == 0:
        return ""

    docs = results["documents"][0]
    metas = results["metadatas"][0]

    # Assemble the prompt from per-example sections and join once at the end.
    sections = ["Here are a few similar incident examples for reference:\n\n"]
    for i, doc in enumerate(docs):
        report_json = metas[i].get("report", "{}")
        sections.append(f"---\nNarrative: {doc}\nExtraction Output expected:\n{report_json}\n\n")

    return "".join(sections)
Loading