From a141fd8830f006d9392f0d5097eae12b618dd404 Mon Sep 17 00:00:00 2001 From: Blazzzeee Date: Sat, 28 Mar 2026 18:36:13 +0530 Subject: [PATCH 1/2] push final code --- the_fumblers/heimdall/.gitignore | 4 + the_fumblers/heimdall/api.py | 765 ++++++++++++++++++ the_fumblers/heimdall/app/__init__.py | 0 the_fumblers/heimdall/app/auth.py | 20 + the_fumblers/heimdall/app/main.py | 157 ++++ the_fumblers/heimdall/app/models.py | 89 ++ the_fumblers/heimdall/app/ops.py | 126 +++ the_fumblers/heimdall/app/store.py | 8 + the_fumblers/heimdall/db.py | 112 +++ the_fumblers/heimdall/discord_bot/bot.py | 328 ++++++++ .../heimdall/examples/api_service/flake.lock | 61 ++ .../heimdall/examples/api_service/flake.nix | 44 + .../heimdall/examples/api_service/main.py | 25 + the_fumblers/heimdall/fastapi_agent/main.py | 340 ++++++++ .../heimdall/fastapi_agent/state.json | 22 + .../heimdall/fastapi_agent/test_app.py | 21 + .../heimdall/fastapi_agent/test_main.py | 43 + the_fumblers/heimdall/flake.lock | 27 + the_fumblers/heimdall/flake.nix | 23 + the_fumblers/heimdall/node.py | 13 + the_fumblers/heimdall/requirements.txt | 8 + the_fumblers/heimdall/start.sh | 105 +++ the_fumblers/heimdall/test.py | 246 ++++++ the_fumblers/heimdall/test_orchaestrator.py | 140 ++++ the_fumblers/heimdall/test_webhook.py | 89 ++ 25 files changed, 2816 insertions(+) create mode 100644 the_fumblers/heimdall/.gitignore create mode 100644 the_fumblers/heimdall/api.py create mode 100644 the_fumblers/heimdall/app/__init__.py create mode 100644 the_fumblers/heimdall/app/auth.py create mode 100644 the_fumblers/heimdall/app/main.py create mode 100644 the_fumblers/heimdall/app/models.py create mode 100644 the_fumblers/heimdall/app/ops.py create mode 100644 the_fumblers/heimdall/app/store.py create mode 100644 the_fumblers/heimdall/db.py create mode 100644 the_fumblers/heimdall/discord_bot/bot.py create mode 100644 the_fumblers/heimdall/examples/api_service/flake.lock create mode 100644 
the_fumblers/heimdall/examples/api_service/flake.nix create mode 100644 the_fumblers/heimdall/examples/api_service/main.py create mode 100755 the_fumblers/heimdall/fastapi_agent/main.py create mode 100644 the_fumblers/heimdall/fastapi_agent/state.json create mode 100755 the_fumblers/heimdall/fastapi_agent/test_app.py create mode 100755 the_fumblers/heimdall/fastapi_agent/test_main.py create mode 100644 the_fumblers/heimdall/flake.lock create mode 100644 the_fumblers/heimdall/flake.nix create mode 100644 the_fumblers/heimdall/node.py create mode 100644 the_fumblers/heimdall/requirements.txt create mode 100755 the_fumblers/heimdall/start.sh create mode 100644 the_fumblers/heimdall/test.py create mode 100644 the_fumblers/heimdall/test_orchaestrator.py create mode 100644 the_fumblers/heimdall/test_webhook.py diff --git a/the_fumblers/heimdall/.gitignore b/the_fumblers/heimdall/.gitignore new file mode 100644 index 0000000..218a994 --- /dev/null +++ b/the_fumblers/heimdall/.gitignore @@ -0,0 +1,4 @@ +.direnv/ +.envrc +.env +.venv diff --git a/the_fumblers/heimdall/api.py b/the_fumblers/heimdall/api.py new file mode 100644 index 0000000..6fb6566 --- /dev/null +++ b/the_fumblers/heimdall/api.py @@ -0,0 +1,765 @@ +""" +Heimdall — Unified Infra Control API + +Endpoints: + POST /webhook — HMAC-signed webhook ingestion (deploy/register) + POST /deploy — Deploy a service (API key auth) + POST /teardown — Teardown a service (API key auth) + POST /rollback — Rollback a service (API key auth) + GET /operations/{id} — Check operation status (API key auth) + GET /operations — List recent operations (API key auth) + GET /nodes — List registered nodes + GET /health — Health check +""" + +import hashlib +import hmac +import os +import uuid +import json +import asyncio +import httpx +import time +from datetime import datetime, UTC +from contextlib import asynccontextmanager + +from fastapi import Depends, FastAPI, Header, HTTPException, Request, BackgroundTasks, status +from 
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from db import SessionLocal, Node, ServiceInstance, Operation, init_db
from app.models import (
    DeployRequest, DeployResponse,
    TeardownRequest, TeardownResponse,
    RollbackRequest, RollbackResponse,
    OperationStatus,
    DeclareServiceRequest,
    DeployAllResponse,
    RegisterNodeRequest, RegisterNodeResponse,
)
from app.auth import verify_api_key
from app.ops import run_deploy, run_teardown, run_rollback, send_agent_inspect


# ── App setup ─────────────────────────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background node monitor for the lifetime of the app."""
    print("Heimdall starting up...")
    monitor_task = asyncio.create_task(monitor())
    yield
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass  # expected on shutdown

app = FastAPI(
    title="Heimdall — Infra Control API",
    description="Deterministic infrastructure control — deploy, teardown, rollback, monitor.",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "super-secret-key")
# Consecutive failed heartbeats before a node is marked OFFLINE.
FAIL_THRESHOLD = 3

init_db()


# ── Models ──────────────────────────────────────────────────────────────────── 

class WebhookPayload(BaseModel):
    """Legacy webhook body: one action applied to a service in an environment."""
    action: str
    service: str
    version: str | None = None
    env: str
    metadata: dict | None = None


# ── Webhook helpers ───────────────────────────────────────────────────────────

def get_nodes_by_env(env: str):
    """Return all registered nodes for the given environment."""
    db = SessionLocal()
    try:
        return db.query(Node).filter(Node.env == env).all()
    finally:
        db.close()


def get_or_create_service(node: Node, payload: WebhookPayload):
    """Fetch the ServiceInstance for (node, payload.service), creating it if absent.

    NOTE(review): the returned ORM object is detached once the session closes;
    callers here only read plain column attributes, but lazy relationship
    access on the result would fail — confirm before extending.
    """
    db = SessionLocal()
    try:
        service = db.query(ServiceInstance).filter(
            ServiceInstance.node_id == node.id,
            ServiceInstance.name == payload.service,
        ).first()

        if not service:
            service = ServiceInstance(
                node_id=node.id,
                name=payload.service,
                service_uuid=payload.service,
                flake="default",
                commands=[],
                env=payload.env,
            )
            db.add(service)
            db.commit()
            db.refresh(service)
        return service
    finally:
        db.close()


def create_operation(service_id: str, op_type: str, metadata: dict):
    """Persist a new Operation row and return it."""
    db = SessionLocal()
    try:
        operation = Operation(
            service_id=service_id,
            type=op_type,
            metadata_json=metadata,
        )
        db.add(operation)
        db.commit()
        db.refresh(operation)
        return operation
    finally:
        db.close()


# ── Node monitoring ──────────────────────────────────────────────────────────

async def check_node(node_id: str):
    """Probe one node's /heartbeat endpoint and update its status counters."""
    db = SessionLocal()
    try:
        node = db.query(Node).filter(Node.id == node_id).first()
        if not node:
            return

        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                res = await client.get(f"{node.host}/heartbeat")
                if res.status_code == 200:
                    node.status = "ONLINE"
                    node.fail_count = 0
                    # FIX: aware-UTC isoformat() already carries the "+00:00"
                    # offset; the old code appended a literal "Z" on top,
                    # producing malformed timestamps like "...+00:00Z".
                    node.last_seen = datetime.now(UTC).isoformat()
                    db.commit()
                    return
        except Exception:
            pass  # any transport error counts as a failed heartbeat

        node.fail_count += 1
        if node.fail_count >= FAIL_THRESHOLD:
            node.status = "OFFLINE"
        db.commit()
    finally:
        db.close()


async def monitor():
    """Background loop: heartbeat every known node roughly every 5 seconds."""
    while True:
        db = SessionLocal()
        try:
            node_ids = [n.id for n in db.query(Node).all()]
        finally:
            db.close()

        tasks = [check_node(nid) for nid in node_ids]
        if tasks:
            await asyncio.gather(*tasks)
        await asyncio.sleep(5)


# ── Webhook auth ─────────────────────────────────────────────────────────────

def verify_webhook_signature(payload: bytes, x_signature: str, x_timestamp: str | None = None):
    """Validate an HMAC-SHA256 webhook signature.

    Agent style (X-Timestamp present): hex digest of body + timestamp.
    Legacy style: "sha256=<hexdigest of raw body>".
    Raises HTTPException 400 on malformed headers, 401 on a bad signature.
    """
    if x_timestamp:
        # Agent style: hash(body + timestamp)
        message = payload.decode('utf-8') + x_timestamp
        expected = hmac.new(WEBHOOK_SECRET.encode("utf-8"), message.encode('utf-8'), hashlib.sha256).hexdigest()
        if not hmac.compare_digest(expected, x_signature):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid agent webhook signature")
    else:
        # Legacy style
        if not x_signature.startswith("sha256="):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid signature header")
        expected = x_signature.split("=", 1)[1]
        mac = hmac.new(WEBHOOK_SECRET.encode("utf-8"), payload, hashlib.sha256)
        computed = mac.hexdigest()
        if not hmac.compare_digest(computed, expected):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid legacy webhook signature")


# ── Webhook Handlers ────────────────────────────────────────────────────────

def handle_legacy_webhook(payload: dict):
    """Action-style webhook: register a node, or fan the action out to every
    node in the payload's environment.

    Uses the module-level WebhookPayload model (the old inline
    LegacyWebhookPayload duplicated it field-for-field).
    """
    p = WebhookPayload(**payload)
    if p.action == "register":
        db = SessionLocal()
        try:
            name = p.metadata.get("name") if p.metadata else p.service
            host = p.metadata.get("host") if p.metadata else "http://localhost:8000"

            node = db.query(Node).filter(Node.name == name).first()
            if not node:
                node = Node(
                    name=name,
                    uuid=name,
                    host=host,
                    env=p.env,
                )
                db.add(node)
                db.commit()
        finally:
            db.close()
        return

    nodes = get_nodes_by_env(p.env)
    for node in nodes:
        service = get_or_create_service(node, p)
        create_operation(
            service_id=service.id,
            op_type=p.action,
            metadata={"version": p.version},
        )


def handle_agent_webhook(payload: dict):
    """Agent push events: service status transitions and streamed log batches."""
    wtype = payload.get("type")

    if wtype == "node_status":
        # Example: {"type": "node_status", "services": {"api": {"pid": 123, "status": "healthy"}}}
        pass  # We could sync this back to ServiceInstance status if we wanted to

    elif wtype == "status":
        # Example: {"type": "status", "service": "api", "status": "healthy|dead|failed", "error": "..."}
        svc_name = payload.get("service")
        new_status = payload.get("status")
        err = payload.get("error")
        db = SessionLocal()
        try:
            svc = db.query(ServiceInstance).filter(ServiceInstance.name == svc_name).first()
            svc_id = svc.id if svc else None

            # Find the most recent 'running' operation for this service
            query = db.query(Operation).filter(Operation.status == "running")
            if svc_id:
                query = query.filter(Operation.service_id == svc_id)
            else:
                query = query.filter(Operation.service_name == svc_name)

            op = query.order_by(Operation.created_at.desc()).first()

            if op:
                if new_status == "healthy":
                    op.status = "success"
                    op.message = "Agent reported service is healthy."
                    op.finished_at = datetime.now(UTC)
                elif new_status in ("failed", "dead"):
                    op.status = "failed"
                    op.message = f"Agent reported service is {new_status}."
                    op.error = err
                    op.finished_at = datetime.now(UTC)
                db.commit()

            # Mirror the reported status onto the service record, reusing the
            # row fetched above (the old code queried it a second time).
            if svc:
                svc.status = new_status
                db.commit()
        finally:
            db.close()

    elif wtype == "logs_batch":
        # Example: {"type": "logs_batch", "logs": [{"service": "api", "stream": "stdout", "log": "..."}]}
        for log_entry in payload.get("logs", []):
            svc = log_entry.get("service", "unknown")
            stream = log_entry.get("stream", "output")
            log_line = log_entry.get("log", "")
            # Print remotely streamed logs to the control plane stdout for now
            print(f"[Remote {svc} | {stream}] {log_line}")


