Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
749 changes: 748 additions & 1 deletion crates/phantom-agent/src/lib.rs

Large diffs are not rendered by default.

178 changes: 150 additions & 28 deletions crates/phantom-capture/src/ldpreload.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
//! LD_PRELOAD capture backend — Linux only.
//!
//! Listens on a Unix datagram socket for [`TraceMsg`] JSON messages emitted
//! by the phantom-agent dylib injected into a target process, and converts
//! them into [`HttpTrace`] objects.
//! Listens on a Unix datagram socket for JSON messages emitted by the
//! phantom-agent dylib injected into a target process, and converts them into
//! [`HttpTrace`] or [`MysqlTrace`] objects.
//!
//! Messages are discriminated by the `msg_type` field:
//! - `"mysql"` → [`MysqlTrace`] emitted on the MySQL channel
//! - anything else (or absent) → [`HttpTrace`] emitted on the HTTP channel

use std::collections::HashMap;
use std::path::{Path, PathBuf};
Expand All @@ -12,13 +16,14 @@ use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as B64;
use phantom_core::capture::CaptureBackend;
use phantom_core::error::CaptureError;
use phantom_core::mysql::{MysqlResponseKind, MysqlStore, MysqlTrace};
use phantom_core::trace::{HttpMethod, HttpTrace, SpanId, TraceId};
use tokio::net::UnixDatagram;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, warn};

// ─────────────────────────────────────────────────────────────────────────────
// IPC message format (must match phantom-agent's TraceMsg)
// IPC message format — HTTP (must match phantom-agent's TraceMsg)
// ─────────────────────────────────────────────────────────────────────────────

#[derive(serde::Deserialize)]
Expand All @@ -37,6 +42,34 @@ struct AgentTrace {
protocol_version: Option<String>,
}

// ─────────────────────────────────────────────────────────────────────────────
// IPC message format — MySQL (must match phantom-agent's MysqlTraceMsg)
// ─────────────────────────────────────────────────────────────────────────────

#[derive(serde::Deserialize)]
struct AgentMysqlTrace {
query: String,
duration_ms: u64,
timestamp_ms: u64,
dest_addr: Option<String>,
db_name: Option<String>,
// OK fields
affected_rows: Option<u64>,
last_insert_id: Option<u64>,
warnings: Option<u16>,
// ResultSet fields
column_count: Option<u64>,
row_count: Option<u64>,
// ERR fields
error_code: Option<u16>,
sql_state: Option<String>,
error_message: Option<String>,
}

// ─────────────────────────────────────────────────────────────────────────────
// Conversion helpers
// ─────────────────────────────────────────────────────────────────────────────