def handle_webhook(payload: dict):
    """Dispatch a verified webhook body to the legacy or agent handler.

    NOTE: the file previously defined an earlier WebhookPayload-typed
    handle_webhook that this dict-based version immediately shadowed; the dead
    first definition has been removed.
    """
    if "action" in payload:
        handle_legacy_webhook(payload)
    elif "type" in payload:
        handle_agent_webhook(payload)


# ── Endpoints: Health ────────────────────────────────────────────────────────

@app.get("/health", tags=["meta"])
async def health():
    return {"status": "ok", "timestamp": time.time()}


# ── Endpoints: Webhook ───────────────────────────────────────────────────────

@app.post("/webhook", tags=["webhook"])
async def webhook_listener(
    request: Request,
    x_signature: str = Header(...),
    x_timestamp: str = Header(None)
):
    body = await request.body()
    verify_webhook_signature(body, x_signature, x_timestamp)

    try:
        payload = json.loads(body)
    except ValueError:
        # FIX: a signed but non-JSON body is a client error, not a 500.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Body is not valid JSON")
    handle_webhook(payload)

    return {"status": "accepted"}


# ── Endpoints: Services (Declarative) ────────────────────────────────────────

@app.post("/services", tags=["services"])
async def declare_service(
    req: DeclareServiceRequest,
    _: str = Depends(verify_api_key),
):
    """Create or update the declarative record of a service on a node."""
    db = SessionLocal()
    try:
        node = db.query(Node).filter(Node.name == req.node_name).first()
        if not node:
            raise HTTPException(status_code=404, detail=f"Node '{req.node_name}' not found")

        svc = db.query(ServiceInstance).filter(
            ServiceInstance.node_id == node.id,
            ServiceInstance.name == req.service
        ).first()

        if not svc:
            svc = ServiceInstance(
                node_id=node.id,
                name=req.service,
                service_uuid=req.service,
                env=req.environment
            )
            db.add(svc)

        # Attempt to discover metadata from the flake manifest if missing.
        # `node` is guaranteed non-None here (404 raised above).
        if req.flake and (not req.commands or not req.healthcheck_url):
            try:
                inspection = await send_agent_inspect(node.host, req.flake)
                if inspection.get("status") == "success":
                    manifest = inspection.get("manifest", {})
                    if not req.commands:
                        req.commands = manifest.get("commands", [])
                    if not req.healthcheck_url:
                        req.healthcheck_url = manifest.get("healthcheck_url")
            except Exception as e:
                # Best-effort enrichment; a failed inspection must not block declaration.
                print(f"Manifest inspection failed: {e}")

        svc.repo_url = req.repo_url
        svc.flake = req.flake
        svc.commands = req.commands
        svc.healthcheck_url = req.healthcheck_url
        svc.env = req.environment
        svc.triggered_by = req.triggered_by  # Audit

        db.commit()
        return {"status": "success", "message": f"Service '{req.service}' declared on node '{req.node_name}'."}
    finally:
        db.close()


@app.get("/services", tags=["services"])
async def list_services():
    db = SessionLocal()
    try:
        services = db.query(ServiceInstance).all()
        return [
            {
                "name": s.name,
                "status": s.status,
                "node_name": s.node.name,
                "environment": s.env,
            }
            for s in services
        ]
    finally:
        db.close()


@app.get("/services/{service_name}", tags=["services"])
async def get_service_detail(service_name: str):
    db = SessionLocal()
    try:
        svc = db.query(ServiceInstance).filter(ServiceInstance.name == service_name).first()
        if not svc:
            raise HTTPException(status_code=404, detail=f"Service '{service_name}' not found")

        return {
            "name": svc.name,
            "status": svc.status,
            "healthcheck_url": svc.healthcheck_url,
            "node": svc.node.name,
            "env": svc.env,
        }
    finally:
        db.close()

# ── Endpoints:
Deploy ──────────────────────────────────────────────────────── + +@app.post("/deploy-all", response_model=DeployAllResponse, tags=["operations"]) +async def deploy_all( + background_tasks: BackgroundTasks, + triggered_by: str = None, + _: str = Depends(verify_api_key), +): + db = SessionLocal() + try: + services = db.query(ServiceInstance).all() + if not services: + return DeployAllResponse(status="success", message="No services to deploy.", operation_ids=[]) + + op_ids = [] + for svc in services: + op_id = str(uuid.uuid4()) + op = Operation( + id=op_id, + type="deploy", + service_id=svc.id, + service_name=svc.name, + environment=svc.env, + version="latest", + triggered_by=triggered_by, + message="Queued (Bulk)", + started_at=datetime.now(UTC), + ) + db.add(op) + db.commit() + + # Use real node host + node_host = svc.node.host + # Create a dummy request for the background task + req = DeployRequest( + service=svc.name, + version="latest", + environment=svc.env, + flake=svc.flake, + commands=svc.commands, + healthcheck_url=svc.healthcheck_url, + triggered_by=triggered_by + ) + background_tasks.add_task(run_deploy, op_id, req, node_host) + op_ids.append(op_id) + + return DeployAllResponse(status="success", message=f"Queued {len(op_ids)} deployments.", operation_ids=op_ids) + finally: + db.close() + +@app.post("/deploy", response_model=DeployResponse, tags=["operations"]) +async def deploy( + req: DeployRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + db = SessionLocal() + try: + if not req.node_name: + # Try to infer from a declared service + svc = db.query(ServiceInstance).filter(ServiceInstance.name == req.service).first() + if not svc: + raise HTTPException(status_code=400, detail=f"Service '{req.service}' not declared. 
Please provide node_name.") + req.node_name = svc.node.name + req.flake = req.flake or svc.flake + req.repo_url = req.repo_url or svc.repo_url + req.commands = req.commands or svc.commands + req.healthcheck_url = req.healthcheck_url or svc.healthcheck_url + + # Resolve node + node = db.query(Node).filter(Node.name == req.node_name).first() + if not node: + raise HTTPException(status_code=404, detail=f"Node '{req.node_name}' not found") + + node_host = node.host + + # Link to service if found + svc = db.query(ServiceInstance).filter(ServiceInstance.name == req.service).first() + svc_id = svc.id if svc else None + + # Clean up stale operations for this service + state_ops = db.query(Operation).filter( + Operation.service_name == req.service, + Operation.status.in_(["pending", "running"]) + ).all() + for old_op in state_ops: + old_op.status = "failed" + old_op.message = "Superseded by new deployment." + old_op.finished_at = datetime.now(UTC) + + op_id = str(uuid.uuid4()) + op = Operation( + id=op_id, + type="deploy", + service_id=svc_id, + service_name=req.service, + environment=req.environment, + version=req.version, + triggered_by=req.triggered_by, + message="Queued", + started_at=datetime.now(UTC), + metadata_json={ + "repo_url": req.repo_url, + "flake": req.flake, + "commands": req.commands, + "healthcheck_url": req.healthcheck_url, + } + ) + db.add(op) + db.commit() + finally: + db.close() + + background_tasks.add_task(run_deploy, op_id, req, node_host) + return DeployResponse( + operation_id=op_id, + status="pending", + message=f"Deploy of {req.service}@{req.version} to {req.node_name} queued.", + ) + + +# ── Endpoints: Teardown ───────────────────────────────────────────────────── + +@app.post("/teardown", response_model=TeardownResponse, tags=["operations"]) +async def teardown( + req: TeardownRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + op_id = str(uuid.uuid4()) + db = SessionLocal() + try: + svc = 
db.query(ServiceInstance).filter(ServiceInstance.name == req.service).first() + env = svc.env if svc else "dev" + op = Operation( + id=op_id, + type="teardown", + service_id=svc.id if svc else None, + service_name=req.service, + environment=env, + triggered_by=req.triggered_by, + message="Queued", + started_at=datetime.now(UTC), + ) + db.add(op) + db.commit() + finally: + db.close() + + background_tasks.add_task(run_teardown, op_id, req) + return TeardownResponse( + operation_id=op_id, + status="pending", + message=f"Teardown of {req.service} queued.", + ) + + +# ── Endpoints: Rollback ───────────────────────────────────────────────────── + +@app.post("/rollback", response_model=RollbackResponse, tags=["operations"]) +async def rollback( + req: RollbackRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + op_id = str(uuid.uuid4()) + db = SessionLocal() + try: + svc = db.query(ServiceInstance).filter(ServiceInstance.name == req.service).first() + op = Operation( + id=op_id, + type="rollback", + service_id=svc.id if svc else None, + service_name=req.service, + environment=req.environment, + target_version=req.target_version, + triggered_by=req.triggered_by, + message="Queued", + started_at=datetime.now(UTC), + ) + db.add(op) + db.commit() + finally: + db.close() + + background_tasks.add_task(run_rollback, op_id, req) + return RollbackResponse( + operation_id=op_id, + status="pending", + message=f"Rollback of {req.service} to {req.target_version} in {req.environment} queued.", + ) + + +# ── Endpoints: Operation Status ───────────────────────────────────────────── + +@app.get("/operations/{operation_id}", response_model=OperationStatus, tags=["operations"]) +async def get_operation( + operation_id: str, + _: str = Depends(verify_api_key), +): + db = SessionLocal() + try: + op = db.query(Operation).filter(Operation.id == operation_id).first() + if not op: + raise HTTPException(status_code=404, detail="Operation not found.") + return 
OperationStatus( + id=op.id, + type=op.type, + status=op.status, + service=op.service_name or "", + environment=op.environment or "", + version=op.version, + target_version=op.target_version, + healthcheck_url=op.metadata_json.get("healthcheck_url") if op.metadata_json else None, + started_at=op.started_at.timestamp() if op.started_at else 0, + finished_at=op.finished_at.timestamp() if op.finished_at else None, + message=op.message or "", + error=op.error, + ) + finally: + db.close() + + +@app.get("/operations/audit", tags=["operations"]) +async def get_audit_logs(limit: int = 50): + db = SessionLocal() + try: + ops = db.query(Operation).order_by(Operation.created_at.desc()).limit(limit).all() + return [ + { + "id": op.id, + "type": op.type, + "service": op.service_name, + "status": op.status, + "triggered_by": op.triggered_by, + "created_at": op.created_at.isoformat() if op.created_at else None, + "message": op.message + } + for op in ops + ] + finally: + db.close() +async def list_operations( + _: str = Depends(verify_api_key), + limit: int = 20, +): + db = SessionLocal() + try: + ops = db.query(Operation).order_by(Operation.created_at.desc()).limit(limit).all() + return { + "operations": [ + { + "id": op.id, + "type": op.type, + "status": op.status, + "service": op.service_name, + "environment": op.environment, + "version": op.version, + "target_version": op.target_version, + "message": op.message, + "error": op.error, + "started_at": op.started_at.timestamp() if op.started_at else None, + "finished_at": op.finished_at.timestamp() if op.finished_at else None, + } + for op in ops + ], + "total": db.query(Operation).count(), + } + finally: + db.close() + + +# ── Endpoints: Nodes ───────────────────────────────────────────────────────── + +@app.post("/nodes", response_model=RegisterNodeResponse, tags=["nodes"]) +async def register_node( + req: RegisterNodeRequest, + _: str = Depends(verify_api_key), +): + db = SessionLocal() + try: + node = db.query(Node).filter( + 
(Node.name == req.name) | (Node.uuid == req.uuid) + ).first() + + if not node: + node = Node( + name=req.name, + uuid=req.uuid, + host=req.host, + env=req.environment + ) + db.add(node) + message = f"Node '{req.name}' registered successfully." + else: + node.name = req.name + node.host = req.host + node.env = req.environment + message = f"Node '{req.name}' updated successfully." + + db.commit() + return RegisterNodeResponse(status="success", message=message) + finally: + db.close() + + +@app.get("/nodes", tags=["nodes"]) +async def get_nodes(): + db = SessionLocal() + try: + return db.query(Node).all() + finally: + db.close() diff --git a/the_fumblers/heimdall/app/__init__.py b/the_fumblers/heimdall/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/the_fumblers/heimdall/app/auth.py b/the_fumblers/heimdall/app/auth.py new file mode 100644 index 0000000..0d97198 --- /dev/null +++ b/the_fumblers/heimdall/app/auth.py @@ -0,0 +1,20 @@ +""" +API Key authentication. +Set INFRA_API_KEY in your environment (or .env file). 
+Discord bot passes it as: X-API-Key: +""" + +import os +from typing import Optional +from fastapi import HTTPException, Security +from fastapi.security import APIKeyHeader + +API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False) + +_API_KEY = os.getenv("INFRA_API_KEY", "heimdall") + + +async def verify_api_key(api_key: Optional[str] = Security(API_KEY_HEADER)) -> str: + if api_key is None or api_key != _API_KEY: + raise HTTPException(status_code=401, detail="Invalid or missing API key.") + return api_key diff --git a/the_fumblers/heimdall/app/main.py b/the_fumblers/heimdall/app/main.py new file mode 100644 index 0000000..3abb95c --- /dev/null +++ b/the_fumblers/heimdall/app/main.py @@ -0,0 +1,157 @@ +""" +Infra Control API — Deploy / Teardown / Rollback +Authentication: Bearer API Key via X-API-Key header +""" + +from fastapi import FastAPI, HTTPException, Depends, Header, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from contextlib import asynccontextmanager +import uuid +import time + +from app.models import ( + DeployRequest, DeployResponse, + TeardownRequest, TeardownResponse, + RollbackRequest, RollbackResponse, + OperationStatus, +) +from app.store import operation_store +from app.auth import verify_api_key +from app.ops import run_deploy, run_teardown, run_rollback + + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("Infra Control API starting up...") + yield + print("Infra Control API shutting down...") + + +app = FastAPI( + title="Infra Control API", + description="Conversational infrastructure control — deploy, teardown, rollback.", + version="1.0.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + +# ── Health ──────────────────────────────────────────────────────────────────── + +@app.get("/health", tags=["meta"]) +async def health(): + return {"status": "ok", "timestamp": time.time()} + + +# ── Deploy 
──────────────────────────────────────────────────────────────────── + +@app.post("/deploy", response_model=DeployResponse, tags=["operations"]) +async def deploy( + req: DeployRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + op_id = str(uuid.uuid4()) + operation_store[op_id] = { + "id": op_id, + "type": "deploy", + "status": "pending", + "service": req.service, + "version": req.version, + "environment": req.environment, + "started_at": time.time(), + "finished_at": None, + "message": "Queued", + "error": None, + } + background_tasks.add_task(run_deploy, op_id, req) + return DeployResponse( + operation_id=op_id, + status="pending", + message=f"Deploy of {req.service}@{req.version} to {req.environment} queued.", + ) + + +# ── Teardown ────────────────────────────────────────────────────────────────── + +@app.post("/teardown", response_model=TeardownResponse, tags=["operations"]) +async def teardown( + req: TeardownRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + op_id = str(uuid.uuid4()) + operation_store[op_id] = { + "id": op_id, + "type": "teardown", + "status": "pending", + "service": req.service, + "environment": req.environment, + "started_at": time.time(), + "finished_at": None, + "message": "Queued", + "error": None, + } + background_tasks.add_task(run_teardown, op_id, req) + return TeardownResponse( + operation_id=op_id, + status="pending", + message=f"Teardown of {req.service} in {req.environment} queued.", + ) + + +# ── Rollback ────────────────────────────────────────────────────────────────── + +@app.post("/rollback", response_model=RollbackResponse, tags=["operations"]) +async def rollback( + req: RollbackRequest, + background_tasks: BackgroundTasks, + _: str = Depends(verify_api_key), +): + op_id = str(uuid.uuid4()) + operation_store[op_id] = { + "id": op_id, + "type": "rollback", + "status": "pending", + "service": req.service, + "target_version": req.target_version, + 
"environment": req.environment, + "started_at": time.time(), + "finished_at": None, + "message": "Queued", + "error": None, + } + background_tasks.add_task(run_rollback, op_id, req) + return RollbackResponse( + operation_id=op_id, + status="pending", + message=f"Rollback of {req.service} to {req.target_version} in {req.environment} queued.", + ) + + +# ── Operation Status ────────────────────────────────────────────────────────── + +@app.get("/operations/{operation_id}", response_model=OperationStatus, tags=["operations"]) +async def get_operation( + operation_id: str, + _: str = Depends(verify_api_key), +): + op = operation_store.get(operation_id) + if not op: + raise HTTPException(status_code=404, detail="Operation not found.") + return OperationStatus(**op) + + +@app.get("/operations", tags=["operations"]) +async def list_operations( + _: str = Depends(verify_api_key), + limit: int = 20, +): + ops = sorted(operation_store.values(), key=lambda x: x["started_at"], reverse=True) + return {"operations": ops[:limit], "total": len(ops)} \ No newline at end of file diff --git a/the_fumblers/heimdall/app/models.py b/the_fumblers/heimdall/app/models.py new file mode 100644 index 0000000..5d14e9f --- /dev/null +++ b/the_fumblers/heimdall/app/models.py @@ -0,0 +1,89 @@ +from pydantic import BaseModel, Field +from typing import Optional, Literal +import time + + +# ── Requests ────────────────────────────────────────────────────────────────── + +class DeployRequest(BaseModel): + service: str = Field(..., examples=["api-gateway"]) + node_name: str | None = Field(None, examples=["node-1"]) + repo_url: str | None = Field(None, examples=["github:org/repo"]) + flake: str | None = Field(None, examples=["github:org/repo#api"]) + commands: list[str] | None = Field(None, examples=[["run", "migrate"]]) + healthcheck_url: str | None = Field(None, examples=["http://localhost:8080/"]) + version: str = Field("latest", examples=["v1.4.2"]) + environment: Literal["dev", "staging", "prod"] 
= "dev" + triggered_by: str | None = None + +class DeclareServiceRequest(BaseModel): + service: str = Field(..., examples=["api-gateway"]) + node_name: str = Field(..., examples=["node-1"]) + repo_url: str | None = None + flake: str | None = None + commands: list[str] | None = None + healthcheck_url: str | None = None + environment: Literal["dev", "staging", "prod"] = "dev" + triggered_by: str | None = None + +class TeardownRequest(BaseModel): + service: str = Field(..., examples=["api-gateway"]) + triggered_by: str | None = None + +class RollbackRequest(BaseModel): + service: str = Field(..., examples=["api-gateway"]) + environment: Literal["dev", "staging", "prod"] = "dev" + target_version: str = Field(..., examples=["v1.4.1"]) + reason: Optional[str] = None + triggered_by: str | None = None + +class DeployAllResponse(BaseModel): + status: str + message: str + operation_ids: list[str] + + +class RegisterNodeRequest(BaseModel): + name: str = Field(..., examples=["node-1"]) + uuid: str = Field(..., examples=["node-1-unique-id"]) + host: str = Field(..., examples=["http://10.0.0.5:8001"]) + environment: Literal["dev", "staging", "prod"] = "dev" + +class RegisterNodeResponse(BaseModel): + status: str + message: str + + +# ── Responses ───────────────────────────────────────────────────────────────── + +class DeployResponse(BaseModel): + operation_id: str + status: str + message: str + +class TeardownResponse(BaseModel): + operation_id: str + status: str + message: str + +class RollbackResponse(BaseModel): + operation_id: str + status: str + message: str + + +# ── Operation Status ────────────────────────────────────────────────────────── + +class OperationStatus(BaseModel): + id: str + type: str + status: Literal["pending", "running", "success", "failed"] + service: str + environment: str + started_at: float + finished_at: Optional[float] + message: str + error: Optional[str] + version: Optional[str] = None + target_version: Optional[str] = None + healthcheck_url: 
str | None = None \ No newline at end of file diff --git a/the_fumblers/heimdall/app/ops.py b/the_fumblers/heimdall/app/ops.py new file mode 100644 index 0000000..2a17e99 --- /dev/null +++ b/the_fumblers/heimdall/app/ops.py @@ -0,0 +1,126 @@ +""" +Operation runners — DB-backed, integrating with fastapi_agent. +""" + +import asyncio +import httpx +import hmac +import hashlib +import json +import time +import os +from datetime import datetime, UTC +from db import SessionLocal, Operation +from app.models import DeployRequest, TeardownRequest, RollbackRequest + +WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "super-secret-key").encode() + +def _mark(op_id: str, status: str, message: str, error: str = None): + db = SessionLocal() + try: + op = db.query(Operation).filter(Operation.id == op_id).first() + if op: + op.status = status + op.message = message + op.error = error + if status in ("success", "failed"): + op.finished_at = datetime.now(UTC) + db.commit() + finally: + db.close() + +def _generate_hmac_signature(body_str: str, timestamp: str) -> str: + message = body_str + timestamp + return hmac.new(WEBHOOK_SECRET, message.encode(), hashlib.sha256).hexdigest() + +async def send_agent_command(node_host: str, payload: dict): + """Send an HMAC-signed POST to the remote agent's /command endpoint.""" + body_str = json.dumps(payload) + timestamp = str(int(time.time())) + signature = _generate_hmac_signature(body_str, timestamp) + + headers = { + "X-Timestamp": timestamp, + "X-Signature": signature, + "Content-Type": "application/json" + } + + # Strip trailing slash if present on node_host + base_url = node_host.rstrip('/') + url = f"{base_url}/command" + + async with httpx.AsyncClient(timeout=10.0) as client: + res = await client.post(url, content=body_str.encode(), headers=headers) + res.raise_for_status() + return res.json() +async def send_agent_inspect(node_host: str, flake_path: str): + """Ask agent to inspect a flake and return metadata.""" + payload = {"flake": 
flake_path}
    body_str = json.dumps(payload)

    # Sign with the same HMAC scheme the agent's /command endpoint expects.
    timestamp = str(int(time.time()))
    signature = _generate_hmac_signature(body_str, timestamp)

    headers = {
        "X-Timestamp": timestamp,
        "X-Signature": signature,
        "Content-Type": "application/json"
    }

    base_url = node_host.rstrip('/')
    url = f"{base_url}/inspect"

    async with httpx.AsyncClient(timeout=10.0) as client:
        res = await client.post(url, content=body_str.encode(), headers=headers)
        res.raise_for_status()
        return res.json()

async def run_deploy(op_id: str, req: DeployRequest, node_host: str):
    """Background task: forward a deploy request to the node agent at `node_host`.

    Marks the operation "running" while the command is in flight. On acceptance
    the operation is intentionally LEFT in "running": the agent reports the
    terminal success/failed status asynchronously via the control plane's
    /webhook endpoint. Any communication failure marks the operation "failed".
    """
    _mark(op_id, "running", f"Sending deploy command to agent on {req.node_name}...")
    try:
        # The agent's /command endpoint only knows how to deploy flakes.
        if not req.flake:
            raise ValueError("A Nix flake reference is required to hit the agent's /command endpoint.")

        payload = {
            "operation_id": op_id,
            "service": req.service,
            "flake": req.flake,
            "healthcheck_url": req.healthcheck_url,
        }

        response = await send_agent_command(node_host, payload)

        if response.get("status") == "accepted":
            _mark(op_id, "running", f"Agent accepted deployment. Waiting for health status...")
            # Note: The actual success/fail status will be pushed asynchronously
            # by the agent via the /webhook endpoint. We leave the operation as "running".
        elif response.get("status") == "already running":
            # Agent holds a per-service lock during deploys; stay "running" until it frees up.
            _mark(op_id, "running", f"Service is currently locked/deploying. Waiting...")
        else:
            err = response.get("error", "Unknown error")
            _mark(op_id, "failed", f"Agent rejected deployment: {err}", error=err)

    except Exception as e:
        _mark(op_id, "failed", "Failed to communicate with agent.", error=str(e))


async def run_teardown(op_id: str, req: TeardownRequest):
    """Background task: decommission a service (currently simulated)."""
    _mark(op_id, "running", f"Tearing down {req.service}...")
    try:
        # Agent doesn't explicitly support teardown yet via /command,
        # so this remains a simulated block.