fn parse_method(s: &str) -> HttpMethod {
match s.to_uppercase().as_str() {
"GET" => HttpMethod::Get,
Expand All @@ -62,15 +95,16 @@ fn rand_bytes<const N: usize>() -> [u8; N] {
buf
}

fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace {
let timestamp = SystemTime::UNIX_EPOCH + Duration::from_millis(a.timestamp_ms);
// Guard against timestamps before UNIX_EPOCH (shouldn't happen but be safe).
let timestamp = if timestamp < UNIX_EPOCH {
fn ms_to_system_time(ms: u64) -> SystemTime {
let ts = SystemTime::UNIX_EPOCH + Duration::from_millis(ms);
if ts < UNIX_EPOCH {
SystemTime::now()
} else {
timestamp
};
ts
}
}

fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace {
HttpTrace {
span_id: SpanId(rand_bytes::<8>()),
trace_id: TraceId(rand_bytes::<16>()),
Expand All @@ -82,14 +116,47 @@ fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace {
status_code: a.status_code,
response_headers: a.response_headers,
response_body: decode_body(a.response_body_b64),
timestamp,
timestamp: ms_to_system_time(a.timestamp_ms),
duration: Duration::from_millis(a.duration_ms),
source_addr: None,
dest_addr: a.dest_addr,
protocol_version: a.protocol_version.unwrap_or_else(|| "HTTP/1.1".to_string()),
}
}

fn agent_mysql_trace_to_mysql_trace(a: AgentMysqlTrace) -> MysqlTrace {
let response = if let Some(code) = a.error_code {
MysqlResponseKind::Err {
error_code: code,
sql_state: a.sql_state.unwrap_or_default(),
message: a.error_message.unwrap_or_default(),
}
} else if a.column_count.is_some() {
MysqlResponseKind::ResultSet {
column_count: a.column_count.unwrap_or(0),
row_count: a.row_count.unwrap_or(0),
}
} else {
MysqlResponseKind::Ok {
affected_rows: a.affected_rows.unwrap_or(0),
last_insert_id: a.last_insert_id.unwrap_or(0),
warnings: a.warnings.unwrap_or(0),
}
};

MysqlTrace {
span_id: SpanId(rand_bytes::<8>()),
trace_id: TraceId(rand_bytes::<16>()),
parent_span_id: None,
query: a.query,
response,
timestamp: ms_to_system_time(a.timestamp_ms),
duration: Duration::from_millis(a.duration_ms),
dest_addr: a.dest_addr,
db_name: a.db_name,
}
}

// ─────────────────────────────────────────────────────────────────────────────
// LdPreloadCaptureBackend
// ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -117,17 +184,23 @@ impl LdPreloadCaptureBackend {
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
}

impl CaptureBackend for LdPreloadCaptureBackend {
fn start(&mut self) -> Result<mpsc::Receiver<HttpTrace>, CaptureError> {
/// Start capturing, returning **both** an HTTP and a MySQL trace receiver.
///
/// This is the preferred entry point when running the LD_PRELOAD backend.
/// The [`CaptureBackend::start`] implementation calls this and discards the
/// MySQL receiver, preserving the trait contract for contexts that only need HTTP.
pub fn start_mysql_aware(
&mut self,
) -> Result<(mpsc::Receiver<HttpTrace>, mpsc::Receiver<MysqlTrace>), CaptureError> {
// Remove stale socket file if it exists.
let _ = std::fs::remove_file(&self.socket_path);

let socket = UnixDatagram::bind(&self.socket_path)
.map_err(|e| CaptureError::StartFailed(e.to_string()))?;

let (trace_tx, trace_rx) = mpsc::channel(4096);
let (http_tx, http_rx) = mpsc::channel::<HttpTrace>(4096);
let (mysql_tx, mysql_rx) = mpsc::channel::<MysqlTrace>(4096);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

let task_handle = tokio::spawn(async move {
Expand All @@ -138,18 +211,11 @@ impl CaptureBackend for LdPreloadCaptureBackend {
result = socket.recv_from(&mut buf) => {
match result {
Ok((n, _from)) => {
match serde_json::from_slice::<AgentTrace>(&buf[..n]) {
Ok(agent_trace) => {
let trace = agent_trace_to_http_trace(agent_trace);
debug!(url = %trace.url, "captured via ldpreload");
if trace_tx.try_send(trace).is_err() {
warn!("ldpreload trace channel full, dropping");
}
}
Err(e) => {
warn!("ldpreload: failed to parse agent message: {e}");
}
}
dispatch_agent_message(
&buf[..n],
&http_tx,
&mysql_tx,
);
}
Err(e) => {
warn!("ldpreload socket recv error: {e}");
Expand All @@ -163,7 +229,54 @@ impl CaptureBackend for LdPreloadCaptureBackend {

self.shutdown_tx = Some(shutdown_tx);
self.task_handle = Some(task_handle);
Ok(trace_rx)
Ok((http_rx, mysql_rx))
}
}

/// Peek at the `msg_type` field and dispatch to the appropriate channel.
fn dispatch_agent_message(
data: &[u8],
http_tx: &mpsc::Sender<HttpTrace>,
mysql_tx: &mpsc::Sender<MysqlTrace>,
) {
// Deserialise into a generic Value to inspect msg_type without duplicating
// the full struct. For MySQL messages the overhead is negligible.
let Ok(val) = serde_json::from_slice::<serde_json::Value>(data) else {
warn!("ldpreload: failed to parse agent message as JSON");
return;
};

match val.get("msg_type").and_then(|v| v.as_str()) {
Some("mysql") => match serde_json::from_value::<AgentMysqlTrace>(val) {
Ok(agent) => {
let trace = agent_mysql_trace_to_mysql_trace(agent);
debug!(query = %trace.query, "mysql trace captured via ldpreload");
if mysql_tx.try_send(trace).is_err() {
warn!("ldpreload mysql trace channel full, dropping");
}
}
Err(e) => warn!("ldpreload: failed to parse mysql message: {e}"),
},
_ => {
// No msg_type or msg_type != "mysql" → treat as HTTP trace.
match serde_json::from_value::<AgentTrace>(val) {
Ok(agent) => {
let trace = agent_trace_to_http_trace(agent);
debug!(url = %trace.url, "captured via ldpreload");
if http_tx.try_send(trace).is_err() {
warn!("ldpreload trace channel full, dropping");
}
}
Err(e) => warn!("ldpreload: failed to parse http message: {e}"),
}
}
}
}

impl CaptureBackend for LdPreloadCaptureBackend {
fn start(&mut self) -> Result<mpsc::Receiver<HttpTrace>, CaptureError> {
let (http_rx, _mysql_rx) = self.start_mysql_aware()?;
Ok(http_rx)
}

fn stop(&mut self) -> Result<(), CaptureError> {
Expand All @@ -178,3 +291,12 @@ impl CaptureBackend for LdPreloadCaptureBackend {
"ldpreload"
}
}

// ─────────────────────────────────────────────────────────────────────────────
// Tests — MysqlStore trait is used here to keep the import active
// ─────────────────────────────────────────────────────────────────────────────

// Suppress unused import warning: MysqlStore is re-exported for use by main.rs
const _: fn() = || {
let _: Option<&dyn MysqlStore> = None;
};
1 change: 1 addition & 0 deletions crates/phantom-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod capture;
pub mod error;
pub mod mysql;
pub mod storage;
pub mod trace;
Loading