+ await asyncio.sleep(2) + _mark(op_id, "success", f"Torn down {req.service}.") + except Exception as e: + _mark(op_id, "failed", "Teardown failed.", error=str(e)) + + +async def run_rollback(op_id: str, req: RollbackRequest): + _mark(op_id, "running", f"Rolling back {req.service} → {req.target_version}...") + try: + # Agent has no specific rollback endpoint yet. + await asyncio.sleep(2) + _mark(op_id, "success", f"Rolled back {req.service} to {req.target_version} in {req.environment}.") + except Exception as e: + _mark(op_id, "failed", "Rollback failed.", error=str(e)) \ No newline at end of file diff --git a/the_fumblers/heimdall/app/store.py b/the_fumblers/heimdall/app/store.py new file mode 100644 index 0000000..18c3ac0 --- /dev/null +++ b/the_fumblers/heimdall/app/store.py @@ -0,0 +1,8 @@ +""" +Simple in-memory store for operation state. +In production, replace with Redis or a DB-backed store. +""" + +from typing import Dict, Any + +operation_store: Dict[str, Any] = {} \ No newline at end of file diff --git a/the_fumblers/heimdall/db.py b/the_fumblers/heimdall/db.py new file mode 100644 index 0000000..30426ae --- /dev/null +++ b/the_fumblers/heimdall/db.py @@ -0,0 +1,112 @@ +from sqlalchemy import ( + Column, String, ForeignKey, Enum, JSON, DateTime, Integer +) +from sqlalchemy.orm import relationship, declarative_base +from datetime import datetime, UTC +import uuid + +Base = declarative_base() + +class Node(Base): + __tablename__ = "nodes" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + name = Column(String, nullable=False, unique=True) + uuid = Column(String, nullable=False, unique=True) + + host = Column(String, nullable=False) + env = Column(String, nullable=False) # dev / test / prod + + status = Column(String, default="UNKNOWN") + fail_count = Column(Integer, default=0) + last_seen = Column(String, nullable=True) + + created_at = Column(DateTime, default=lambda: datetime.now(UTC)) + + # relationships + services = 
relationship( + "ServiceInstance", + back_populates="node", + cascade="all, delete-orphan" + ) + + +class ServiceInstance(Base): + __tablename__ = "service_instances" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + + node_id = Column(String, ForeignKey("nodes.id"), nullable=False) + + name = Column(String, nullable=False) # api, worker + service_uuid = Column(String, nullable=False) + + repo_url = Column(String, nullable=True) # e.g. github:org/repo + flake = Column(String, nullable=True) # e.g. github:org/repo#api + commands = Column(JSON, nullable=True) # e.g. ["run", "migrate"] + healthcheck_url = Column(String, nullable=True) + env = Column(String, nullable=False) + + status = Column(String, default="idle") # idle, running, failed + + created_at = Column(DateTime, default=lambda: datetime.now(UTC)) + + # relationships + node = relationship("Node", back_populates="services") + + operations = relationship( + "Operation", + back_populates="service", + cascade="all, delete-orphan" + ) + +class Operation(Base): + __tablename__ = "operations" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + + service_id = Column(String, ForeignKey("service_instances.id"), nullable=True) + + type = Column(String, nullable=False) # deploy, teardown, rollback + status = Column(String, default="pending") # pending, running, success, failed + + # Service info (denormalized for quick access without joins) + service_name = Column(String, nullable=True) + environment = Column(String, nullable=True) + version = Column(String, nullable=True) + target_version = Column(String, nullable=True) + + message = Column(String, nullable=True) + error = Column(String, nullable=True) + + metadata_json = Column("metadata", JSON) + + triggered_by = Column(String, nullable=True) # Who started this? (e.g. 
Discord user) + started_at = Column(DateTime, nullable=True) + finished_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=lambda: datetime.now(UTC)) + + service = relationship("ServiceInstance", back_populates="operations") + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +DATABASE_URL = "sqlite:///./heimdall.db" +engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +from sqlalchemy import text + +def init_db(): + Base.metadata.create_all(bind=engine) + # Simple migration: ensure triggered_by exists + with engine.connect() as conn: + try: + conn.execute(text("SELECT triggered_by FROM operations LIMIT 1")) + except Exception: + print("⚠️ Operations table missing 'triggered_by' column. Adding it...") + try: + conn.execute(text("ALTER TABLE operations ADD COLUMN triggered_by TEXT")) + conn.commit() + except Exception as e: + print(f"❌ Failed to add column: {e}") diff --git a/the_fumblers/heimdall/discord_bot/bot.py b/the_fumblers/heimdall/discord_bot/bot.py new file mode 100644 index 0000000..1c77f52 --- /dev/null +++ b/the_fumblers/heimdall/discord_bot/bot.py @@ -0,0 +1,328 @@ +import os +import certifi +os.environ['SSL_CERT_FILE'] = certifi.where() +os.environ['REQUESTS_CA_BUNDLE'] = certifi.where() + +import asyncio +import aiohttp +import discord +import ssl +from discord import app_commands +from discord.ext import commands +from dotenv import load_dotenv + +# Ultimate SSL Fix for restricted environments +try: + ssl._create_default_https_context = ssl._create_unverified_context +except Exception: + pass + +load_dotenv() + +DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") +API_URL = os.getenv("INFRA_API_URL", "http://localhost:8000") +API_KEY = os.getenv("INFRA_API_KEY") + +if not DISCORD_TOKEN: + raise RuntimeError("DISCORD_TOKEN is not set.") + +if not API_KEY: + raise RuntimeError("INFRA_API_KEY is not 
set.") + +HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"} +ENVS = ["dev", "staging", "prod"] + +# ── Bot setup ───────────────────────────────────────────────────────────────── + +intents = discord.Intents.default() +bot = commands.Bot(command_prefix="!", intents=intents) +tree = bot.tree + +# ── Helpers (Basic) ─────────────────────────────────────────────────────────── + +def status_emoji(status: str) -> str: + return {"pending": "⏳", "running": "🔄", "success": "✅", "failed": "❌", "booting": "🟡", "healthy": "🟢"}.get(status, "❓") + +def node_emoji(status: str) -> str: + return {"ONLINE": "🟢", "OFFLINE": "🔴"}.get(status, "⚪") + +async def api_post(path: str, payload: dict) -> dict: + async with aiohttp.ClientSession() as session: + async with session.post(f"{API_URL}{path}", json=payload, headers=HEADERS) as r: + r.raise_for_status() + return await r.json() + +async def api_get(path: str) -> dict: + async with aiohttp.ClientSession() as session: + async with session.get(f"{API_URL}{path}", headers=HEADERS) as r: + r.raise_for_status() + return await r.json() + +# ── Helpers (Advanced) ──────────────────────────────────────────────────────── + +async def node_autocomplete(interaction: discord.Interaction, current: str) -> list[app_commands.Choice[str]]: + try: + nodes = await api_get("/nodes") + return [app_commands.Choice(name=n['name'], value=n['name']) for n in nodes if current.lower() in n['name'].lower()][:25] + except: return [] + +async def service_autocomplete(interaction: discord.Interaction, current: str) -> list[app_commands.Choice[str]]: + try: + services = await api_get("/services") + return [app_commands.Choice(name=s['name'], value=s['name']) for s in services if current.lower() in s['name'].lower()][:25] + except: return [] + +async def send_error_embed(interaction: discord.Interaction, error: str): + embed = discord.Embed(title="❌ Heimdall API — Error", description=f"```{error}```", color=discord.Color.red()) + if "Connect call 
failed" in error or "Cannot connect to host" in error: + embed.add_field(name="💡 Troubleshooting", value="The Control Plane seems offline. Ensure `bash start.sh` is running on port 8000.", inline=False) + await interaction.followup.send(embed=embed) + +def op_embed(op: dict) -> discord.Embed: + s = op.get("status", "unknown") + color = {"success": discord.Color.green(), "failed": discord.Color.red(), "running": discord.Color.yellow(), "pending": discord.Color.blurple()}.get(s, discord.Color.greyple()) + embed = discord.Embed(title=f"{status_emoji(s)} {op.get('type','op').capitalize()} — {s.upper()}", description=op.get("message", ""), color=color) + embed.add_field(name="Service", value=op.get("service", "—"), inline=True) + embed.add_field(name="Environment", value=op.get("environment", "—"), inline=True) + if op.get("version"): embed.add_field(name="Version", value=f"`{op['version']}`", inline=True) + url = op.get("healthcheck_url") + if url: embed.add_field(name="Deployment URL", value=f"[Go to Service]({url})\n`{url}`", inline=False) + if op.get("error"): embed.add_field(name="Error", value=f"```{op['error']}```", inline=False) + embed.set_footer(text=f"op_id: {op.get('id','?')}") + return embed + +async def poll_operation(op_id: str, message=None, max_wait: int = 60) -> dict: + last_status = "pending" + for _ in range(max_wait // 2): + await asyncio.sleep(2) + op = await api_get(f"/operations/{op_id}") + status = op["status"] + if status != last_status and status == "running" and message: + last_status = status + try: await message.edit(embed=op_embed(op)) + except: pass + if status in ("success", "failed"): return op + return await api_get(f"/operations/{op_id}") + +async def send_live_health_monitor(interaction: discord.Interaction, service_name: str = None): + title = f"🔍 Initializing {service_name or 'Global API'} monitor..." 
+ msg = await interaction.followup.send(embed=discord.Embed(title=title, color=discord.Color.greyple())) + async def monitor_loop(): + i = 0 + while True: + i += 1 + try: + if service_name: + data = await api_get(f"/services/{service_name}") + status = data['status'] + color = discord.Color.green() if status == "healthy" else discord.Color.orange() if status == "booting" else discord.Color.red() + title = f"{'🟢' if status == 'healthy' else '🟡' if status == 'booting' else '🔴'} Service: {service_name}" + desc = f"Status: **{status}**\nNode: `{data['node']}`\nURL: {data['healthcheck_url'] or 'None'}" + else: + data = await api_get("/health") + status = data['status'] + color = discord.Color.green() if status == "ok" else discord.Color.red() + title = f"{'🟢' if status == 'ok' else '🔴'} Heimdall API — Healthy" + desc = f"Status: **{status}**\nURL: `{API_URL}`" + embed = discord.Embed(title=title, description=desc, color=color) + embed.set_footer(text=f"Live monitoring active • Updates: {i}") + await msg.edit(embed=embed) + except Exception as e: + embed = discord.Embed(title=f"🔴 {service_name or 'Heimdall API'} — Unreachable", description=f"Failed to connect to Control Plane.\n\n**Error:**\n```{e}```", color=discord.Color.red()) + embed.set_footer(text=f"Live monitoring active • Updates: {i}") + await msg.edit(embed=embed) + await asyncio.sleep(5) + asyncio.create_task(monitor_loop()) + +@tree.command(name="register-node", description="Register a new infrastructure node (agent).") +@app_commands.describe(name="Display name for the node", node_id="Unique identifier for the node", host="Agent URL (e.g. 
http://10.0.0.5:8001)", environment="Target environment") +@app_commands.choices(environment=[app_commands.Choice(name=e, value=e) for e in ENVS]) +async def cmd_node_register(interaction: discord.Interaction, name: str, node_id: str, host: str, environment: app_commands.Choice[str] = None): + await interaction.response.defer(thinking=True) + env_val = environment.value if environment else "dev" + payload = {"name": name, "uuid": node_id, "host": host, "environment": env_val} + try: + resp = await api_post("/nodes", payload) + await interaction.followup.send(f"✅ {resp.get('message', 'Node registered.')}") + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="register", description="Declare a new service configuration.") +@app_commands.describe(service="Service name", node_name="Target node name", flake="Nix flake reference", commands="Comma-separated commands", healthcheck_url="Health poll URL", environment="Target environment") +@app_commands.choices(environment=[app_commands.Choice(name=e, value=e) for e in ENVS]) +@app_commands.autocomplete(node_name=node_autocomplete) +async def cmd_register(interaction: discord.Interaction, service: str, node_name: str, flake: str = None, commands: str = None, healthcheck_url: str = None, environment: app_commands.Choice[str] = None): + await interaction.response.defer(thinking=True) + env_val = environment.value if environment else "dev" + cmd_list = [c.strip() for c in commands.split(",")] if commands else [] + payload = {"service": service, "node_name": node_name, "environment": env_val, "triggered_by": str(interaction.user)} + if flake: payload["flake"] = flake + if cmd_list: payload["commands"] = cmd_list + if healthcheck_url: payload["healthcheck_url"] = healthcheck_url + try: + resp = await api_post("/services", payload) + await interaction.followup.send(f"✅ {resp.get('message', 'Service declared.')}") + await send_live_health_monitor(interaction, service_name=service) + except 
Exception as e: + await send_error_embed(interaction, str(e)) + if "Connect" in str(e): await send_live_health_monitor(interaction, service_name=service) + +@tree.command(name="deploy", description="Trigger a project deployment.") +@app_commands.describe(service="Service name", node_name="Override node", flake="Override flake", commands="Override commands", healthcheck_url="Override health URL", version="Version tag/branch", environment="Target environment") +@app_commands.choices(environment=[app_commands.Choice(name=e, value=e) for e in ENVS]) +@app_commands.autocomplete(service=service_autocomplete, node_name=node_autocomplete) +async def cmd_deploy(interaction: discord.Interaction, service: str, node_name: str = None, repo_url: str = None, flake: str = None, commands: str = None, healthcheck_url: str = None, version: str = "latest", environment: app_commands.Choice[str] = None): + await interaction.response.defer(thinking=True) + env_val = environment.value if environment else "dev" + cmd_list = [c.strip() for c in commands.split(",")] if commands else [] + payload = {"service": service, "version": version, "environment": env_val, "triggered_by": str(interaction.user)} + if node_name: payload["node_name"] = node_name + if repo_url: payload["repo_url"] = repo_url + if flake: payload["flake"] = flake + if cmd_list: payload["commands"] = cmd_list + if healthcheck_url: payload["healthcheck_url"] = healthcheck_url + try: + resp = await api_post("/deploy", payload) + op_id = resp["operation_id"] + msg = await interaction.followup.send(embed=discord.Embed(title="⏳ Deploy queued", description=resp["message"], color=discord.Color.blurple()).set_footer(text=f"op_id: {op_id}")) + op = await poll_operation(op_id, message=msg) + await msg.edit(embed=op_embed(op)) + await send_live_health_monitor(interaction, service_name=service) + except Exception as e: + await send_error_embed(interaction, str(e)) + if "Connect" in str(e): await send_live_health_monitor(interaction, 
service_name=service) + +@tree.command(name="teardown", description="Decommission a service from its node.") +@app_commands.describe(service="Service name to teardown") +@app_commands.autocomplete(service=service_autocomplete) +async def cmd_teardown(interaction: discord.Interaction, service: str): + await interaction.response.defer(thinking=True) + try: + resp = await api_post("/teardown", {"service": service, "triggered_by": str(interaction.user)}) + op_id = resp["operation_id"] + await interaction.followup.send(embed=discord.Embed(title=f"🗑️ Teardown — {service}", description="Queued...", color=discord.Color.orange())) + op = await poll_operation(op_id) + await interaction.followup.send(embed=op_embed(op)) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="rollback", description="Roll back a service.") +@app_commands.describe(service="Service name", environment="Target environment", target_version="Version to roll back to") +@app_commands.choices(environment=[app_commands.Choice(name=e, value=e) for e in ENVS]) +@app_commands.autocomplete(service=service_autocomplete) +async def cmd_rollback(interaction: discord.Interaction, service: str, environment: app_commands.Choice[str], target_version: str, reason: str = ""): + await interaction.response.defer(thinking=True) + try: + resp = await api_post("/rollback", { + "service": service, + "environment": environment.value, + "target_version": target_version, + "reason": reason or None, + "triggered_by": str(interaction.user) + }) + op_id = resp["operation_id"] + await interaction.followup.send(embed=discord.Embed(title="⏳ Rollback queued", description=resp["message"], color=discord.Color.gold()).set_footer(text=f"op_id: {op_id}")) + op = await poll_operation(op_id) + await interaction.followup.send(embed=op_embed(op)) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="status", description="Get operation status.") +async def 
cmd_status(interaction: discord.Interaction, operation_id: str): + await interaction.response.defer(thinking=True) + try: + op = await api_get(f"/operations/{operation_id}") + await interaction.followup.send(embed=op_embed(op)) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="nodes", description="List registered nodes.") +async def cmd_nodes(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + nodes = await api_get("/nodes") + embed = discord.Embed(title=f"🖥️ Registered Nodes ({len(nodes)})", color=discord.Color.teal()) + for n in nodes: + s = n.get("status", "UNKNOWN") + embed.add_field(name=f"{node_emoji(s)} {n['name']}", value=f"Host: `{n['host']}`\nEnv: `{n['env']}`", inline=True) + await interaction.followup.send(embed=embed) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="services", description="List all registered services and their status.") +async def cmd_services(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + services = await api_get("/services") + if not services: + await interaction.followup.send("No services registered.") + return + + embed = discord.Embed( + title=f"📦 Registered Services ({len(services)})", + color=discord.Color.blue(), + ) + for s in services: + status = s.get("status", "unknown") + node = s.get("node_name", "—") + env = s.get("environment", "—") + emoji = status_emoji(status) + + value = f"Node: `{node}`\nEnv: `{env}`\nStatus: **{status}**" + embed.add_field(name=f"{emoji} {s['name']}", value=value, inline=True) + + await interaction.followup.send(embed=embed) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="health", description="Live health monitor.") +@app_commands.autocomplete(service=service_autocomplete) +async def cmd_health(interaction: discord.Interaction, service: str = None): + await 
interaction.response.defer(thinking=True) + await send_live_health_monitor(interaction, service_name=service) + +@tree.command(name="deploy-all", description="Trigger a full deployment for ALL registered services.") +async def cmd_deploy_all(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + resp = await api_post("/deploy-all", {"triggered_by": str(interaction.user)}) + ids = resp.get("operation_ids", []) + await interaction.followup.send( + embed=discord.Embed( + title="🚀 Bulk Deploy Initiated", + description=f"Queued **{len(ids)}** deployments.\n\n{resp.get('message')}", + color=discord.Color.purple() + ).set_footer(text=f"Triggered by: {interaction.user}") + ) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="audit", description="View recent infrastructure audit logs.") +@app_commands.describe(limit="Number of logs to show (max 50)") +async def cmd_audit(interaction: discord.Interaction, limit: int = 15): + await interaction.response.defer(thinking=True) + try: + logs = await api_get(f"/operations/audit?limit={limit}") + if not logs: + await interaction.followup.send("No audit logs found.") + return + + embed = discord.Embed(title="🛡️ Infrastructure Audit Logs", color=discord.Color.dark_grey()) + for log in logs: + emoji = status_emoji(log.get("status", "")) + time_str = log.get("created_at", "").split("T")[1][:8] if "T" in log.get("created_at", "") else "???" 
+ + name = f"{emoji} {log['type'].upper()} — {log.get('service', 'General')}" + value = f"By: **{log.get('triggered_by', 'System')}** at `{time_str}`\nStatus: `{log.get('status')}`" + embed.add_field(name=name, value=value, inline=False) + + await interaction.followup.send(embed=embed) + except Exception as e: await send_error_embed(interaction, str(e)) + +@tree.command(name="add-node", description="Alias for /register-node.") +@app_commands.describe(name="Display name", node_id="Unique ID", host="Agent URL", environment="Target environment") +@app_commands.choices(environment=[app_commands.Choice(name=e, value=e) for e in ENVS]) +async def cmd_add_node(interaction: discord.Interaction, name: str, node_id: str, host: str, environment: app_commands.Choice[str] = None): + # Just proxy to the same command logic + await cmd_node_register(interaction, name, node_id, host, environment) + +@bot.event +async def on_ready(): + await tree.sync() + print(f"Heimdall bot ready: {bot.user}") + +if __name__ == "__main__": + bot.run(DISCORD_TOKEN) \ No newline at end of file diff --git a/the_fumblers/heimdall/examples/api_service/flake.lock b/the_fumblers/heimdall/examples/api_service/flake.lock new file mode 100644 index 0000000..d95764d --- /dev/null +++ b/the_fumblers/heimdall/examples/api_service/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1774386573, + "narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9", + "type": "github" + }, + "original": { 
+ "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/the_fumblers/heimdall/examples/api_service/flake.nix b/the_fumblers/heimdall/examples/api_service/flake.nix new file mode 100644 index 0000000..42af03c --- /dev/null +++ b/the_fumblers/heimdall/examples/api_service/flake.nix @@ -0,0 +1,44 @@ +{ + description = "A dummy API service"; + + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = nixpkgs.legacyPackages.${system}; + + # Native Python (no external dependencies required) + pythonEnv = pkgs.python3; + + # A script to run the dummy API + run-script = pkgs.writeShellScriptBin "run-api" '' + export PORT="''${PORT:-5000}" + echo "Starting Dummy HTTP API on port $PORT..." 
+ exec ${pythonEnv}/bin/python ${./main.py} + ''; + + in + { + packages.default = run-script; + + apps.default = { + type = "app"; + program = "${run-script}/bin/run-api"; + }; + + devShells.default = pkgs.mkShell { + buildInputs = [ pythonEnv ]; + }; + + # Heimdall Metadata + heimdall-manifest = { + commands = [ "run" ]; + healthcheck_url = "http://127.0.0.1:5000/"; + }; + } + ); +} diff --git a/the_fumblers/heimdall/examples/api_service/main.py b/the_fumblers/heimdall/examples/api_service/main.py new file mode 100644 index 0000000..cea5de4 --- /dev/null +++ b/the_fumblers/heimdall/examples/api_service/main.py @@ -0,0 +1,25 @@ +import os +import json +from http.server import HTTPServer, BaseHTTPRequestHandler + +class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + + response = { + "status": "ok", + "message": "Hello from Heimdall Native Python Dummy API!", + "path": self.path + } + self.wfile.write(json.dumps(response).encode('utf-8')) + +if __name__ == "__main__": + port = int(os.environ.get("PORT", 5000)) + server = HTTPServer(('0.0.0.0', port), SimpleHTTPRequestHandler) + print(f"Server started globally on 0.0.0.0:{port}") + try: + server.serve_forever() + except KeyboardInterrupt: + server.server_close() diff --git a/the_fumblers/heimdall/fastapi_agent/main.py b/the_fumblers/heimdall/fastapi_agent/main.py new file mode 100755 index 0000000..9d2144e --- /dev/null +++ b/the_fumblers/heimdall/fastapi_agent/main.py @@ -0,0 +1,340 @@ +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request, HTTPException +from pydantic import BaseModel +import uvicorn +import asyncio +import json +import os +import httpx +import time +import hmac +import hashlib + +# Configuration +STATE_FILE = "state.json" +WEBHOOK_URL = os.getenv("WEBHOOK_URL", "http://localhost:8080/webhook") +_secret_value = os.getenv("WEBHOOK_SECRET") +if not 
_secret_value: + raise RuntimeError("WEBHOOK_SECRET environment variable must be set for webhook signing.") +SECRET_KEY = _secret_value.encode() + +# State dictionaries +service_state = {} +service_locks = {} + +webhook_client: httpx.AsyncClient = None + +def generate_signature(body_str: str, timestamp: str) -> str: + message = body_str + timestamp + return hmac.new(SECRET_KEY, message.encode(), hashlib.sha256).hexdigest() + +def verify_signature(body_str: str, timestamp: str, signature: str) -> bool: + expected = generate_signature(body_str, timestamp) + return hmac.compare_digest(expected, signature) + +async def send_webhook(payload: dict): + if not webhook_client: + return + body_str = json.dumps(payload) + timestamp = str(int(time.time())) + signature = generate_signature(body_str, timestamp) + + headers = { + "X-Timestamp": timestamp, + "X-Signature": signature, + "Content-Type": "application/json" + } + try: + await webhook_client.post(WEBHOOK_URL, content=body_str.encode(), headers=headers) + except Exception as e: + print(f"Failed to send webhook: {e}") + +async def send_heartbeat(): + while True: + try: + await asyncio.sleep(10) + payload = { + "type": "node_status", + "services": service_state + } + await send_webhook(payload) + except asyncio.CancelledError: + break + except Exception as e: + print(f"Heartbeat error: {e}") + +async def health_check_loop(): + while True: + try: + await asyncio.sleep(5) + for svc, data in service_state.items(): + pid = data.get("pid") + health_url = data.get("health_url") + if pid: + try: + os.kill(pid, 0) + status = "healthy" + except OSError: + status = "dead" + + if status == "healthy" and health_url: + try: + async with httpx.AsyncClient(timeout=2.0) as client: + res = await client.get(health_url) + if res.status_code != 200: + status = "unhealthy" + except Exception: + status = "unhealthy" + + if data.get("status") != status: + service_state[svc]["status"] = status + save_state() + await send_webhook({"type": 
"status", "service": svc, "status": status}) + if status == "dead" and svc in service_locks and service_locks[svc].locked(): + service_locks[svc].release() + except asyncio.CancelledError: + break + except Exception as e: + print(f"Health check error: {e}") + +log_buffer = [] + +async def flush_logs_loop(): + while True: + try: + await asyncio.sleep(5) + if log_buffer: + to_send = list(log_buffer) + log_buffer.clear() + await send_webhook({ + "type": "logs_batch", + "logs": to_send + }) + except asyncio.CancelledError: + break + except Exception as e: + print(f"Log flush error: {e}") + +async def read_stream(stream, service: str, stream_name: str): + """Read logs from process stdout/stderr, print locally, write to file, and buffer for webhook.""" + if stream is None: + return + + log_file_path = f"../logs/{service}.log" + while True: + line = await stream.readline() + if not line: + break + log_line = line.decode(errors='replace').rstrip() + + # 1. Print locally (goes to node.log) + print(f"[{service} | {stream_name}] {log_line}") + + # 2. Append to individual service log file + try: + with open(log_file_path, "a") as f: + f.write(f"[{stream_name}] {log_line}\n") + except Exception as e: + print(f"Failed to write to {log_file_path}: {e}") + + # 3. 
Buffer for Control Plane webhook + log_buffer.append({ + "service": service, + "stream": stream_name, + "log": log_line + }) + +def save_state(): + try: + with open(STATE_FILE, "w") as f: + json.dump(service_state, f, indent=2) + except Exception as e: + print(f"Failed to save state: {e}") + +class CommandRequest(BaseModel): + operation_id: str + service: str + flake: str + healthcheck_url: str | None = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("Starting up FastAPI node agent...") + global webhook_client + webhook_client = httpx.AsyncClient(timeout=5.0) + + heartbeat_task = asyncio.create_task(send_heartbeat()) + health_task = asyncio.create_task(health_check_loop()) + log_flush_task = asyncio.create_task(flush_logs_loop()) + + if os.path.exists(STATE_FILE): + try: + with open(STATE_FILE, "r") as f: + state = json.load(f) + + for svc, data in state.items(): + pid = data.get("pid") + health_url = data.get("health_url") + if pid: + try: + os.kill(pid, 0) + service_state[svc] = {"pid": pid, "status": "healthy", "health_url": health_url} + lock = asyncio.Lock() + await lock.acquire() + service_locks[svc] = lock + except OSError: + service_state[svc] = {"pid": pid, "status": "dead", "health_url": health_url} + + save_state() + except Exception as e: + print(f"Error loading state: {e}") + + yield + print("Shutting down FastAPI node agent...") + heartbeat_task.cancel() + health_task.cancel() + log_flush_task.cancel() + if webhook_client: + await webhook_client.aclose() + +app = FastAPI(lifespan=lifespan) + +from fastapi import Depends + +async def verify_hmac(request: Request): + x_timestamp = request.headers.get("x-timestamp") + x_signature = request.headers.get("x-signature") + + if not x_timestamp or not x_signature: + raise HTTPException(status_code=401, detail="Missing HMAC headers") + + try: + ts = int(x_timestamp) + now = time.time() + if abs(now - ts) > 30: + raise HTTPException(status_code=400, detail="Request too old") + except 
ValueError: + raise HTTPException(status_code=400, detail="Invalid timestamp") + + body_bytes = await request.body() + body_str = body_bytes.decode('utf-8') + timestamp = x_timestamp + message = body_str + timestamp + + print("SERVER BODY:", body_str) + print("SERVER TIMESTAMP:", timestamp) + print("SERVER MESSAGE:", message) + + expected_signature = hmac.new( + SECRET_KEY, + message.encode(), + hashlib.sha256 + ).hexdigest() + + # Allow a bypass 'default-signature' for easier manual testing + if x_signature == "default-signature": + return + + if not hmac.compare_digest(expected_signature, x_signature): + raise HTTPException(status_code=401, detail="Invalid signature") + +@app.get("/heartbeat") +async def heartbeat(): + return {"status": "ok"} + +@app.post("/command", dependencies=[Depends(verify_hmac)]) +async def receive_command(command: CommandRequest): + + print(f"Received request: {command.model_dump_json(indent=2)}") + + svc = command.service + if svc not in service_locks: + service_locks[svc] = asyncio.Lock() + + lock = service_locks[svc] + + if lock.locked(): + old_pid = service_state.get(svc, {}).get("pid") + if old_pid: + try: + os.kill(old_pid, 15) # SIGTERM + print(f"Sent SIGTERM to old {svc} process PID {old_pid} for redeploy") + except OSError: + pass + + try: + await asyncio.wait_for(lock.acquire(), timeout=10.0) + except asyncio.TimeoutError: + return {"status": "failed", "error": "Old process resisted termination, lock timeout"} + else: + await lock.acquire() + + try: + # Validate flake + flake_ref = command.flake.split('#')[0] + show_proc = await asyncio.create_subprocess_exec( + "nix", "flake", "show", flake_ref, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + show_out, show_err = await show_proc.communicate() + if show_proc.returncode != 0: + print("FLAKE SHOW ERROR:", show_err.decode(errors='replace')) + lock.release() + return {"status": "failed", "error": "invalid flake"} + + # Evaluate manifest + eval_proc = await 
asyncio.create_subprocess_exec( + "nix", "eval", command.flake, "--json", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + eval_out, eval_err = await eval_proc.communicate() + if eval_proc.returncode == 0: + print("Manifest Evaluation Result:", eval_out.decode(errors="replace")) + else: + raise Exception(f"Manifest evaluation failed: {eval_err.decode(errors='replace')}") + + # Execute service + process = await asyncio.create_subprocess_exec( + "nix", "run", command.flake, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT + ) + + service_state[svc] = { + "pid": process.pid, + "status": "booting", + "health_url": command.healthcheck_url + } + save_state() + + await send_webhook({"type": "status", "service": svc, "status": "booting"}) + + if process.stdout: + asyncio.create_task(read_stream(process.stdout, svc, "stdout")) + + async def wait_and_release(): + await process.wait() + if svc in service_state: + service_state[svc]["status"] = "dead" + save_state() + if lock.locked(): + lock.release() + await send_webhook({"type": "status", "service": svc, "status": "dead"}) + + asyncio.create_task(wait_and_release()) + + return {"status": "accepted"} + + except Exception as e: + print(f"Error starting process for service {svc}: {e}") + lock.release() + await send_webhook({"type": "status", "service": svc, "status": "failed", "error": str(e)}) + return {"status": "failed"} + +if __name__ == "__main__": + import uvicorn + agent_port = int(os.getenv("HEIMDALL_AGENT_PORT", 8001)) + print(f"Heimdall Node Agent starting up on port {agent_port}...") + uvicorn.run(app, host="0.0.0.0", port=agent_port) diff --git a/the_fumblers/heimdall/fastapi_agent/state.json b/the_fumblers/heimdall/fastapi_agent/state.json new file mode 100644 index 0000000..88626fd --- /dev/null +++ b/the_fumblers/heimdall/fastapi_agent/state.json @@ -0,0 +1,22 @@ +{ + "dummy-api": { + "pid": 45300, + "status": "dead", + "health_url": "http://0.0.0.0:8080/" + }, + 
"service-1": { + "pid": 68806, + "status": "healthy", + "health_url": "http://127.0.0.1:5000/" + }, + "worker-1": { + "pid": 67532, + "status": "healthy", + "health_url": null + }, + "nodejs-app": { + "pid": 67708, + "status": "healthy", + "health_url": null + } +} \ No newline at end of file diff --git a/the_fumblers/heimdall/fastapi_agent/test_app.py b/the_fumblers/heimdall/fastapi_agent/test_app.py new file mode 100755 index 0000000..99703d8 --- /dev/null +++ b/the_fumblers/heimdall/fastapi_agent/test_app.py @@ -0,0 +1,21 @@ +from fastapi.testclient import TestClient +from fastapi_agent.main import app + +def run_tests(): + print("--- Testing /command endpoint ---") + + with TestClient(app) as client: + payload = { + "operation_id": "op-test-123", + "service": "test-service-1", + "flake": "nixpkgs#hello" + } + + print(f"Sending POST to /command with payload: {payload}") + response = client.post("/command", json=payload) + + print("Status code:", response.status_code) + print("Response JSON:", response.json()) + +if __name__ == "__main__": + run_tests() diff --git a/the_fumblers/heimdall/fastapi_agent/test_main.py b/the_fumblers/heimdall/fastapi_agent/test_main.py new file mode 100755 index 0000000..3baae87 --- /dev/null +++ b/the_fumblers/heimdall/fastapi_agent/test_main.py @@ -0,0 +1,43 @@ +import pytest +from fastapi.testclient import TestClient +from main import app, service_locks, service_pids +import asyncio + +@pytest.fixture +def client(): + # Clean up state before each test + service_locks.clear() + service_pids.clear() + with TestClient(app) as test_client: + yield test_client + +def test_command_success(client): + """Test normal successful subprocess spawn.""" + payload = { + "operation_id": "op-1", + "service": "success-svc", + "flake": "nixpkgs#hello" + } + res = client.post("/command", json=payload) + + assert res.status_code == 200 + assert res.json() == {"status": "accepted"} + assert "success-svc" in service_pids + +def 
test_command_already_running(client): + """Test already running rejection.""" + class MockLockedLock: + def locked(self): + return True + + service_locks["locked-svc"] = MockLockedLock() + + payload = { + "operation_id": "op-3", + "service": "locked-svc", + "flake": "nixpkgs#long-running" + } + res = client.post("/command", json=payload) + + assert res.status_code == 200 + assert res.json() == {"status": "already running"} diff --git a/the_fumblers/heimdall/flake.lock b/the_fumblers/heimdall/flake.lock new file mode 100644 index 0000000..1fa3427 --- /dev/null +++ b/the_fumblers/heimdall/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1774386573, + "narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/the_fumblers/heimdall/flake.nix b/the_fumblers/heimdall/flake.nix new file mode 100644 index 0000000..349f5b2 --- /dev/null +++ b/the_fumblers/heimdall/flake.nix @@ -0,0 +1,23 @@ +{ + description = ""; + + inputs = { + nixpkgs.url = "github:nixos/nixpkgs?ref=nixos-unstable"; + }; + + outputs = { + self, + nixpkgs, + }: let + system = "x86_64-linux"; + pkgs = import nixpkgs {inherit system;}; + in { + devShells.${system}.default = pkgs.mkShell { + buildInputs = with pkgs; [ + python314 + uv + tailscale + ]; + }; + }; +} diff --git a/the_fumblers/heimdall/node.py b/the_fumblers/heimdall/node.py new file mode 100644 index 0000000..f810b20 --- /dev/null +++ b/the_fumblers/heimdall/node.py @@ -0,0 +1,13 @@ +from fastapi import FastAPI +from datetime import datetime, UTC + +app = FastAPI() + +@app.get("/heartbeat") +async def heartbeat(): + # Node reports it is alive + return { + 
"status": "alive", + "time": datetime.now(UTC).isoformat() + "Z" + } + diff --git a/the_fumblers/heimdall/requirements.txt b/the_fumblers/heimdall/requirements.txt new file mode 100644 index 0000000..a8b3b53 --- /dev/null +++ b/the_fumblers/heimdall/requirements.txt @@ -0,0 +1,8 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +pydantic>=2.6.0 +python-dotenv>=1.0.0 + +# Discord bot +discord.py>=2.3.2 +aiohttp>=3.9.0 \ No newline at end of file diff --git a/the_fumblers/heimdall/start.sh b/the_fumblers/heimdall/start.sh new file mode 100755 index 0000000..24168d2 --- /dev/null +++ b/the_fumblers/heimdall/start.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash + +set -e + +echo "🛡️ Booting Heimdall Ecosystem..." + +# --- Config --- +CTRL_PORT=8000 +AGENT_PORT=8001 +SVC_PORT=5000 +# -------------- + +echo "0. Cleaning up previous running instances (Ports $CTRL_PORT, $AGENT_PORT, $SVC_PORT)..." +fuser -k $CTRL_PORT/tcp 2>/dev/null || true +fuser -k $AGENT_PORT/tcp 2>/dev/null || true +fuser -k $SVC_PORT/tcp 2>/dev/null || true +fuser -k 8080/tcp 2>/dev/null || true # Old default +pkill -f "uvicorn api:app" || true +pkill -f "uvicorn main:app" || true +pkill -f "python discord_bot/bot.py" || true +pkill -f "python api.py" || true +sleep 1 + +# Create a place to store logs +mkdir -p logs + +export WEBHOOK_SECRET="super-secret-key" +export INFRA_API_KEY="heimdall" +export INFRA_API_URL="http://localhost:$CTRL_PORT" +export HEIMDALL_API_PORT=$CTRL_PORT +export HEIMDALL_AGENT_PORT=$AGENT_PORT + +echo "1. Starting Heimdall Control Plane (API) on port $CTRL_PORT..." +uvicorn api:app --host 0.0.0.0 --port $CTRL_PORT > logs/api.log 2>&1 & +CTRL_PID=$! + +echo "⏳ Waiting for Control Plane to be ready..." +for i in {1..10}; do + if curl -s "http://localhost:$CTRL_PORT/health" > /dev/null; then + echo "🟢 Control Plane is UP." + break + fi + sleep 1 +done + +echo "2. Bootstrapping 'local-agent' node in database..." 
+python3 -c ' +import os +from db import SessionLocal, Node +db = SessionLocal() +agent_port = os.environ.get("HEIMDALL_AGENT_PORT", "8001") +node = db.query(Node).filter_by(name="local-agent").first() +if not node: + node = Node(name="local-agent", uuid="local-agent", host=f"http://localhost:{agent_port}", env="dev") + db.add(node) + db.commit() +db.close() +' + +echo "3. Registering 'service-1' and 'worker-1' via API curl..." +curl -sS -X POST http://127.0.0.1:$CTRL_PORT/services \ + -H "X-API-Key: heimdall" \ + -H "Content-Type: application/json" \ + -d '{ + "service": "service-1", + "node_name": "local-agent", + "flake": "path:'"$PWD"'/examples/api_service", + "commands": ["run"], + "healthcheck_url": "http://127.0.0.1:'"$SVC_PORT"'/", + "environment": "dev" + }' > /dev/null + +curl -sS -X POST http://127.0.0.1:$CTRL_PORT/services \ + -H "X-API-Key: heimdall" \ + -H "Content-Type: application/json" \ + -d '{ + "service": "worker-1", + "node_name": "local-agent", + "flake": "path:'"$PWD"'/examples/worker_service", + "commands": ["run"], + "environment": "dev" + }' > /dev/null + +echo "" + +echo "4. Starting fastapi_agent (Node Agent) on port $AGENT_PORT..." +export WEBHOOK_URL="http://localhost:$CTRL_PORT/webhook" +cd fastapi_agent +uvicorn main:app --host 0.0.0.0 --port $AGENT_PORT > ../logs/node.log 2>&1 & +NODE_PID=$! +cd .. + +echo "5. Starting Discord Bot..." +export SSL_CERT_FILE=$(python3 -m certifi) +python3 discord_bot/bot.py > logs/bot.log 2>&1 & +BOT_PID=$! + +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo "✅ Heimdall Infrastructure Ecosystem is UP!" +echo " - Control Plane: http://localhost:$CTRL_PORT" +echo " - Node Agent: http://localhost:$AGENT_PORT" +echo " - Discord Bot: Active" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo "Use 'kill $CTRL_PID $NODE_PID $BOT_PID' to shutdown." 
+echo "" diff --git a/the_fumblers/heimdall/test.py b/the_fumblers/heimdall/test.py new file mode 100644 index 0000000..2ab4c2a --- /dev/null +++ b/the_fumblers/heimdall/test.py @@ -0,0 +1,246 @@ +import hashlib +import hmac +import json +import pytest +from fastapi.testclient import TestClient +from api import app, WEBHOOK_SECRET +from db import SessionLocal, Node, ServiceInstance, Operation, init_db + + +@pytest.fixture(autouse=True) +def setup_db(): + init_db() + db = SessionLocal() + db.query(Operation).delete() + db.query(ServiceInstance).delete() + db.query(Node).delete() + db.commit() + db.close() + +@pytest.fixture(autouse=True) +def mock_agent_command(monkeypatch): + async def mock_cmd(*args, **kwargs): + return {"status": "accepted"} + monkeypatch.setattr("app.ops.send_agent_command", mock_cmd) + +client = TestClient(app) + + +def sign_payload(payload: dict): + body = json.dumps(payload).encode("utf-8") + digest = hmac.new(WEBHOOK_SECRET.encode("utf-8"), body, hashlib.sha256).hexdigest() + return body, {"x-signature": f"sha256={digest}", "Content-Type": "application/json"} + + +# ── Webhook tests ──────────────────────────────────────────────────────────── + +def test_webhook_deploy(): + # Setup: register a node first + db = SessionLocal() + node = Node(name="test-node", uuid="test-uuid", host="localhost", env="prod") + db.add(node) + db.commit() + db.close() + + payload = {"action": "deploy", "service": "api", "version": "v1", "env": "prod"} + body, headers = sign_payload(payload) + + r = client.post("/webhook", data=body, headers=headers) + assert r.status_code == 200 + assert r.json() == {"status": "accepted"} + + db = SessionLocal() + ops = db.query(Operation).all() + assert len(ops) == 1 + assert ops[0].type == "deploy" + db.close() + + +def test_webhook_register(): + payload = { + "action": "register", + "service": "node-v2", + "env": "prod", + "metadata": {"name": "node-v2", "host": "http://localhost:8002"}, + } + body, headers = 
sign_payload(payload) + + r = client.post("/webhook", data=body, headers=headers) + assert r.status_code == 200 + + db = SessionLocal() + node = db.query(Node).filter(Node.name == "node-v2").first() + assert node is not None + assert node.host == "http://localhost:8002" + db.close() + + +def test_webhook_invalid_signature(): + payload = {"action": "deploy", "service": "api", "env": "prod"} + body = json.dumps(payload).encode("utf-8") + + r = client.post("/webhook", data=body, headers={"x-signature": "sha256=bad"}) + assert r.status_code == 401 + + +def test_webhook_missing_header(): + payload = {"action": "deploy", "service": "api", "env": "prod"} + body = json.dumps(payload).encode("utf-8") + + r = client.post("/webhook", data=body) + assert r.status_code == 422 + + +# ── Infra control tests ───────────────────────────────────────────────────── + +API_KEY_HEADER = {"X-API-Key": "heimdall"} + + +def test_declare_service(): + db = SessionLocal() + db.add(Node(name="declare-node", uuid="uuid-declare", host="http://localhost:8004", env="dev")) + db.commit() + db.close() + + # Declare the service + r = client.post("/services", json={ + "service": "declared-svc", + "node_name": "declare-node", + "flake": "path:/test", + "environment": "dev", + }, headers=API_KEY_HEADER) + assert r.status_code == 200 + + # Deploy without node_name and flake + r = client.post("/deploy", json={ + "service": "declared-svc", + "version": "v2", + "environment": "dev", + }, headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["status"] == "pending" + + # Verify the operation used the declared config + op_id = data["operation_id"] + db = SessionLocal() + op = db.query(Operation).filter_by(id=op_id).first() + assert op.metadata_json["flake"] == "path:/test" + db.close() + +def test_deploy_endpoint_with_node(): + db = SessionLocal() + db.add(Node(name="deploy-node", uuid="uuid-1", host="http://localhost:8001", env="dev")) + db.commit() + db.close() + + r = 
client.post("/deploy", json={ + "service": "api-gateway", + "node_name": "deploy-node", + "repo_url": "github:org/repo", + "flake": "github:org/repo#api", + "commands": ["nix run"], + "version": "v1.4.2", + "environment": "dev", + }, headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["status"] == "pending" + assert "operation_id" in data + + +def test_teardown_endpoint(): + r = client.post("/teardown", json={ + "service": "api-gateway", + "environment": "dev", + "confirm": True, + }, headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["status"] == "pending" + + +def test_rollback_endpoint(): + r = client.post("/rollback", json={ + "service": "api-gateway", + "environment": "dev", + "target_version": "v1.4.1", + }, headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["status"] == "pending" + + +def test_operation_status(): + # Setup node + db = SessionLocal() + db.add(Node(name="status-node", uuid="uuid-2", host="http://localhost:8002", env="dev")) + db.commit() + db.close() + + # Create a deploy first + r = client.post("/deploy", json={ + "service": "api-gateway", + "node_name": "status-node", + "version": "v1.0.0", + "environment": "dev", + }, headers=API_KEY_HEADER) + op_id = r.json()["operation_id"] + + # Check its status + r = client.get(f"/operations/{op_id}", headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["id"] == op_id + assert data["type"] == "deploy" + assert data["service"] == "api-gateway" + + +def test_list_operations(): + # Setup node + db = SessionLocal() + db.add(Node(name="list-node", uuid="uuid-3", host="http://localhost:8003", env="dev")) + db.commit() + db.close() + + # Create a couple operations + client.post("/deploy", json={"service": "svc1", "node_name": "list-node", "version": "v1", "environment": "dev"}, headers=API_KEY_HEADER) + client.post("/deploy", json={"service": "svc2", "node_name": "list-node", 
"version": "v2", "environment": "dev"}, headers=API_KEY_HEADER) + + r = client.get("/operations", headers=API_KEY_HEADER) + assert r.status_code == 200 + data = r.json() + assert data["total"] == 2 + assert len(data["operations"]) == 2 + + +def test_operation_not_found(): + r = client.get("/operations/nonexistent", headers=API_KEY_HEADER) + assert r.status_code == 404 + + +def test_deploy_unauthorized(): + r = client.post("/deploy", json={ + "service": "api-gateway", + "node_name": "auth-node", + "version": "v1", + "environment": "dev", + }) + assert r.status_code == 401 + + +def test_health(): + r = client.get("/health") + assert r.status_code == 200 + assert r.json()["status"] == "ok" + + +def test_nodes_endpoint(): + db = SessionLocal() + db.add(Node(name="n1", uuid="u1", host="http://localhost:8001", env="prod")) + db.commit() + db.close() + + r = client.get("/nodes") + assert r.status_code == 200 + assert any(n["name"] == "n1" for n in r.json()) diff --git a/the_fumblers/heimdall/test_orchaestrator.py b/the_fumblers/heimdall/test_orchaestrator.py new file mode 100644 index 0000000..75b8a6d --- /dev/null +++ b/the_fumblers/heimdall/test_orchaestrator.py @@ -0,0 +1,140 @@ +import asyncio +import pytest +from fastapi.testclient import TestClient +import api +from datetime import datetime, UTC +from db import SessionLocal, Node, Operation, ServiceInstance, init_db + +class DummyResponse: + def __init__(self, status_code): + self.status_code = status_code + + def json(self): + return { + "status": "alive", + "time": datetime.now(UTC).isoformat() + "Z" + } + +class FakeSuccessClient: + def __init__(self, timeout=None): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def get(self, url): + return DummyResponse(200) + +class FakeFailClient: + def __init__(self, timeout=None): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return 
False + + async def get(self, url): + raise Exception("node unreachable") + +@pytest.fixture(autouse=True) +def setup_db(): + init_db() + db = SessionLocal() + db.query(Operation).delete() + db.query(ServiceInstance).delete() + db.query(Node).delete() + db.commit() + db.close() + +def test_node_heartbeat(): + # Note: node.app was the separate node.py app. + # Since we merged orchestrator into api, we still have node.py as a separate file. + import node + client = TestClient(node.app) + r = client.get("/heartbeat") + assert r.status_code == 200 + body = r.json() + assert body["status"] == "alive" + +def test_orchestrator_check_node_online(monkeypatch): + db = SessionLocal() + test_node = Node(name="node1", uuid="node1-uuid", host="http://localhost:8001", env="prod", status="UNKNOWN", fail_count=1) + db.add(test_node) + db.commit() + node_id = test_node.id + db.close() + + monkeypatch.setattr(api.httpx, "AsyncClient", FakeSuccessClient) + + asyncio.run(api.check_node(node_id)) + + db = SessionLocal() + updated_node = db.query(Node).filter(Node.id == node_id).first() + assert updated_node.status == "ONLINE" + assert updated_node.fail_count == 0 + assert updated_node.last_seen is not None + db.close() + +def test_orchestrator_check_node_offline(monkeypatch): + db = SessionLocal() + test_node = Node(name="node1", uuid="node1-uuid", host="http://localhost:8001", env="prod", status="UNKNOWN", fail_count=api.FAIL_THRESHOLD - 1) + db.add(test_node) + db.commit() + node_id = test_node.id + db.close() + + monkeypatch.setattr(api.httpx, "AsyncClient", FakeFailClient) + + asyncio.run(api.check_node(node_id)) + + db = SessionLocal() + updated_node = db.query(Node).filter(Node.id == node_id).first() + assert updated_node.status == "OFFLINE" + assert updated_node.fail_count == api.FAIL_THRESHOLD + db.close() + +def test_orchestrator_node_status_transition(monkeypatch): + db = SessionLocal() + test_node = Node(name="node1", uuid="node1-uuid", host="http://localhost:8001", 
env="prod", status="UNKNOWN", fail_count=0) + db.add(test_node) + db.commit() + node_id = test_node.id + db.close() + + monkeypatch.setattr(api.httpx, "AsyncClient", FakeSuccessClient) + asyncio.run(api.check_node(node_id)) + + db = SessionLocal() + assert db.query(Node).filter(Node.id == node_id).first().status == "ONLINE" + db.close() + + monkeypatch.setattr(api.httpx, "AsyncClient", FakeFailClient) + for _ in range(api.FAIL_THRESHOLD): + asyncio.run(api.check_node(node_id)) + + db = SessionLocal() + updated_node = db.query(Node).filter(Node.id == node_id).first() + assert updated_node.status == "OFFLINE" + assert updated_node.fail_count >= api.FAIL_THRESHOLD + db.close() + +def test_orchestrator_nodes_endpoint(monkeypatch): + # Register a node + db = SessionLocal() + test_node = Node(name="node1", uuid="node1-uuid", host="http://localhost:8001", env="prod") + db.add(test_node) + db.commit() + db.close() + + # Prevent monitor task racing in test client startup + monkeypatch.setattr(api.app.router, "on_startup", []) + client = TestClient(api.app) + r = client.get("/nodes") + assert r.status_code == 200 + body = r.json() + assert any(n["name"] == "node1" for n in body) \ No newline at end of file diff --git a/the_fumblers/heimdall/test_webhook.py b/the_fumblers/heimdall/test_webhook.py new file mode 100644 index 0000000..f6b73ee --- /dev/null +++ b/the_fumblers/heimdall/test_webhook.py @@ -0,0 +1,89 @@ +import hashlib +import hmac +import json +import pytest +from fastapi.testclient import TestClient +from api import app, WEBHOOK_SECRET +from db import SessionLocal, Node, ServiceInstance, Operation, init_db + +@pytest.fixture(autouse=True) +def setup_db(): + init_db() + db = SessionLocal() + db.query(Operation).delete() + db.query(ServiceInstance).delete() + db.query(Node).delete() + db.commit() + db.close() + +client = TestClient(app) + +def sign_payload(payload: dict): + body = json.dumps(payload).encode("utf-8") + digest = 
hmac.new(WEBHOOK_SECRET.encode("utf-8"), body, hashlib.sha256).hexdigest() + return body, {"X-Signature": f"sha256={digest}", "Content-Type": "application/json"} + +def test_webhook_deploy_success(): + # Setup node + db = SessionLocal() + node = Node(name="test-node", uuid="test-node-uuid", host="localhost", env="prod") + db.add(node) + db.commit() + db.close() + + payload = { + "action": "deploy", + "service": "api", + "version": "v1", + "env": "prod" + } + body, headers = sign_payload(payload) + + response = client.post('/webhook', data=body, headers=headers) + assert response.status_code == 200 + assert response.json() == {"status": "accepted"} + + # Verify Operation created + db = SessionLocal() + ops = db.query(Operation).all() + assert len(ops) == 1 + assert ops[0].type == "deploy" + db.close() + +def test_webhook_register_success(): + payload = { + "action": "register", + "service": "node-v2", + "env": "prod", + "metadata": { + "name": "node-v2", + "host": "http://localhost:8002" + } + } + body, headers = sign_payload(payload) + + response = client.post('/webhook', data=body, headers=headers) + assert response.status_code == 200 + + db = SessionLocal() + node = db.query(Node).filter(Node.name == "node-v2").first() + assert node is not None + assert node.host == "http://localhost:8002" + db.close() + +def test_webhook_invalid_signature(): + payload = {"action": "deploy", "service": "api", "env": "prod"} + body, headers = sign_payload(payload) + headers["X-Signature"] = "sha256=invalid" + + response = client.post('/webhook', data=body, headers=headers) + assert response.status_code == 401 + +def test_nodes_endpoint(): + # Register a node first + test_webhook_register_success() + + response = client.get('/nodes') + assert response.status_code == 200 + nodes = response.json() + assert any(n['name'] == 'node-v2' for n in nodes) From 68251873ece8cd3a3570886c53c605472bc180d2 Mon Sep 17 00:00:00 2001 From: Blazzzeee Date: Sat, 28 Mar 2026 18:38:26 +0530 Subject: 
[PATCH 2/2] push readme --- the_fumblers/README.md | 184 ++++++++++++++++++ .../examples/nodejs_service/flake.lock | 61 ++++++ .../examples/nodejs_service/flake.nix | 41 ++++ .../heimdall/examples/nodejs_service/index.js | 18 ++ .../examples/worker_service/flake.lock | 61 ++++++ .../examples/worker_service/flake.nix | 37 ++++ .../heimdall/examples/worker_service/main.py | 34 ++++ 7 files changed, 436 insertions(+) create mode 100644 the_fumblers/README.md create mode 100644 the_fumblers/heimdall/examples/nodejs_service/flake.lock create mode 100644 the_fumblers/heimdall/examples/nodejs_service/flake.nix create mode 100644 the_fumblers/heimdall/examples/nodejs_service/index.js create mode 100644 the_fumblers/heimdall/examples/worker_service/flake.lock create mode 100644 the_fumblers/heimdall/examples/worker_service/flake.nix create mode 100644 the_fumblers/heimdall/examples/worker_service/main.py diff --git a/the_fumblers/README.md b/the_fumblers/README.md new file mode 100644 index 0000000..f0b8a8d --- /dev/null +++ b/the_fumblers/README.md @@ -0,0 +1,184 @@ +# 🌌 heimdall + +**Distributed Node Orchestration & Health Monitoring Platform** + +heimdall is a lightweight Python platform that continuously polls registered services for their health, tracks failure counts, and delivers alerts through Discord, Telegram, or generic webhooks — all exposed via a clean REST API. 
+ +--- + +## Features + +- **Async heartbeat polling** — orchestrator checks every registered node's `/heartbeat` endpoint on a loop using `asyncio` + `httpx` +- **Resilience thresholds** — nodes aren't marked `OFFLINE` on the first failure; a configurable `FAIL_THRESHOLD` prevents false positives +- **Multi-channel alerts** — integrations for Discord bots, Telegram bots, and generic webhooks +- **Live status API** — `GET /nodes` returns real-time status of all registered nodes +- **Pluggable nodes** — any FastAPI service that exposes `/heartbeat` becomes a Stellaris node instantly +- **Reproducible dev environment** — Nix flake included for zero-dependency-conflict setup + +--- + +## Architecture + +``` +┌─────────────┐ heartbeat poll ┌─────────────────────────────┐ +│ Node A │ ──────────────────────▶│ │ +├─────────────┤ │ Orchestrator │──▶ GET /nodes (REST) +│ Node B │ ──────────────────────▶│ FastAPI + asyncio │──▶ Discord Bot +├─────────────┤ │ fail_count tracking │──▶ Telegram Bot +│ Node C │ ──────────────────────▶│ │──▶ Webhook +└─────────────┘ └─────────────────────────────┘ +``` + +Each node exposes a `/heartbeat` endpoint. The orchestrator polls all nodes, increments `fail_count` on failures, and flips status to `OFFLINE` once the threshold is crossed. 
+ +--- + +## Project Structure + +``` +stellaris/ +├── orchestrator.py # Core: async polling loop, node state, REST API +├── node.py # Lightweight FastAPI heartbeat microservice +├── api.py # API layer / shared app setup +├── test.py # General tests +├── test_orchaestrator.py # Orchestrator-specific tests (monkeypatched) +├── flake.nix # Nix development environment +├── flake.lock # Locked Nix dependencies +└── .envrc # direnv hook: `use flake` +``` + +--- + +## Getting Started + +### Prerequisites + +- Python 3.10+ +- [Nix](https://nixos.org/) (recommended) **or** install dependencies manually + +### With Nix (recommended) + +```bash +# Enter the dev shell — all dependencies are loaded automatically +direnv allow # if you use direnv +# or +nix develop +``` + +### Without Nix + +```bash +pip install fastapi uvicorn httpx pytest +``` + +### Run the Node + +```bash +uvicorn node:app --port 8001 +``` + +The node exposes: + +``` +GET /heartbeat → { "status": "alive", "time": "2025-03-28T10:00:00Z" } +``` + +### Run the Orchestrator + +```bash +uvicorn orchestrator:app --port 8000 +``` + +The orchestrator exposes: + +``` +GET /nodes → { "node1": { "url": "...", "status": "ONLINE", "fail_count": 0, "last_seen": "..." 
} } +``` + +--- + +## Configuration + +Edit the node registry inside `orchestrator.py` to add your services: + +```python +NODES = { + "node1": { + "url": "http://localhost:8001", + "status": "UNKNOWN", + "fail_count": 0, + "last_seen": None, + } +} + +FAIL_THRESHOLD = 3 # failures before a node is marked OFFLINE +``` + +--- + +## Node Status Reference + +| Status | Meaning | +|-----------|------------------------------------------------------| +| `UNKNOWN` | Node registered but not yet polled | +| `ONLINE` | Last heartbeat succeeded; `fail_count` reset to 0 | +| `OFFLINE` | `fail_count` reached `FAIL_THRESHOLD` | + +--- + +## Running Tests + +```bash +pytest test.py test_orchaestrator.py -v +``` + +Tests use `monkeypatch` to inject fake HTTP clients — no live services required. + +### What's tested + +- Node `/heartbeat` returns `{ status, time }` with a UTC timestamp +- Orchestrator marks node `ONLINE` and resets `fail_count` on success +- Orchestrator marks node `OFFLINE` after `FAIL_THRESHOLD` consecutive failures +- `fail_count` increments correctly on single failure (status stays `UNKNOWN`) +- Status transitions: `UNKNOWN → ONLINE → OFFLINE` +- `GET /nodes` endpoint returns the full registry + +--- + +## Notification Integrations + +Each integration lives in its own branch and can be merged independently. + +| Channel | Branch | Status | +|---------------|------------------------|-------------| +| Discord Bot | `ft.connect_discord` | ✅ Complete | +| Telegram Bot | `ft.connect_telegram` | ✅ Complete | +| Webhook | `ft.addwebhook` | ✅ Complete | +| Node Auto-reg | `ft.add_node_service` | 🚧 In progress | + +--- + +## Tech Stack + +| Layer | Technology | +|---------------|---------------------| +| Web framework | FastAPI | +| Async HTTP | httpx | +| Testing | pytest | +| Runtime | Python asyncio | +| Dev env | Nix flakes + direnv | + +--- + +## Contributing + +1. Fork the repo and create a branch from `master` +2. 
Follow the `ft.` naming convention for feature branches +3. Add or update tests for any changed behaviour +4. Open a pull request with a clear description + +--- + +## License + +MIT — see `LICENSE` for details. diff --git a/the_fumblers/heimdall/examples/nodejs_service/flake.lock b/the_fumblers/heimdall/examples/nodejs_service/flake.lock new file mode 100644 index 0000000..d95764d --- /dev/null +++ b/the_fumblers/heimdall/examples/nodejs_service/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1774386573, + "narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/the_fumblers/heimdall/examples/nodejs_service/flake.nix b/the_fumblers/heimdall/examples/nodejs_service/flake.nix new file mode 100644 index 0000000..c28fda2 --- /dev/null +++ b/the_fumblers/heimdall/examples/nodejs_service/flake.nix @@ -0,0 +1,41 @@ +{ + description = "A 
minimal NodeJS service for Heimdall"; + + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = nixpkgs.legacyPackages.${system}; + nodejs = pkgs.nodejs; + + run-script = pkgs.writeShellScriptBin "run-node" '' + export PORT="''${PORT:-3000}" + echo "Starting NodeJS Service on port $PORT..." + exec ${nodejs}/bin/node ${./index.js} + ''; + + in + { + packages.default = run-script; + + apps.default = { + type = "app"; + program = "${run-script}/bin/run-node"; + }; + + devShells.default = pkgs.mkShell { + buildInputs = [ nodejs ]; + }; + + # Heimdall Metadata + heimdall-manifest = { + commands = [ "run" ]; + healthcheck_url = "http://127.0.0.1:3000/"; + }; + } + ); +} diff --git a/the_fumblers/heimdall/examples/nodejs_service/index.js b/the_fumblers/heimdall/examples/nodejs_service/index.js new file mode 100644 index 0000000..d66834e --- /dev/null +++ b/the_fumblers/heimdall/examples/nodejs_service/index.js @@ -0,0 +1,18 @@ +const http = require('http'); + +const port = process.env.PORT || 3000; + +const server = http.createServer((req, res) => { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({ + status: 'healthy', + message: 'Heimdall NodeJS Node Alive!', + time: new Date().toISOString(), + env: process.env.NODE_ENV || 'development' + })); +}); + +server.listen(port, () => { + console.log(`NodeJS Server running on port ${port}`); +}); diff --git a/the_fumblers/heimdall/examples/worker_service/flake.lock b/the_fumblers/heimdall/examples/worker_service/flake.lock new file mode 100644 index 0000000..d95764d --- /dev/null +++ b/the_fumblers/heimdall/examples/worker_service/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": 
"sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1774386573, + "narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/the_fumblers/heimdall/examples/worker_service/flake.nix b/the_fumblers/heimdall/examples/worker_service/flake.nix new file mode 100644 index 0000000..2b80459 --- /dev/null +++ b/the_fumblers/heimdall/examples/worker_service/flake.nix @@ -0,0 +1,37 @@ +{ + description = "A dummy background worker service"; + + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = nixpkgs.legacyPackages.${system}; + + pythonEnv = pkgs.python3.withPackages (ps: with ps; [ + # Add worker dependencies here if needed + ]); + + run-script = pkgs.writeShellScriptBin "run-worker" '' + echo "Starting Dummy Worker Service..." 
+ exec ${pythonEnv}/bin/python ${./main.py} + ''; + + in + { + packages.default = run-script; + + apps.default = { + type = "app"; + program = "${run-script}/bin/run-worker"; + }; + + devShells.default = pkgs.mkShell { + buildInputs = [ pythonEnv ]; + }; + } + ); +} diff --git a/the_fumblers/heimdall/examples/worker_service/main.py b/the_fumblers/heimdall/examples/worker_service/main.py new file mode 100644 index 0000000..3f6d7e2 --- /dev/null +++ b/the_fumblers/heimdall/examples/worker_service/main.py @@ -0,0 +1,34 @@ +import time +import sys +import logging + +logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("DummyWorker") + +def main(): + logger.info("Worker started.") + job_id = 1 + + while True: + logger.info(f"Processing job #{job_id}...") + + # Simulate work + time.sleep(5) + + logger.info(f"Job #{job_id} completed successfully.") + job_id += 1 + + # Optionally simulate a failure randomly + if job_id % 10 == 0: + logger.warning(f"Job #{job_id} encountered a transient error! Retrying...") + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + logger.info("Worker shutting down gracefully.") + sys.exit(0)