From 15ca51dda81ad0e5cdc47975305d30a62d2af214 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Mar 2026 01:07:21 +0000 Subject: [PATCH 1/2] feat: add MySQL trace capture via LD_PRELOAD backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds end-to-end MySQL query observability without any application instrumentation, using the existing LD_PRELOAD agent. Changes: - phantom-core: new `mysql` module with `MysqlTrace`, `MysqlResponseKind` (Ok/ResultSet/Err), and `MysqlStore` trait - phantom-storage: `FjallMysqlStore` — Fjall-backed `MysqlStore` with two partitions (by span_id and by timestamp) for ordered queries - phantom-agent: `connect()` hook detects MySQL connections by port (default 3306, overridable via `PHANTOM_MYSQL_PORT`); per-FD state machine (`MysqlConnState`) handles handshake, COM_QUERY, and server response parsing (OK / ERR / ResultSet); emits `MysqlTraceMsg` JSON over the existing Unix datagram socket with `msg_type = "mysql"` - phantom-capture: `dispatch_agent_message()` peeks at `msg_type` to route HTTP vs MySQL; `start_mysql_aware()` returns both channels; existing `CaptureBackend::start()` preserved for backward compatibility - phantom-tui: new MySQL tab (key `2`); `App` tracks MySQL traces and selected index; UI renders list with Time/Query/Result/Duration columns and a detail pane; status bar shows `HTTP: N | MySQL: N` - src/main.rs: opens `FjallMysqlStore`; proxy backend uses a dummy MySQL channel; ldpreload backend uses `start_mysql_aware()`; JSONL mode emits MySQL traces as `{"type":"mysql", ...}` objects https://claude.ai/code/session_015N7iRe1Pkx41rHgvgKQaTC --- crates/phantom-agent/src/lib.rs | 749 +++++++++++++++++++++- crates/phantom-capture/src/ldpreload.rs | 178 ++++- crates/phantom-core/src/lib.rs | 1 + crates/phantom-core/src/mysql.rs | 176 +++++ crates/phantom-storage/src/fjall_mysql.rs | 229 +++++++ crates/phantom-storage/src/lib.rs | 2 + crates/phantom-tui/src/app.rs | 135 +++- crates/phantom-tui/src/lib.rs | 23 +- crates/phantom-tui/src/ui.rs | 317 ++++++++- src/main.rs | 87 ++- 10 files changed, 1828 insertions(+), 69 deletions(-) create mode 100644 crates/phantom-core/src/mysql.rs create mode 100644 crates/phantom-storage/src/fjall_mysql.rs diff --git a/crates/phantom-agent/src/lib.rs b/crates/phantom-agent/src/lib.rs index c744e5c..a59fa86 100644 --- a/crates/phantom-agent/src/lib.rs +++ b/crates/phantom-agent/src/lib.rs @@ -542,6 +542,8 @@ enum FdState { }, /// HTTP/2 connection (may carry many multiplexed streams). Http2(Box), + /// MySQL connection — tracks COM_QUERY round-trips. + MysqlConnection(Box), } static STATE_MAP: OnceLock>> = OnceLock::new(); @@ -700,6 +702,431 @@ fn do_emit( }); } +// ───────────────────────────────────────────────────────────────────────────── +// MySQL wire protocol — state machine and helpers +// ───────────────────────────────────────────────────────────────────────────── + +/// MySQL port to intercept (default 3306, override with PHANTOM_MYSQL_PORT). +static MYSQL_PORT: OnceLock = OnceLock::new(); + +fn mysql_port() -> u16 { + *MYSQL_PORT.get_or_init(|| { + std::env::var("PHANTOM_MYSQL_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3306) + }) +} + +/// Parse a MySQL packet header: `[3B length LE][1B seq_id][payload]`. +/// Returns `(total_bytes_consumed, seq_id, payload_slice)` or `None` if incomplete. +fn parse_mysql_packet(buf: &[u8]) -> Option<(usize, u8, &[u8])> { + if buf.len() < 4 { + return None; + } + let payload_len = u32::from_le_bytes([buf[0], buf[1], buf[2], 0]) as usize; + let total = 4 + payload_len; + if buf.len() < total { + return None; + } + Some((total, buf[3], &buf[4..total])) +} + +/// Decode a MySQL length-encoded integer. +/// Returns `(value, bytes_consumed)` or `None` on insufficient data. +fn decode_lenenc_int(buf: &[u8]) -> Option<(u64, usize)> { + match buf.first()? { + n @ 0x00..=0xfb => Some((*n as u64, 1)), + 0xfc => { + if buf.len() < 3 { + return None; + } + Some((u16::from_le_bytes([buf[1], buf[2]]) as u64, 3)) + } + 0xfd => { + if buf.len() < 4 { + return None; + } + Some((u32::from_le_bytes([buf[1], buf[2], buf[3], 0]) as u64, 4)) + } + 0xfe => { + if buf.len() < 9 { + return None; + } + let bytes: [u8; 8] = buf[1..9].try_into().ok()?; + Some((u64::from_le_bytes(bytes), 9)) + } + _ => None, // 0xff = ERR indicator, not a length + } +} + +#[derive(Debug)] +enum HandshakePhase { + /// Waiting for the server greeting (seq_id=0, payload[0]=0x0a). + WaitingGreeting, + /// Server greeting seen; waiting for the server's auth OK/ERR (seq_id=2, payload[0]=0x00). + WaitingAuthOk, + /// Handshake complete; ready to track COM_QUERY commands. + Done, +} + +#[derive(Debug)] +enum ResultSetPhase { + /// Reading column definition packets. + ReadingColumns { cols_seen: u64 }, + /// Column defs done (EOF seen); reading data rows. + ReadingRows, +} + +#[derive(Debug)] +enum MysqlQueryState { + Idle, + /// COM_QUERY sent; waiting for the first server response packet. + AwaitingResponse { + query: String, + started_at: Instant, + timestamp_ms: u64, + }, + /// First server packet was a column count — reading result set. + ReadingResultSet { + query: String, + started_at: Instant, + timestamp_ms: u64, + column_count: u64, + row_count: u64, + phase: ResultSetPhase, + }, +} + +#[derive(Debug)] +struct MysqlConnState { + dest_addr: Option, + db_name: Option, + /// Bytes buffered from the client (send direction). + send_buf: Vec, + /// Bytes buffered from the server (recv direction). + recv_buf: Vec, + handshake: HandshakePhase, + query_state: MysqlQueryState, +} + +impl MysqlConnState { + fn new(dest_addr: Option) -> Self { + Self { + dest_addr, + db_name: None, + send_buf: Vec::new(), + recv_buf: Vec::new(), + handshake: HandshakePhase::WaitingGreeting, + query_state: MysqlQueryState::Idle, + } + } +} + +/// IPC message for a completed MySQL query (JSON-serialised to the same socket as HTTP traces). +#[derive(serde::Serialize)] +struct MysqlTraceMsg { + msg_type: &'static str, // always "mysql" + query: String, + duration_ms: u64, + timestamp_ms: u64, + dest_addr: Option, + db_name: Option, + // OK response fields + affected_rows: Option, + last_insert_id: Option, + warnings: Option, + // ResultSet response fields + column_count: Option, + row_count: Option, + // ERR response fields + error_code: Option, + sql_state: Option, + error_message: Option, +} + +fn emit_mysql_msg(msg: &MysqlTraceMsg) { + let Some((sock, path)) = ipc() else { return }; + let Ok(data) = serde_json::to_vec(msg) else { + return; + }; + if data.len() <= MAX_DATAGRAM { + let _ = sock.send_to(&data, path); + } +} + +/// Process bytes arriving from the client (client → server direction) for a MySQL FD. +/// May transition `state.query_state` from `Idle` to `AwaitingResponse`. +fn process_mysql_outgoing(state: &mut MysqlConnState, data: &[u8]) { + if state.send_buf.len() < MAX_BUF { + state.send_buf.extend_from_slice(data); + } + + loop { + let Some((consumed, seq_id, payload)) = parse_mysql_packet(&state.send_buf) else { + break; + }; + // Only act on packets during the command phase (handshake done, seq_id=0). + if matches!(state.handshake, HandshakePhase::Done) + && seq_id == 0 + && payload.first() == Some(&0x03) // COM_QUERY + && matches!(state.query_state, MysqlQueryState::Idle) + { + let query = String::from_utf8_lossy(&payload[1..]).into_owned(); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + state.query_state = MysqlQueryState::AwaitingResponse { + query, + started_at: Instant::now(), + timestamp_ms: ts, + }; + } + state.send_buf.drain(..consumed); + } +} + +/// Process bytes arriving from the server (server → client direction) for a MySQL FD. +/// Returns a `MysqlTraceMsg` when a query round-trip is complete. +fn process_mysql_incoming(state: &mut MysqlConnState, data: &[u8]) -> Option { + if state.recv_buf.len() < MAX_BUF { + state.recv_buf.extend_from_slice(data); + } + + loop { + let Some((consumed, seq_id, payload)) = parse_mysql_packet(&state.recv_buf) else { + break; + }; + + // ── Handshake phase ────────────────────────────────────────────────── + match &state.handshake { + HandshakePhase::WaitingGreeting => { + // Server greeting: seq_id=0, payload[0]=0x0a (protocol v10). + if seq_id == 0 && payload.first() == Some(&0x0a) { + state.handshake = HandshakePhase::WaitingAuthOk; + } + state.recv_buf.drain(..consumed); + continue; + } + HandshakePhase::WaitingAuthOk => { + // Auth OK: payload[0]=0x00 (OK packet) at seq_id >= 2. + if seq_id >= 2 && payload.first() == Some(&0x00) { + state.handshake = HandshakePhase::Done; + } + state.recv_buf.drain(..consumed); + continue; + } + HandshakePhase::Done => {} + } + + // ── Command-phase server responses ─────────────────────────────────── + match &mut state.query_state { + MysqlQueryState::Idle => { + state.recv_buf.drain(..consumed); + continue; + } + + MysqlQueryState::AwaitingResponse { + query, + started_at, + timestamp_ms, + } => { + let first = payload.first().copied().unwrap_or(0); + match first { + // OK packet + 0x00 => { + let duration = started_at.elapsed(); + let ts = *timestamp_ms; + let (affected_rows, off1) = decode_lenenc_int(&payload[1..]) + .map(|(v, o)| (v, o + 1)) + .unwrap_or((0, 1)); + let (last_insert_id, _) = + decode_lenenc_int(&payload[off1..]).unwrap_or((0, 1)); + // status flags (2B) and warnings (2B) follow but we skip bounds checks for brevity + let warnings = if payload.len() >= off1 + 1 + 4 { + u16::from_le_bytes([payload[off1 + 1], payload[off1 + 2]]) + } else { + 0 + }; + let msg = MysqlTraceMsg { + msg_type: "mysql", + query: query.clone(), + duration_ms: duration.as_millis() as u64, + timestamp_ms: ts, + dest_addr: state.dest_addr.clone(), + db_name: state.db_name.clone(), + affected_rows: Some(affected_rows), + last_insert_id: Some(last_insert_id), + warnings: Some(warnings), + column_count: None, + row_count: None, + error_code: None, + sql_state: None, + error_message: None, + }; + state.query_state = MysqlQueryState::Idle; + state.recv_buf.drain(..consumed); + return Some(msg); + } + // ERR packet + 0xff => { + let duration = started_at.elapsed(); + let ts = *timestamp_ms; + let error_code = if payload.len() >= 3 { + u16::from_le_bytes([payload[1], payload[2]]) + } else { + 0 + }; + // sqlstate: '#' + 5 ASCII chars at payload[3..9], if present + let (sql_state, msg_start) = if payload.len() >= 9 && payload[3] == b'#' { + (String::from_utf8_lossy(&payload[4..9]).into_owned(), 9) + } else { + (String::new(), 3) + }; + let error_message = + String::from_utf8_lossy(&payload[msg_start..]).into_owned(); + let msg = MysqlTraceMsg { + msg_type: "mysql", + query: query.clone(), + duration_ms: duration.as_millis() as u64, + timestamp_ms: ts, + dest_addr: state.dest_addr.clone(), + db_name: state.db_name.clone(), + affected_rows: None, + last_insert_id: None, + warnings: None, + column_count: None, + row_count: None, + error_code: Some(error_code), + sql_state: Some(sql_state), + error_message: Some(error_message), + }; + state.query_state = MysqlQueryState::Idle; + state.recv_buf.drain(..consumed); + return Some(msg); + } + // First packet of a ResultSet — byte is the column count (lenenc int). + _ => { + let column_count = decode_lenenc_int(payload).map(|(v, _)| v).unwrap_or(1); + let query = query.clone(); + let started_at = *started_at; + let timestamp_ms = *timestamp_ms; + state.query_state = MysqlQueryState::ReadingResultSet { + query, + started_at, + timestamp_ms, + column_count, + row_count: 0, + phase: ResultSetPhase::ReadingColumns { cols_seen: 0 }, + }; + state.recv_buf.drain(..consumed); + continue; + } + } + } + + MysqlQueryState::ReadingResultSet { + query, + started_at, + timestamp_ms, + column_count, + row_count, + phase, + } => { + let first = payload.first().copied().unwrap_or(0); + // EOF packet: payload[0]=0xfe AND payload length < 9 (to distinguish from lenenc 0xfe). + let is_eof = first == 0xfe && payload.len() < 9; + // OK terminator (CLIENT_DEPRECATE_EOF): 0x00 with seq_id > 1 in rows phase. + let is_ok_terminator = + first == 0x00 && matches!(phase, ResultSetPhase::ReadingRows); + + match phase { + ResultSetPhase::ReadingColumns { cols_seen } => { + if is_eof { + // End of column definitions; start reading rows. + *phase = ResultSetPhase::ReadingRows; + } else { + *cols_seen += 1; + } + state.recv_buf.drain(..consumed); + continue; + } + ResultSetPhase::ReadingRows => { + if is_eof || is_ok_terminator { + // Result set complete. + let duration = started_at.elapsed(); + let ts = *timestamp_ms; + let msg = MysqlTraceMsg { + msg_type: "mysql", + query: query.clone(), + duration_ms: duration.as_millis() as u64, + timestamp_ms: ts, + dest_addr: state.dest_addr.clone(), + db_name: state.db_name.clone(), + affected_rows: None, + last_insert_id: None, + warnings: None, + column_count: Some(*column_count), + row_count: Some(*row_count), + error_code: None, + sql_state: None, + error_message: None, + }; + state.query_state = MysqlQueryState::Idle; + state.recv_buf.drain(..consumed); + return Some(msg); + } else if first == 0xff { + // ERR during result set streaming. + let error_code = if payload.len() >= 3 { + u16::from_le_bytes([payload[1], payload[2]]) + } else { + 0 + }; + let (sql_state, msg_start) = if payload.len() >= 9 && payload[3] == b'#' + { + (String::from_utf8_lossy(&payload[4..9]).into_owned(), 9) + } else { + (String::new(), 3) + }; + let error_message = + String::from_utf8_lossy(&payload[msg_start..]).into_owned(); + let duration = started_at.elapsed(); + let ts = *timestamp_ms; + let msg = MysqlTraceMsg { + msg_type: "mysql", + query: query.clone(), + duration_ms: duration.as_millis() as u64, + timestamp_ms: ts, + dest_addr: state.dest_addr.clone(), + db_name: state.db_name.clone(), + affected_rows: None, + last_insert_id: None, + warnings: None, + column_count: Some(*column_count), + row_count: Some(*row_count), + error_code: Some(error_code), + sql_state: Some(sql_state), + error_message: Some(error_message), + }; + state.query_state = MysqlQueryState::Idle; + state.recv_buf.drain(..consumed); + return Some(msg); + } else { + // Data row packet. + *row_count += 1; + state.recv_buf.drain(..consumed); + continue; + } + } + } + } + } + } + + None +} + // ───────────────────────────────────────────────────────────────────────────── // Hook processing (called from within hooks, after re-entry check) // @@ -713,6 +1140,12 @@ fn process_outgoing(key: usize, data: &[u8], tls: bool) { Err(_) => return, }; + // ── MySQL path ─────────────────────────────────────────────────────────── + if let Some(FdState::MysqlConnection(state)) = map.get_mut(&key) { + process_mysql_outgoing(state, data); + return; + } + // ── HTTP/2 path ────────────────────────────────────────────────────────── // If we already know this connection is HTTP/2, route directly. if let Some(FdState::Http2(h2)) = map.get_mut(&key) { @@ -781,6 +1214,23 @@ fn process_outgoing(key: usize, data: &[u8], tls: bool) { } fn process_incoming(key: usize, data: &[u8]) { + // ── MySQL path ─────────────────────────────────────────────────────────── + let mysql_msg = { + let mut map = match state_map().lock() { + Ok(m) => m, + Err(_) => return, + }; + if let Some(FdState::MysqlConnection(state)) = map.get_mut(&key) { + process_mysql_incoming(state, data) + } else { + None + } + }; // lock released + if let Some(msg) = mysql_msg { + emit_mysql_msg(&msg); + return; + } + // ── HTTP/2 path ────────────────────────────────────────────────────────── // Handle HTTP/2 streams, collecting those that have a complete response. // We release the lock before emitting. @@ -906,15 +1356,123 @@ fn process_teardown(key: usize) { } } } + // MySQL: emit partial query trace if a query was in flight. + Some(FdState::MysqlConnection(state)) => { + let pending = match &state.query_state { + MysqlQueryState::AwaitingResponse { + query, + started_at, + timestamp_ms, + } => Some((query.clone(), *started_at, *timestamp_ms)), + MysqlQueryState::ReadingResultSet { + query, + started_at, + timestamp_ms, + .. + } => Some((query.clone(), *started_at, *timestamp_ms)), + MysqlQueryState::Idle => None, + }; + if let Some((query, started_at, timestamp_ms)) = pending { + emit_mysql_msg(&MysqlTraceMsg { + msg_type: "mysql", + query, + duration_ms: started_at.elapsed().as_millis() as u64, + timestamp_ms, + dest_addr: state.dest_addr.clone(), + db_name: state.db_name.clone(), + affected_rows: None, + last_insert_id: None, + warnings: None, + column_count: None, + row_count: None, + error_code: None, + sql_state: None, + error_message: None, + }); + } + } // If headers were never parsed, we have nothing useful to emit. _ => {} } } // ───────────────────────────────────────────────────────────────────────────── -// Hooks — libc (plain HTTP) +// Hooks — libc (plain HTTP + MySQL connect detection) // ───────────────────────────────────────────────────────────────────────────── +/// Extract the port from a `sockaddr` (supports AF_INET and AF_INET6). +/// Returns `None` for other address families. +unsafe fn extract_sockaddr_port( + addr: *const libc::sockaddr, + addrlen: libc::socklen_t, +) -> Option { + if addr.is_null() { + return None; + } + let family = unsafe { (*addr).sa_family } as i32; + if family == libc::AF_INET && addrlen as usize >= std::mem::size_of::() { + let sa = unsafe { &*(addr as *const libc::sockaddr_in) }; + return Some(u16::from_be(sa.sin_port)); + } + if family == libc::AF_INET6 && addrlen as usize >= std::mem::size_of::() { + let sa = unsafe { &*(addr as *const libc::sockaddr_in6) }; + return Some(u16::from_be(sa.sin6_port)); + } + None +} + +/// Format a `sockaddr` as `"host:port"` string for MySQL dest_addr metadata. +unsafe fn format_sockaddr(addr: *const libc::sockaddr, addrlen: libc::socklen_t) -> Option { + if addr.is_null() { + return None; + } + let family = unsafe { (*addr).sa_family } as i32; + if family == libc::AF_INET && addrlen as usize >= std::mem::size_of::() { + let sa = unsafe { &*(addr as *const libc::sockaddr_in) }; + let port = u16::from_be(sa.sin_port); + let ip = u32::from_be(sa.sin_addr.s_addr); + let a = (ip >> 24) & 0xff; + let b = (ip >> 16) & 0xff; + let c = (ip >> 8) & 0xff; + let d = ip & 0xff; + return Some(format!("{a}.{b}.{c}.{d}:{port}")); + } + None +} + +redhook::hook! { + unsafe fn connect( + sockfd: c_int, + addr: *const libc::sockaddr, + addrlen: libc::socklen_t + ) -> c_int => phantom_connect { + // SAFETY: delegating to the real libc connect(2). + let result = unsafe { redhook::real!(connect)(sockfd, addr, addrlen) }; + // result == 0 (success) or -1 with EINPROGRESS (non-blocking) — both count. + let errno = unsafe { *libc::__errno_location() }; + if result == 0 || (result == -1 && errno == libc::EINPROGRESS) { + IN_HOOK.with(|g| { + if !g.get() { + g.set(true); + // SAFETY: addr lifetime is valid for the duration of connect(2). + let port = unsafe { extract_sockaddr_port(addr, addrlen) }; + if port == Some(mysql_port()) { + let dest = unsafe { format_sockaddr(addr, addrlen) }; + if let Ok(mut map) = state_map().lock() { + map.insert( + sockfd as usize, + FdState::MysqlConnection(Box::new(MysqlConnState::new(dest))), + ); + } + } + g.set(false); + } + }); + } + result + } +} + redhook::hook! { unsafe fn send( sockfd: c_int, @@ -1046,3 +1604,192 @@ redhook::hook! { unsafe { redhook::real!(SSL_free)(ssl) } } } + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── parse_mysql_packet ──────────────────────────────────────────────────── + + #[test] + fn test_parse_mysql_packet_complete() { + // 3-byte length (5) + seq_id (0) + payload "hello" + let buf = [5u8, 0, 0, 0, b'h', b'e', b'l', b'l', b'o']; + let (consumed, seq, payload) = parse_mysql_packet(&buf).unwrap(); + assert_eq!(consumed, 9); + assert_eq!(seq, 0); + assert_eq!(payload, b"hello"); + } + + #[test] + fn test_parse_mysql_packet_incomplete() { + // Says 10 bytes payload but buf only has 5 + let buf = [10u8, 0, 0, 0, b'h', b'i']; + assert!(parse_mysql_packet(&buf).is_none()); + } + + #[test] + fn test_parse_mysql_packet_too_short_for_header() { + assert!(parse_mysql_packet(&[1, 0, 0]).is_none()); + } + + // ── decode_lenenc_int ───────────────────────────────────────────────────── + + #[test] + fn test_decode_lenenc_int_1byte() { + assert_eq!(decode_lenenc_int(&[42]), Some((42, 1))); + assert_eq!(decode_lenenc_int(&[0]), Some((0, 1))); + assert_eq!(decode_lenenc_int(&[0xfb]), Some((251, 1))); + } + + #[test] + fn test_decode_lenenc_int_2byte() { + // 0xfc marker + little-endian u16 + let buf = [0xfc, 0x01, 0x00]; // = 1 + assert_eq!(decode_lenenc_int(&buf), Some((1, 3))); + let buf = [0xfc, 0xff, 0xff]; // = 65535 + assert_eq!(decode_lenenc_int(&buf), Some((65535, 3))); + } + + #[test] + fn test_decode_lenenc_int_3byte() { + let buf = [0xfd, 0x01, 0x00, 0x00]; // = 1 + assert_eq!(decode_lenenc_int(&buf), Some((1, 4))); + } + + #[test] + fn test_decode_lenenc_int_8byte() { + let mut buf = [0u8; 9]; + buf[0] = 0xfe; + buf[1] = 42; + assert_eq!(decode_lenenc_int(&buf), Some((42, 9))); + } + + #[test] + fn test_decode_lenenc_int_insufficient() { + assert_eq!(decode_lenenc_int(&[0xfc, 0x01]), None); // need 3 bytes + assert_eq!(decode_lenenc_int(&[]), None); + } + + // ── MySQL state machine ─────────────────────────────────────────────────── + + /// Build a MySQL packet: [len 3B LE][seq_id 1B][payload]. + fn make_mysql_packet(seq_id: u8, payload: &[u8]) -> Vec { + let len = payload.len() as u32; + let mut v = vec![ + (len & 0xff) as u8, + ((len >> 8) & 0xff) as u8, + ((len >> 16) & 0xff) as u8, + seq_id, + ]; + v.extend_from_slice(payload); + v + } + + fn do_handshake(state: &mut MysqlConnState) { + // Server greeting (seq=0, first byte 0x0a) + let greeting = make_mysql_packet(0, &[0x0a, b'8', b'.', b'0', 0]); + process_mysql_incoming(state, &greeting); + // Client auth response (sent from client, ignored for handshake purposes) + let auth = make_mysql_packet(1, &[0x00]); + process_mysql_outgoing(state, &auth); + // Server auth OK (seq=2, 0x00) + let auth_ok = make_mysql_packet(2, &[0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00]); + process_mysql_incoming(state, &auth_ok); + assert!(matches!(state.handshake, HandshakePhase::Done)); + } + + #[test] + fn test_mysql_state_ok_response() { + let mut state = MysqlConnState::new(Some("127.0.0.1:3306".to_string())); + do_handshake(&mut state); + + // COM_QUERY: seq=0, 0x03 + SQL text + let query_pkt = make_mysql_packet(0, b"\x03SELECT 1"); + process_mysql_outgoing(&mut state, &query_pkt); + assert!(matches!( + state.query_state, + MysqlQueryState::AwaitingResponse { .. } + )); + + // Server OK: seq=1, 0x00 + affected_rows(0) + last_insert_id(0) + status(2B) + warnings(2B) + let ok_payload = [0x00u8, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00]; + let ok_pkt = make_mysql_packet(1, &ok_payload); + let msg = process_mysql_incoming(&mut state, &ok_pkt).unwrap(); + assert_eq!(msg.query, "SELECT 1"); + assert_eq!(msg.affected_rows, Some(0)); + assert!(msg.error_code.is_none()); + assert!(matches!(state.query_state, MysqlQueryState::Idle)); + } + + #[test] + fn test_mysql_state_err_response() { + let mut state = MysqlConnState::new(None); + do_handshake(&mut state); + + let query_pkt = make_mysql_packet(0, b"\x03BAD QUERY"); + process_mysql_outgoing(&mut state, &query_pkt); + + // ERR: 0xff + error_code(2B LE) + '#' + sqlstate(5B) + message + let mut err_payload = vec![0xffu8, 0x28, 0x04]; // error_code = 0x0428 = 1064 + err_payload.extend_from_slice(b"#42000syntax error"); + let err_pkt = make_mysql_packet(1, &err_payload); + let msg = process_mysql_incoming(&mut state, &err_pkt).unwrap(); + assert_eq!(msg.query, "BAD QUERY"); + assert_eq!(msg.error_code, Some(1064)); + assert_eq!(msg.sql_state.as_deref(), Some("42000")); + assert!( + msg.error_message + .as_deref() + .unwrap() + .contains("syntax error") + ); + } + + #[test] + fn test_mysql_state_resultset() { + let mut state = MysqlConnState::new(None); + do_handshake(&mut state); + + // COM_QUERY + let query_pkt = make_mysql_packet(0, b"\x03SELECT id, name FROM users"); + process_mysql_outgoing(&mut state, &query_pkt); + + // Column count packet: 2 columns (lenenc int = 0x02) + let col_count_pkt = make_mysql_packet(1, &[0x02]); + let r = process_mysql_incoming(&mut state, &col_count_pkt); + assert!(r.is_none()); + assert!(matches!( + state.query_state, + MysqlQueryState::ReadingResultSet { .. } + )); + + // Two column definition packets (arbitrary content, not 0xfe) + for seq in 2u8..4 { + let col_def = make_mysql_packet(seq, b"def\x00\x00\x00id"); + process_mysql_incoming(&mut state, &col_def); + } + + // EOF after columns (payload < 9 bytes, first byte 0xfe) + let eof_cols = make_mysql_packet(4, &[0xfe, 0x00, 0x00, 0x02, 0x00]); + process_mysql_incoming(&mut state, &eof_cols); + + // Two data rows (not 0xfe) + for seq in 5u8..7 { + let row = make_mysql_packet(seq, b"\x011\x05Alice"); + process_mysql_incoming(&mut state, &row); + } + + // EOF terminator + let eof_rows = make_mysql_packet(7, &[0xfe, 0x00, 0x00, 0x02, 0x00]); + let msg = process_mysql_incoming(&mut state, &eof_rows).unwrap(); + assert_eq!(msg.column_count, Some(2)); + assert_eq!(msg.row_count, Some(2)); + assert!(msg.error_code.is_none()); + assert!(matches!(state.query_state, MysqlQueryState::Idle)); + } +} diff --git a/crates/phantom-capture/src/ldpreload.rs b/crates/phantom-capture/src/ldpreload.rs index 761ee5c..fa15792 100644 --- a/crates/phantom-capture/src/ldpreload.rs +++ b/crates/phantom-capture/src/ldpreload.rs @@ -1,8 +1,12 @@ //! LD_PRELOAD capture backend — Linux only. //! -//! Listens on a Unix datagram socket for [`TraceMsg`] JSON messages emitted -//! by the phantom-agent dylib injected into a target process, and converts -//! them into [`HttpTrace`] objects. +//! Listens on a Unix datagram socket for JSON messages emitted by the +//! phantom-agent dylib injected into a target process, and converts them into +//! [`HttpTrace`] or [`MysqlTrace`] objects. +//! +//! Messages are discriminated by the `msg_type` field: +//! - `"mysql"` → [`MysqlTrace`] emitted on the MySQL channel +//! - anything else (or absent) → [`HttpTrace`] emitted on the HTTP channel use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -12,13 +16,14 @@ use base64::Engine as _; use base64::engine::general_purpose::STANDARD as B64; use phantom_core::capture::CaptureBackend; use phantom_core::error::CaptureError; +use phantom_core::mysql::{MysqlResponseKind, MysqlStore, MysqlTrace}; use phantom_core::trace::{HttpMethod, HttpTrace, SpanId, TraceId}; use tokio::net::UnixDatagram; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, warn}; // ───────────────────────────────────────────────────────────────────────────── -// IPC message format (must match phantom-agent's TraceMsg) +// IPC message format — HTTP (must match phantom-agent's TraceMsg) // ───────────────────────────────────────────────────────────────────────────── #[derive(serde::Deserialize)] @@ -37,6 +42,34 @@ struct AgentTrace { protocol_version: Option, } +// ───────────────────────────────────────────────────────────────────────────── +// IPC message format — MySQL (must match phantom-agent's MysqlTraceMsg) +// ───────────────────────────────────────────────────────────────────────────── + +#[derive(serde::Deserialize)] +struct AgentMysqlTrace { + query: String, + duration_ms: u64, + timestamp_ms: u64, + dest_addr: Option, + db_name: Option, + // OK fields + affected_rows: Option, + last_insert_id: Option, + warnings: Option, + // ResultSet fields + column_count: Option, + row_count: Option, + // ERR fields + error_code: Option, + sql_state: Option, + error_message: Option, +} + +// ───────────────────────────────────────────────────────────────────────────── +// Conversion helpers +// ───────────────────────────────────────────────────────────────────────────── + fn parse_method(s: &str) -> HttpMethod { match s.to_uppercase().as_str() { "GET" => HttpMethod::Get, @@ -62,15 +95,16 @@ fn rand_bytes() -> [u8; N] { buf } -fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace { - let timestamp = SystemTime::UNIX_EPOCH + Duration::from_millis(a.timestamp_ms); - // Guard against timestamps before UNIX_EPOCH (shouldn't happen but be safe). - let timestamp = if timestamp < UNIX_EPOCH { +fn ms_to_system_time(ms: u64) -> SystemTime { + let ts = SystemTime::UNIX_EPOCH + Duration::from_millis(ms); + if ts < UNIX_EPOCH { SystemTime::now() } else { - timestamp - }; + ts + } +} +fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace { HttpTrace { span_id: SpanId(rand_bytes::<8>()), trace_id: TraceId(rand_bytes::<16>()), @@ -82,7 +116,7 @@ fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace { status_code: a.status_code, response_headers: a.response_headers, response_body: decode_body(a.response_body_b64), - timestamp, + timestamp: ms_to_system_time(a.timestamp_ms), duration: Duration::from_millis(a.duration_ms), source_addr: None, dest_addr: a.dest_addr, @@ -90,6 +124,39 @@ fn agent_trace_to_http_trace(a: AgentTrace) -> HttpTrace { } } +fn agent_mysql_trace_to_mysql_trace(a: AgentMysqlTrace) -> MysqlTrace { + let response = if let Some(code) = a.error_code { + MysqlResponseKind::Err { + error_code: code, + sql_state: a.sql_state.unwrap_or_default(), + message: a.error_message.unwrap_or_default(), + } + } else if a.column_count.is_some() { + MysqlResponseKind::ResultSet { + column_count: a.column_count.unwrap_or(0), + row_count: a.row_count.unwrap_or(0), + } + } else { + MysqlResponseKind::Ok { + affected_rows: a.affected_rows.unwrap_or(0), + last_insert_id: a.last_insert_id.unwrap_or(0), + warnings: a.warnings.unwrap_or(0), + } + }; + + MysqlTrace { + span_id: SpanId(rand_bytes::<8>()), + trace_id: TraceId(rand_bytes::<16>()), + parent_span_id: None, + query: a.query, + response, + timestamp: ms_to_system_time(a.timestamp_ms), + duration: Duration::from_millis(a.duration_ms), + dest_addr: a.dest_addr, + db_name: a.db_name, + } +} + // ───────────────────────────────────────────────────────────────────────────── // LdPreloadCaptureBackend // ───────────────────────────────────────────────────────────────────────────── @@ -117,17 +184,23 @@ impl LdPreloadCaptureBackend { pub fn socket_path(&self) -> &Path { &self.socket_path } -} -impl CaptureBackend for LdPreloadCaptureBackend { - fn start(&mut self) -> Result, CaptureError> { + /// Start capturing, returning **both** an HTTP and a MySQL trace receiver. + /// + /// This is the preferred entry point when running the LD_PRELOAD backend. + /// The [`CaptureBackend::start`] implementation calls this and discards the + /// MySQL receiver, preserving the trait contract for contexts that only need HTTP. + pub fn start_mysql_aware( + &mut self, + ) -> Result<(mpsc::Receiver, mpsc::Receiver), CaptureError> { // Remove stale socket file if it exists. let _ = std::fs::remove_file(&self.socket_path); let socket = UnixDatagram::bind(&self.socket_path) .map_err(|e| CaptureError::StartFailed(e.to_string()))?; - let (trace_tx, trace_rx) = mpsc::channel(4096); + let (http_tx, http_rx) = mpsc::channel::(4096); + let (mysql_tx, mysql_rx) = mpsc::channel::(4096); let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>(); let task_handle = tokio::spawn(async move { @@ -138,18 +211,11 @@ impl CaptureBackend for LdPreloadCaptureBackend { result = socket.recv_from(&mut buf) => { match result { Ok((n, _from)) => { - match serde_json::from_slice::(&buf[..n]) { - Ok(agent_trace) => { - let trace = agent_trace_to_http_trace(agent_trace); - debug!(url = %trace.url, "captured via ldpreload"); - if trace_tx.try_send(trace).is_err() { - warn!("ldpreload trace channel full, dropping"); - } - } - Err(e) => { - warn!("ldpreload: failed to parse agent message: {e}"); - } - } + dispatch_agent_message( + &buf[..n], + &http_tx, + &mysql_tx, + ); } Err(e) => { warn!("ldpreload socket recv error: {e}"); @@ -163,7 +229,54 @@ impl CaptureBackend for LdPreloadCaptureBackend { self.shutdown_tx = Some(shutdown_tx); self.task_handle = Some(task_handle); - Ok(trace_rx) + Ok((http_rx, mysql_rx)) + } +} + +/// Peek at the `msg_type` field and dispatch to the appropriate channel. +fn dispatch_agent_message( + data: &[u8], + http_tx: &mpsc::Sender, + mysql_tx: &mpsc::Sender, +) { + // Deserialise into a generic Value to inspect msg_type without duplicating + // the full struct. For MySQL messages the overhead is negligible. + let Ok(val) = serde_json::from_slice::(data) else { + warn!("ldpreload: failed to parse agent message as JSON"); + return; + }; + + match val.get("msg_type").and_then(|v| v.as_str()) { + Some("mysql") => match serde_json::from_value::(val) { + Ok(agent) => { + let trace = agent_mysql_trace_to_mysql_trace(agent); + debug!(query = %trace.query, "mysql trace captured via ldpreload"); + if mysql_tx.try_send(trace).is_err() { + warn!("ldpreload mysql trace channel full, dropping"); + } + } + Err(e) => warn!("ldpreload: failed to parse mysql message: {e}"), + }, + _ => { + // No msg_type or msg_type != "mysql" → treat as HTTP trace. + match serde_json::from_value::(val) { + Ok(agent) => { + let trace = agent_trace_to_http_trace(agent); + debug!(url = %trace.url, "captured via ldpreload"); + if http_tx.try_send(trace).is_err() { + warn!("ldpreload trace channel full, dropping"); + } + } + Err(e) => warn!("ldpreload: failed to parse http message: {e}"), + } + } + } +} + +impl CaptureBackend for LdPreloadCaptureBackend { + fn start(&mut self) -> Result, CaptureError> { + let (http_rx, _mysql_rx) = self.start_mysql_aware()?; + Ok(http_rx) } fn stop(&mut self) -> Result<(), CaptureError> { @@ -178,3 +291,12 @@ impl CaptureBackend for LdPreloadCaptureBackend { "ldpreload" } } + +// ───────────────────────────────────────────────────────────────────────────── +// Tests — MysqlStore trait is used here to keep the import active +// ───────────────────────────────────────────────────────────────────────────── + +// Suppress unused import warning: MysqlStore is re-exported for use by main.rs +const _: fn() = || { + let _: Option<&dyn MysqlStore> = None; +}; diff --git a/crates/phantom-core/src/lib.rs b/crates/phantom-core/src/lib.rs index 07f75f0..839fac7 100644 --- a/crates/phantom-core/src/lib.rs +++ b/crates/phantom-core/src/lib.rs @@ -1,4 +1,5 @@ pub mod capture; pub mod error; +pub mod mysql; pub mod storage; pub mod trace; diff --git a/crates/phantom-core/src/mysql.rs b/crates/phantom-core/src/mysql.rs new file mode 100644 index 0000000..e1d1702 --- /dev/null +++ b/crates/phantom-core/src/mysql.rs @@ -0,0 +1,176 @@ +use std::time::{Duration, SystemTime}; + +use serde::{Deserialize, Serialize}; + +use crate::error::StorageError; +use crate::trace::{SpanId, TraceId}; + +// ───────────────────────────────────────────────────────────────────────────── +// MySQL trace types +// ───────────────────────────────────────────────────────────────────────────── + +/// The outcome of a MySQL COM_QUERY command. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind")] +pub enum MysqlResponseKind { + /// SELECT / SHOW / EXPLAIN — server returned a result set. + ResultSet { column_count: u64, row_count: u64 }, + /// INSERT / UPDATE / DELETE / DDL — server returned an OK packet. + Ok { + affected_rows: u64, + last_insert_id: u64, + warnings: u16, + }, + /// Server returned an ERR packet. + Err { + error_code: u16, + sql_state: String, + message: String, + }, +} + +/// A complete MySQL COM_QUERY round-trip. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MysqlTrace { + /// Unique ID for this trace span. + pub span_id: SpanId, + /// W3C Trace Context trace ID. + pub trace_id: TraceId, + /// Parent span ID (`None` if root span). + pub parent_span_id: Option, + + // -- Query -- + pub query: String, + + // -- Response -- + pub response: MysqlResponseKind, + + // -- Timing -- + pub timestamp: SystemTime, + pub duration: Duration, + + // -- Connection metadata -- + pub dest_addr: Option, + pub db_name: Option, +} + +// ───────────────────────────────────────────────────────────────────────────── +// MysqlStore trait +// ───────────────────────────────────────────────────────────────────────────── + +/// Persistent store for [`MysqlTrace`] records. +pub trait MysqlStore: Send + Sync { + fn insert(&self, trace: &MysqlTrace) -> Result<(), StorageError>; + fn get_by_span_id(&self, span_id: &SpanId) -> Result, StorageError>; + fn list_recent(&self, limit: usize, offset: usize) -> Result, StorageError>; + fn search_by_query(&self, pattern: &str, limit: usize) + -> Result, StorageError>; + fn count(&self) -> Result; +} + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use std::time::{Duration, SystemTime}; + + use super::*; + use crate::trace::{SpanId, TraceId}; + + fn make_trace(response: MysqlResponseKind) -> MysqlTrace { + MysqlTrace { + span_id: SpanId([1u8; 8]), + trace_id: TraceId([2u8; 16]), + parent_span_id: None, + query: "SELECT 1".to_string(), + response, + timestamp: SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000), + duration: Duration::from_millis(4), + dest_addr: Some("127.0.0.1:3306".to_string()), + db_name: Some("mydb".to_string()), + } + } + + #[test] + fn test_mysql_response_kind_serde_result_set() { + let kind = MysqlResponseKind::ResultSet { + column_count: 3, + row_count: 12, + }; + let json = serde_json::to_string(&kind).unwrap(); + let back: MysqlResponseKind = serde_json::from_str(&json).unwrap(); + match back { + MysqlResponseKind::ResultSet { + column_count, + row_count, + } => { + assert_eq!(column_count, 3); + assert_eq!(row_count, 12); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn test_mysql_response_kind_serde_ok() { + let kind = MysqlResponseKind::Ok { + affected_rows: 1, + last_insert_id: 42, + warnings: 0, + }; + let json = serde_json::to_string(&kind).unwrap(); + let back: MysqlResponseKind = serde_json::from_str(&json).unwrap(); + match back { + MysqlResponseKind::Ok { + affected_rows, + last_insert_id, + warnings, + } => { + assert_eq!(affected_rows, 1); + assert_eq!(last_insert_id, 42); + assert_eq!(warnings, 0); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn test_mysql_response_kind_serde_err() { + let kind = MysqlResponseKind::Err { + error_code: 1064, + sql_state: "42000".to_string(), + message: "syntax error".to_string(), + }; + let json = serde_json::to_string(&kind).unwrap(); + let back: MysqlResponseKind = serde_json::from_str(&json).unwrap(); + match back { + MysqlResponseKind::Err { + error_code, + sql_state, + message, + } => { + assert_eq!(error_code, 1064); + assert_eq!(sql_state, "42000"); + assert_eq!(message, "syntax error"); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn test_mysql_trace_serde_roundtrip() { + let trace = make_trace(MysqlResponseKind::Ok { + affected_rows: 5, + last_insert_id: 0, + warnings: 0, + }); + let json = serde_json::to_string(&trace).unwrap(); + let back: MysqlTrace = serde_json::from_str(&json).unwrap(); + assert_eq!(back.query, "SELECT 1"); + assert_eq!(back.span_id.0, [1u8; 8]); + assert_eq!(back.trace_id.0, [2u8; 16]); + assert_eq!(back.dest_addr.as_deref(), Some("127.0.0.1:3306")); + } +} diff --git a/crates/phantom-storage/src/fjall_mysql.rs b/crates/phantom-storage/src/fjall_mysql.rs new file mode 100644 index 0000000..94f6e27 --- /dev/null +++ b/crates/phantom-storage/src/fjall_mysql.rs @@ -0,0 +1,229 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle}; +use phantom_core::error::StorageError; +use phantom_core::mysql::{MysqlStore, MysqlTrace}; +use phantom_core::trace::SpanId; + +pub struct FjallMysqlStore { + keyspace: Keyspace, + mysql_traces: PartitionHandle, + mysql_by_time: PartitionHandle, +} + +impl FjallMysqlStore { + pub fn open(path: impl AsRef) -> Result { + let keyspace = Config::new(path) + .open() + .map_err(|e| StorageError::Open(e.to_string()))?; + + let kv_sep_opts = PartitionCreateOptions::default() + .with_kv_separation(fjall::KvSeparationOptions::default()); + + let mysql_traces = keyspace + .open_partition("mysql_traces", kv_sep_opts) + .map_err(|e| StorageError::Open(e.to_string()))?; + + let mysql_by_time = keyspace + .open_partition("mysql_by_time", PartitionCreateOptions::default()) + .map_err(|e| StorageError::Open(e.to_string()))?; + + Ok(Self { + keyspace, + mysql_traces, + mysql_by_time, + }) + } +} + +/// Encode a `SystemTime` as big-endian nanoseconds since UNIX epoch. +fn encode_timestamp(ts: &SystemTime) -> [u8; 8] { + let nanos = ts + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_nanos() as u64; + nanos.to_be_bytes() +} + +/// Build the `mysql_by_time` key: `{timestamp_be (8B)}{span_id (8B)}`. +fn time_key(ts: &SystemTime, span_id: &SpanId) -> [u8; 16] { + let mut key = [0u8; 16]; + key[..8].copy_from_slice(&encode_timestamp(ts)); + key[8..].copy_from_slice(span_id.as_bytes()); + key +} + +impl MysqlStore for FjallMysqlStore { + fn insert(&self, trace: &MysqlTrace) -> Result<(), StorageError> { + let serialized = + serde_json::to_vec(trace).map_err(|e| StorageError::Serialization(e.to_string()))?; + + let span_key = trace.span_id.as_bytes(); + let time_k = time_key(&trace.timestamp, &trace.span_id); + + let mut batch = self.keyspace.batch(); + batch.insert(&self.mysql_traces, span_key, &serialized); + batch.insert(&self.mysql_by_time, time_k, span_key); + batch + .commit() + .map_err(|e| StorageError::Write(e.to_string()))?; + + Ok(()) + } + + fn get_by_span_id(&self, span_id: &SpanId) -> Result, StorageError> { + let Some(value) = self + .mysql_traces + .get(span_id.as_bytes()) + .map_err(|e| StorageError::Read(e.to_string()))? + else { + return Ok(None); + }; + let trace: MysqlTrace = serde_json::from_slice(&value) + .map_err(|e| StorageError::Serialization(e.to_string()))?; + Ok(Some(trace)) + } + + fn list_recent(&self, limit: usize, offset: usize) -> Result, StorageError> { + let mut results = Vec::with_capacity(limit); + for (i, entry) in self.mysql_by_time.iter().rev().enumerate() { + if i < offset { + continue; + } + if results.len() >= limit { + break; + } + let (_key, value) = entry.map_err(|e| StorageError::Read(e.to_string()))?; + let span_id_bytes: [u8; 8] = value[..8] + .try_into() + .map_err(|_| StorageError::Read("invalid span_id in mysql_by_time index".into()))?; + if let Some(trace) = self.get_by_span_id(&SpanId(span_id_bytes))? { + results.push(trace); + } + } + Ok(results) + } + + fn search_by_query( + &self, + pattern: &str, + limit: usize, + ) -> Result, StorageError> { + let mut results = Vec::new(); + for entry in self.mysql_by_time.iter().rev() { + if results.len() >= limit { + break; + } + let (_key, value) = entry.map_err(|e| StorageError::Read(e.to_string()))?; + let span_id_bytes: [u8; 8] = value[..8] + .try_into() + .map_err(|_| StorageError::Read("invalid span_id in mysql_by_time index".into()))?; + if let Some(trace) = self.get_by_span_id(&SpanId(span_id_bytes))? + && trace.query.contains(pattern) + { + results.push(trace); + } + } + Ok(results) + } + + fn count(&self) -> Result { + Ok(self.mysql_traces.approximate_len() as u64) + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, SystemTime}; + + use super::*; + use phantom_core::mysql::{MysqlResponseKind, MysqlTrace}; + use phantom_core::trace::{SpanId, TraceId}; + + fn make_trace(query: &str, ts_offset_ms: u64) -> MysqlTrace { + MysqlTrace { + span_id: SpanId(rand_bytes_8()), + trace_id: TraceId(rand_bytes_16()), + parent_span_id: None, + query: query.to_string(), + response: MysqlResponseKind::Ok { + affected_rows: 1, + last_insert_id: 0, + warnings: 0, + }, + timestamp: SystemTime::UNIX_EPOCH + + Duration::from_secs(1_700_000_000) + + Duration::from_millis(ts_offset_ms), + duration: Duration::from_millis(4), + dest_addr: Some("127.0.0.1:3306".to_string()), + db_name: Some("testdb".to_string()), + } + } + + fn rand_bytes_8() -> [u8; 8] { + let mut buf = [0u8; 8]; + buf.iter_mut().for_each(|b| *b = rand::random()); + buf + } + + fn rand_bytes_16() -> [u8; 16] { + let mut buf = [0u8; 16]; + buf.iter_mut().for_each(|b| *b = rand::random()); + buf + } + + #[test] + fn test_mysql_insert_and_get() { + let dir = tempfile::tempdir().unwrap(); + let store = FjallMysqlStore::open(dir.path()).unwrap(); + + let trace = make_trace("SELECT * FROM users", 0); + let span_id = trace.span_id.clone(); + + store.insert(&trace).unwrap(); + + let retrieved = store.get_by_span_id(&span_id).unwrap().unwrap(); + assert_eq!(retrieved.query, "SELECT * FROM users"); + } + + #[test] + fn test_mysql_list_recent_ordering() { + let dir = tempfile::tempdir().unwrap(); + let store = FjallMysqlStore::open(dir.path()).unwrap(); + + for i in 0..5u64 { + store + .insert(&make_trace(&format!("SELECT {i}"), i * 10)) + .unwrap(); + } + + let recent = store.list_recent(3, 0).unwrap(); + assert_eq!(recent.len(), 3); + // Most recent first (highest timestamp offset = 40ms → "SELECT 4") + assert!( + recent[0].query.contains("SELECT 4"), + "got: {}", + recent[0].query + ); + } + + #[test] + fn test_mysql_search_by_query() { + let dir = tempfile::tempdir().unwrap(); + let store = FjallMysqlStore::open(dir.path()).unwrap(); + + store.insert(&make_trace("SELECT * FROM users", 0)).unwrap(); + store + .insert(&make_trace("INSERT INTO orders VALUES (1)", 10)) + .unwrap(); + store + .insert(&make_trace("SELECT * FROM products", 20)) + .unwrap(); + + let results = store.search_by_query("SELECT", 10).unwrap(); + assert_eq!(results.len(), 2); + + let results = store.search_by_query("INSERT", 10).unwrap(); + assert_eq!(results.len(), 1); + } +} diff --git a/crates/phantom-storage/src/lib.rs b/crates/phantom-storage/src/lib.rs index 12d8379..a02b7ae 100644 --- a/crates/phantom-storage/src/lib.rs +++ b/crates/phantom-storage/src/lib.rs @@ -1,3 +1,5 @@ +mod fjall_mysql; mod fjall_store; +pub use fjall_mysql::FjallMysqlStore; pub use fjall_store::FjallTraceStore; diff --git a/crates/phantom-tui/src/app.rs b/crates/phantom-tui/src/app.rs index 17d809d..8b45bd3 100644 --- a/crates/phantom-tui/src/app.rs +++ b/crates/phantom-tui/src/app.rs @@ -1,3 +1,4 @@ +use phantom_core::mysql::MysqlTrace; use phantom_core::trace::HttpTrace; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -6,14 +7,31 @@ pub enum Pane { TraceDetail, } +/// Which top-level tab is active. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ActiveTab { + #[default] + Http, + Mysql, +} + pub struct App { + // ── HTTP tab ────────────────────────────────────────────────────────────── pub traces: Vec, pub selected_index: usize, + pub trace_count: u64, + + // ── MySQL tab ───────────────────────────────────────────────────────────── + pub mysql_traces: Vec, + pub mysql_selected_index: usize, + pub mysql_trace_count: u64, + + // ── Shared UI state ─────────────────────────────────────────────────────── pub filter: String, pub filter_active: bool, pub active_pane: Pane, + pub active_tab: ActiveTab, pub should_quit: bool, - pub trace_count: u64, pub backend_name: String, } @@ -22,15 +40,21 @@ impl App { Self { traces: Vec::new(), selected_index: 0, + trace_count: 0, + mysql_traces: Vec::new(), + mysql_selected_index: 0, + mysql_trace_count: 0, filter: String::new(), filter_active: false, active_pane: Pane::TraceList, + active_tab: ActiveTab::Http, should_quit: false, - trace_count: 0, backend_name: backend_name.to_string(), } } + // ── HTTP traces ─────────────────────────────────────────────────────────── + pub fn filtered_traces(&self) -> Vec<&HttpTrace> { if self.filter.is_empty() { self.traces.iter().collect() @@ -44,31 +68,108 @@ impl App { } pub fn selected_trace(&self) -> Option<&HttpTrace> { - let filtered = self.filtered_traces(); - filtered.get(self.selected_index).copied() + self.filtered_traces().get(self.selected_index).copied() + } + + pub fn add_trace(&mut self, trace: HttpTrace) { + self.traces.insert(0, trace); + self.trace_count += 1; + // Keep selection stable when new traces arrive at the top. + if !self.filter_active && self.selected_index > 0 { + self.selected_index += 1; + } + } + + // ── MySQL traces ────────────────────────────────────────────────────────── + + pub fn filtered_mysql_traces(&self) -> Vec<&MysqlTrace> { + if self.filter.is_empty() { + self.mysql_traces.iter().collect() + } else { + let filter_lower = self.filter.to_lowercase(); + self.mysql_traces + .iter() + .filter(|t| t.query.to_lowercase().contains(&filter_lower)) + .collect() + } + } + + pub fn selected_mysql_trace(&self) -> Option<&MysqlTrace> { + self.filtered_mysql_traces() + .get(self.mysql_selected_index) + .copied() + } + + pub fn add_mysql_trace(&mut self, trace: MysqlTrace) { + self.mysql_traces.insert(0, trace); + self.mysql_trace_count += 1; + if !self.filter_active && self.mysql_selected_index > 0 { + self.mysql_selected_index += 1; + } + } + + // ── Tab switching ───────────────────────────────────────────────────────── + + pub fn switch_tab(&mut self, tab: ActiveTab) { + self.active_tab = tab; + self.active_pane = Pane::TraceList; + self.clear_filter(); } + // ── Navigation (tab-aware) ──────────────────────────────────────────────── + pub fn move_up(&mut self) { - if self.selected_index > 0 { - self.selected_index -= 1; + match self.active_tab { + ActiveTab::Http => { + if self.selected_index > 0 { + self.selected_index -= 1; + } + } + ActiveTab::Mysql => { + if self.mysql_selected_index > 0 { + self.mysql_selected_index -= 1; + } + } } } pub fn move_down(&mut self) { - let max = self.filtered_traces().len().saturating_sub(1); - if self.selected_index < max { - self.selected_index += 1; + match self.active_tab { + ActiveTab::Http => { + let max = self.filtered_traces().len().saturating_sub(1); + if self.selected_index < max { + self.selected_index += 1; + } + } + ActiveTab::Mysql => { + let max = self.filtered_mysql_traces().len().saturating_sub(1); + if self.mysql_selected_index < max { + self.mysql_selected_index += 1; + } + } } } pub fn jump_top(&mut self) { - self.selected_index = 0; + match self.active_tab { + ActiveTab::Http => self.selected_index = 0, + ActiveTab::Mysql => self.mysql_selected_index = 0, + } } pub fn jump_bottom(&mut self) { - self.selected_index = self.filtered_traces().len().saturating_sub(1); + match self.active_tab { + ActiveTab::Http => { + self.selected_index = self.filtered_traces().len().saturating_sub(1); + } + ActiveTab::Mysql => { + self.mysql_selected_index = self.filtered_mysql_traces().len().saturating_sub(1); + } + } } + // ── Pane / filter helpers ───────────────────────────────────────────────── + pub fn toggle_pane(&mut self) { self.active_pane = match self.active_pane { Pane::TraceList => Pane::TraceDetail, @@ -88,24 +189,18 @@ impl App { self.filter.clear(); self.filter_active = false; self.selected_index = 0; + self.mysql_selected_index = 0; } pub fn push_filter_char(&mut self, c: char) { self.filter.push(c); self.selected_index = 0; + self.mysql_selected_index = 0; } pub fn pop_filter_char(&mut self) { self.filter.pop(); self.selected_index = 0; - } - - pub fn add_trace(&mut self, trace: HttpTrace) { - self.traces.insert(0, trace); - self.trace_count += 1; - // Keep selection stable when new traces arrive - if !self.filter_active && self.selected_index > 0 { - self.selected_index += 1; - } + self.mysql_selected_index = 0; } } diff --git a/crates/phantom-tui/src/lib.rs b/crates/phantom-tui/src/lib.rs index 796bf7a..f19340a 100644 --- a/crates/phantom-tui/src/lib.rs +++ b/crates/phantom-tui/src/lib.rs @@ -7,18 +7,21 @@ use std::sync::Arc; use crossterm::event::{KeyCode, KeyModifiers}; use crossterm::terminal::{self, EnterAlternateScreen, LeaveAlternateScreen}; use crossterm::{event::KeyEventKind, execute}; +use phantom_core::mysql::{MysqlStore, MysqlTrace}; use phantom_core::storage::TraceStore; use phantom_core::trace::HttpTrace; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use tokio::sync::mpsc; -use crate::app::App; +use crate::app::{ActiveTab, App}; use crate::event::{Event, EventHandler}; pub async fn run_tui( store: Arc, + mysql_store: Arc, mut trace_rx: mpsc::Receiver, + mut mysql_rx: mpsc::Receiver, backend_name: &str, ) -> std::io::Result<()> { // Initialize terminal @@ -30,24 +33,36 @@ pub async fn run_tui( let mut app = App::new(backend_name); - // Load existing traces from storage + // Load existing HTTP traces from storage if let Ok(existing) = store.list_recent(1000, 0) { app.trace_count = existing.len() as u64; app.traces = existing; } + // Load existing MySQL traces from storage + if let Ok(existing) = mysql_store.list_recent(1000, 0) { + app.mysql_trace_count = existing.len() as u64; + app.mysql_traces = existing; + } + let events = EventHandler::new(50); // 50ms tick loop { // Draw UI terminal.draw(|frame| ui::render(frame, &app))?; - // Drain all pending traces from the channel (non-blocking) + // Drain all pending HTTP traces from the channel (non-blocking) while let Ok(trace) = trace_rx.try_recv() { let _ = store.insert(&trace); app.add_trace(trace); } + // Drain all pending MySQL traces from the channel (non-blocking) + while let Ok(trace) = mysql_rx.try_recv() { + let _ = mysql_store.insert(&trace); + app.add_mysql_trace(trace); + } + // Handle events match events.poll()? { Event::Key(key) => { @@ -82,6 +97,8 @@ fn handle_normal_key(app: &mut App, code: KeyCode, modifiers: KeyModifiers) { KeyCode::Char('c') if modifiers.contains(KeyModifiers::CONTROL) => { app.should_quit = true; } + KeyCode::Char('1') => app.switch_tab(ActiveTab::Http), + KeyCode::Char('2') => app.switch_tab(ActiveTab::Mysql), KeyCode::Char('j') | KeyCode::Down => app.move_down(), KeyCode::Char('k') | KeyCode::Up => app.move_up(), KeyCode::Char('g') | KeyCode::Home => app.jump_top(), diff --git a/crates/phantom-tui/src/ui.rs b/crates/phantom-tui/src/ui.rs index 08bbd81..5ab3511 100644 --- a/crates/phantom-tui/src/ui.rs +++ b/crates/phantom-tui/src/ui.rs @@ -1,24 +1,27 @@ +use phantom_core::mysql::MysqlResponseKind; use ratatui::Frame; use ratatui::layout::{Constraint, Direction, Layout, Rect}; use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span, Text}; use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, Table, TableState, Wrap}; -use crate::app::{App, Pane}; +use crate::app::{ActiveTab, App, Pane}; pub fn render(frame: &mut Frame, app: &App) { let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ Constraint::Length(1), // Status bar + Constraint::Length(1), // Tab bar Constraint::Min(0), // Main area Constraint::Length(1), // Help bar ]) .split(frame.area()); render_status_bar(frame, app, chunks[0]); - render_main(frame, app, chunks[1]); - render_help_bar(frame, app, chunks[2]); + render_tab_bar(frame, app, chunks[1]); + render_main(frame, app, chunks[2]); + render_help_bar(frame, app, chunks[3]); } fn render_status_bar(frame: &mut Frame, app: &App, area: Rect) { @@ -31,9 +34,14 @@ fn render_status_bar(frame: &mut Frame, app: &App, area: Rect) { ), Span::raw(" v0.1.0 | "), Span::styled( - format!("Traces: {}", app.trace_count), + format!("HTTP: {}", app.trace_count), Style::default().fg(Color::Green), ), + Span::raw(" | "), + Span::styled( + format!("MySQL: {}", app.mysql_trace_count), + Style::default().fg(Color::Blue), + ), Span::raw(" | Capturing via "), Span::styled(&app.backend_name, Style::default().fg(Color::Yellow)), ]); @@ -43,14 +51,49 @@ fn render_status_bar(frame: &mut Frame, app: &App, area: Rect) { ); } +fn render_tab_bar(frame: &mut Frame, app: &App, area: Rect) { + let http_style = if app.active_tab == ActiveTab::Http { + Style::default() + .fg(Color::Black) + .bg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + let mysql_style = if app.active_tab == ActiveTab::Mysql { + Style::default() + .fg(Color::Black) + .bg(Color::Blue) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + + let tab_line = Line::from(vec![ + Span::raw(" "), + Span::styled(" [1] HTTP ", http_style), + Span::raw(" "), + Span::styled(" [2] MySQL ", mysql_style), + ]); + frame.render_widget(Paragraph::new(tab_line), area); +} + fn render_main(frame: &mut Frame, app: &App, area: Rect) { let chunks = Layout::default() .direction(Direction::Horizontal) .constraints([Constraint::Percentage(45), Constraint::Percentage(55)]) .split(area); - render_trace_list(frame, app, chunks[0]); - render_trace_detail(frame, app, chunks[1]); + match app.active_tab { + ActiveTab::Http => { + render_trace_list(frame, app, chunks[0]); + render_trace_detail(frame, app, chunks[1]); + } + ActiveTab::Mysql => { + render_mysql_list(frame, app, chunks[0]); + render_mysql_detail(frame, app, chunks[1]); + } + } } fn render_trace_list(frame: &mut Frame, app: &App, area: Rect) { @@ -289,12 +332,14 @@ fn render_help_bar(frame: &mut Frame, app: &App, area: Rect) { Line::from(vec![ Span::styled(" [q]", Style::default().fg(Color::Yellow)), Span::raw("uit "), + Span::styled("[1/2]", Style::default().fg(Color::Yellow)), + Span::raw("tab "), Span::styled("[/]", Style::default().fg(Color::Yellow)), Span::raw("filter "), Span::styled("[j/k]", Style::default().fg(Color::Yellow)), Span::raw("navigate "), Span::styled("[Tab]", Style::default().fg(Color::Yellow)), - Span::raw("switch "), + Span::raw("pane "), Span::styled("[g/G]", Style::default().fg(Color::Yellow)), Span::raw("top/bottom"), ]) @@ -305,6 +350,264 @@ fn render_help_bar(frame: &mut Frame, app: &App, area: Rect) { ); } +// ───────────────────────────────────────────────────────────────────────────── +// MySQL tab rendering +// ───────────────────────────────────────────────────────────────────────────── + +fn render_mysql_list(frame: &mut Frame, app: &App, area: Rect) { + let list_chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Length(3), Constraint::Min(0)]) + .split(area); + + // Filter bar (same as HTTP tab) + let filter_style = if app.filter_active { + Style::default().fg(Color::Yellow) + } else { + Style::default().fg(Color::Gray) + }; + let filter_text = if app.filter.is_empty() && !app.filter_active { + "Press / to filter".to_string() + } else { + app.filter.clone() + }; + let filter_block = Block::default() + .borders(Borders::ALL) + .border_style(filter_style) + .title(" Filter "); + frame.render_widget( + Paragraph::new(filter_text).block(filter_block), + list_chunks[0], + ); + + // MySQL trace table + let filtered = app.filtered_mysql_traces(); + + let header = Row::new(vec![ + Cell::from("Time"), + Cell::from("Query"), + Cell::from("Result"), + Cell::from("Duration"), + ]) + .style( + Style::default() + .fg(Color::Blue) + .add_modifier(Modifier::BOLD), + ); + + let rows: Vec = filtered + .iter() + .enumerate() + .map(|(i, trace)| { + let time = format_time(&trace.timestamp); + let query = truncate_str(&trace.query, 35); + let (result_str, result_color) = format_mysql_response(&trace.response); + let dur = format!("{:.0?}", trace.duration); + + let style = if i == app.mysql_selected_index { + Style::default() + .bg(Color::DarkGray) + .add_modifier(Modifier::BOLD) + } else { + Style::default() + }; + + Row::new(vec![ + Cell::from(time), + Cell::from(query), + Cell::from(result_str).style(Style::default().fg(result_color)), + Cell::from(dur).style(Style::default().fg(Color::DarkGray)), + ]) + .style(style) + }) + .collect(); + + let border_style = if app.active_pane == Pane::TraceList { + Style::default().fg(Color::Blue) + } else { + Style::default().fg(Color::DarkGray) + }; + + let table = Table::new( + rows, + [ + Constraint::Length(8), + Constraint::Min(15), + Constraint::Length(18), + Constraint::Length(8), + ], + ) + .header(header) + .block( + Block::default() + .borders(Borders::ALL) + .border_style(border_style) + .title(format!(" MySQL Queries ({}) ", filtered.len())), + ); + + let mut state = TableState::default(); + state.select(Some(app.mysql_selected_index)); + frame.render_stateful_widget(table, list_chunks[1], &mut state); +} + +fn render_mysql_detail(frame: &mut Frame, app: &App, area: Rect) { + let border_style = if app.active_pane == Pane::TraceDetail { + Style::default().fg(Color::Blue) + } else { + Style::default().fg(Color::DarkGray) + }; + + let block = Block::default() + .borders(Borders::ALL) + .border_style(border_style) + .title(" MySQL Detail "); + + let Some(trace) = app.selected_mysql_trace() else { + let empty = Paragraph::new("No query selected") + .style(Style::default().fg(Color::DarkGray)) + .block(block); + frame.render_widget(empty, area); + return; + }; + + let mut lines: Vec = Vec::new(); + + // Query + lines.push(Line::from(Span::styled( + "Query", + Style::default() + .fg(Color::Blue) + .add_modifier(Modifier::BOLD), + ))); + lines.push(Line::from("")); + for query_line in trace.query.lines() { + lines.push(Line::from(Span::styled( + query_line.to_string(), + Style::default().fg(Color::White), + ))); + } + lines.push(Line::from("")); + lines.push(Line::from(Span::styled( + "━".repeat(40), + Style::default().fg(Color::DarkGray), + ))); + lines.push(Line::from("")); + + // Response + lines.push(Line::from(Span::styled( + "Result", + Style::default() + .fg(Color::Blue) + .add_modifier(Modifier::BOLD), + ))); + lines.push(Line::from("")); + match &trace.response { + MysqlResponseKind::Ok { + affected_rows, + last_insert_id, + warnings, + } => { + lines.push(Line::from(vec![ + Span::styled( + "OK", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(format!(" {affected_rows} row(s) affected")), + ])); + if *last_insert_id > 0 { + lines.push(Line::from(format!(" last_insert_id = {last_insert_id}"))); + } + if *warnings > 0 { + lines.push(Line::from(Span::styled( + format!(" {warnings} warning(s)"), + Style::default().fg(Color::Yellow), + ))); + } + } + MysqlResponseKind::ResultSet { + column_count, + row_count, + } => { + lines.push(Line::from(vec![ + Span::styled( + "ResultSet", + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD), + ), + Span::raw(format!(" {column_count} col(s), {row_count} row(s)")), + ])); + } + MysqlResponseKind::Err { + error_code, + sql_state, + message, + } => { + lines.push(Line::from(vec![ + Span::styled( + format!("ERR {error_code}"), + Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), + ), + Span::raw(format!(" ({sql_state})")), + ])); + lines.push(Line::from(Span::styled( + message.clone(), + Style::default().fg(Color::Red), + ))); + } + } + + lines.push(Line::from("")); + lines.push(Line::from(Span::styled( + "━".repeat(40), + Style::default().fg(Color::DarkGray), + ))); + lines.push(Line::from("")); + + // Metadata + lines.push(Line::from(Span::styled( + "Metadata", + Style::default() + .fg(Color::Blue) + .add_modifier(Modifier::BOLD), + ))); + lines.push(Line::from(format!(" Duration: {:.0?}", trace.duration))); + lines.push(Line::from(format!( + " Timestamp: {}", + format_time(&trace.timestamp) + ))); + if let Some(addr) = &trace.dest_addr { + lines.push(Line::from(format!(" Server: {addr}"))); + } + if let Some(db) = &trace.db_name { + lines.push(Line::from(format!(" Database: {db}"))); + } + + let detail = Paragraph::new(Text::from(lines)) + .block(block) + .wrap(Wrap { trim: false }); + frame.render_widget(detail, area); +} + +/// Format a `MysqlResponseKind` as a short display string and its color. +fn format_mysql_response(response: &MysqlResponseKind) -> (String, Color) { + match response { + MysqlResponseKind::Ok { affected_rows, .. } => { + (format!("OK {affected_rows} row(s)"), Color::Cyan) + } + MysqlResponseKind::ResultSet { + column_count, + row_count, + } => ( + format!("{column_count} cols, {row_count} rows"), + Color::Green, + ), + MysqlResponseKind::Err { error_code, .. } => (format!("ERR {error_code}"), Color::Red), + } +} + fn format_time(ts: &std::time::SystemTime) -> String { let duration = ts.duration_since(std::time::UNIX_EPOCH).unwrap_or_default(); let secs = duration.as_secs(); diff --git a/src/main.rs b/src/main.rs index 3058ebc..2ad35b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,9 +5,10 @@ use std::time::UNIX_EPOCH; use clap::{Parser, ValueEnum}; use phantom_capture::ProxyCaptureBackend; use phantom_core::capture::CaptureBackend; +use phantom_core::mysql::{MysqlStore, MysqlTrace}; use phantom_core::storage::TraceStore; use phantom_core::trace::HttpTrace; -use phantom_storage::FjallTraceStore; +use phantom_storage::{FjallMysqlStore, FjallTraceStore}; use serde::Serialize; // ───────────────────────────────────────────────────────────────────────────── @@ -149,6 +150,43 @@ fn trace_to_jsonl(t: &HttpTrace) -> JsonlTrace { } } +/// Serializable representation of a MySQL trace for JSONL output. +#[derive(Serialize)] +struct JsonlMysqlTrace { + #[serde(rename = "type")] + kind: &'static str, + timestamp_ms: u64, + duration_ms: u64, + query: String, + response: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + dest_addr: Option, + #[serde(skip_serializing_if = "Option::is_none")] + db_name: Option, + trace_id: String, + span_id: String, +} + +fn mysql_trace_to_jsonl(t: &MysqlTrace) -> JsonlMysqlTrace { + let timestamp_ms = t + .timestamp + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let response = serde_json::to_value(&t.response).unwrap_or(serde_json::Value::Null); + JsonlMysqlTrace { + kind: "mysql", + timestamp_ms, + duration_ms: t.duration.as_millis() as u64, + query: t.query.clone(), + response, + dest_addr: t.dest_addr.clone(), + db_name: t.db_name.clone(), + trace_id: t.trace_id.to_string(), + span_id: t.span_id.to_string(), + } +} + /// Runs the JSONL output loop: each captured trace is serialized and written to /// stdout as a single JSON object followed by a newline. /// @@ -158,7 +196,9 @@ fn trace_to_jsonl(t: &HttpTrace) -> JsonlTrace { /// - The optional `child` process exits (ldpreload mode). async fn run_jsonl_output( store: Arc, + mysql_store: Arc, mut trace_rx: tokio::sync::mpsc::Receiver, + mut mysql_rx: tokio::sync::mpsc::Receiver, child: Option, ) -> anyhow::Result<()> { // Spawn a background thread to wait() on the child so we don't block the @@ -188,6 +228,15 @@ async fn run_jsonl_output( None => break, } } + maybe_mysql = mysql_rx.recv() => { + match maybe_mysql { + Some(t) => { + mysql_store.insert(&t).ok(); + println!("{}", serde_json::to_string(&mysql_trace_to_jsonl(&t))?); + } + None => break, + } + } _ = &mut ctrl_c => break, // When the child exits, wait briefly for the backend to flush any // in-flight datagrams, then drain whatever arrived. @@ -203,6 +252,10 @@ async fn run_jsonl_output( store.insert(&t).ok(); println!("{}", serde_json::to_string(&trace_to_jsonl(&t))?); } + while let Ok(t) = mysql_rx.try_recv() { + mysql_store.insert(&t).ok(); + println!("{}", serde_json::to_string(&mysql_trace_to_jsonl(&t))?); + } break; } } @@ -230,11 +283,12 @@ async fn main() -> anyhow::Result<()> { std::fs::create_dir_all(&data_dir)?; let store = Arc::new(FjallTraceStore::open(&data_dir)?); + let mysql_store = Arc::new(FjallMysqlStore::open(&data_dir)?); match cli.backend { - Backend::Proxy => run_proxy(cli, store).await, + Backend::Proxy => run_proxy(cli, store, mysql_store).await, #[cfg(target_os = "linux")] - Backend::Ldpreload => run_ldpreload(cli, store).await, + Backend::Ldpreload => run_ldpreload(cli, store, mysql_store).await, } } @@ -242,10 +296,16 @@ async fn main() -> anyhow::Result<()> { // Proxy backend // ───────────────────────────────────────────────────────────────────────────── -async fn run_proxy(cli: Cli, store: Arc) -> anyhow::Result<()> { +async fn run_proxy( + cli: Cli, + store: Arc, + mysql_store: Arc, +) -> anyhow::Result<()> { let mut backend = ProxyCaptureBackend::new(cli.port); let backend_name = backend.name().to_string(); let trace_rx = backend.start().map_err(|e| anyhow::anyhow!("{e}"))?; + // Proxy backend does not capture MySQL — provide a never-sending dummy channel. + let (_mysql_tx, mysql_rx) = tokio::sync::mpsc::channel::(1); match cli.output { OutputMode::Tui => { @@ -259,14 +319,14 @@ async fn run_proxy(cli: Cli, store: Arc) -> anyhow::Result<()> "phantom: traces stored in {}", store_path_display(&cli.data_dir) ); - phantom_tui::run_tui(store, trace_rx, &backend_name).await?; + phantom_tui::run_tui(store, mysql_store, trace_rx, mysql_rx, &backend_name).await?; } OutputMode::Jsonl => { eprintln!( "phantom: proxy listening on 127.0.0.1:{} [jsonl mode]", cli.port ); - run_jsonl_output(store, trace_rx, None).await?; + run_jsonl_output(store, mysql_store, trace_rx, mysql_rx, None).await?; } } @@ -279,7 +339,11 @@ async fn run_proxy(cli: Cli, store: Arc) -> anyhow::Result<()> // ───────────────────────────────────────────────────────────────────────────── #[cfg(target_os = "linux")] -async fn run_ldpreload(cli: Cli, store: Arc) -> anyhow::Result<()> { +async fn run_ldpreload( + cli: Cli, + store: Arc, + mysql_store: Arc, +) -> anyhow::Result<()> { use phantom_capture::LdPreloadCaptureBackend; let agent_lib = cli.agent_lib.clone().ok_or_else(|| { @@ -307,7 +371,10 @@ async fn run_ldpreload(cli: Cli, store: Arc) -> anyhow::Result< let mut backend = LdPreloadCaptureBackend::new(socket_path.clone()); let backend_name = backend.name().to_string(); - let trace_rx = backend.start().map_err(|e| anyhow::anyhow!("{e}"))?; + // Use start_mysql_aware() to get both HTTP and MySQL channels. + let (trace_rx, mysql_rx) = backend + .start_mysql_aware() + .map_err(|e| anyhow::anyhow!("{e}"))?; eprintln!("phantom: ldpreload backend active"); eprintln!(" agent lib : {}", agent_lib.display()); @@ -331,11 +398,11 @@ async fn run_ldpreload(cli: Cli, store: Arc) -> anyhow::Result< match cli.output { OutputMode::Tui => { // In TUI mode the user quits manually; child runs in background. - phantom_tui::run_tui(store, trace_rx, &backend_name).await?; + phantom_tui::run_tui(store, mysql_store, trace_rx, mysql_rx, &backend_name).await?; } OutputMode::Jsonl => { // In JSONL mode we exit automatically when the child finishes. - run_jsonl_output(store, trace_rx, Some(child)).await?; + run_jsonl_output(store, mysql_store, trace_rx, mysql_rx, Some(child)).await?; } } From b461b0fa02ed1760e747d7b195b3e280feb11c01 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Mar 2026 01:08:21 +0000 Subject: [PATCH 2/2] docs: add MySQL trace implementation plan Records the design decisions, implementation order, and test strategy that guided the MySQL trace feature addition. https://claude.ai/code/session_015N7iRe1Pkx41rHgvgKQaTC --- mysql_trace_plan.md | 557 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 557 insertions(+) create mode 100644 mysql_trace_plan.md diff --git a/mysql_trace_plan.md b/mysql_trace_plan.md new file mode 100644 index 0000000..a54adb0 --- /dev/null +++ b/mysql_trace_plan.md @@ -0,0 +1,557 @@ +# MySQL クエリトレース機能 実装計画 + +## 概要 + +LD_PRELOAD バックエンド向けに MySQL COM_QUERY トレース機能を追加する。 +`connect()` フックでポート3306の接続を検出し、MySQL wire protocol を解析して +SQLクエリとレスポンスをキャプチャする。TUIにMySQLタブを追加して表示する。 + +スコープ: +- **バックエンド**: LD_PRELOAD のみ(プロキシ不要) +- **コマンド**: COM_QUERY のみ(プリペアドステートメント除外) + +--- + +## 1. データモデル設計 + +### 1.1 新規ファイル: `crates/phantom-core/src/mysql.rs` + +```rust +use std::time::{Duration, SystemTime}; +use serde::{Deserialize, Serialize}; +use crate::{error::StorageError, trace::{SpanId, TraceId}}; + +/// MySQL クエリの実行結果(3種類のみ) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MysqlResponseKind { + ResultSet { column_count: u64, row_count: u64 }, + Ok { affected_rows: u64, last_insert_id: u64, warnings: u16 }, + Err { error_code: u16, sql_state: String, message: String }, +} + +/// MySQL 単一クエリのトレースレコード +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MysqlTrace { + pub span_id: SpanId, + pub trace_id: TraceId, + pub parent_span_id: Option, + pub query: String, + pub response: MysqlResponseKind, + pub timestamp: SystemTime, + pub duration: Duration, + pub dest_addr: Option, + pub db_name: Option, +} + +/// MySQL トレースストアトレイト(TraceStore と同じパターン) +pub trait MysqlStore: Send + Sync { + fn insert(&self, trace: &MysqlTrace) -> Result<(), StorageError>; + fn get_by_span_id(&self, span_id: &SpanId) -> Result, StorageError>; + fn list_recent(&self, limit: usize, offset: usize) -> Result, StorageError>; + fn search_by_query(&self, pattern: &str, limit: usize) -> Result, StorageError>; + fn count(&self) -> Result; +} +``` + +### 1.2 IPC メッセージ形式(エージェント側) + +HTTP と MySQL を同一ソケットで多重化するために `msg_type` フィールドを使用: + +```json +// HTTP(既存形式に msg_type を追加) +{"msg_type":"http","method":"GET","url":"...","status_code":200,...} + +// MySQL(新規) +{ + "msg_type":"mysql", + "query":"SELECT * FROM users", + "duration_ms":4, + "timestamp_ms":1234567890, + "dest_addr":"127.0.0.1:3306", + "db_name":"mydb", + "affected_rows":null, + "last_insert_id":null, + "warnings":null, + "column_count":3, + "row_count":12, + "error_code":null, + "sql_state":null, + "error_message":null +} +``` + +レスポンス種別判別ロジック: +- `error_code.is_some()` → `MysqlResponseKind::Err` +- `column_count.is_some()` → `MysqlResponseKind::ResultSet` +- それ以外 → `MysqlResponseKind::Ok` + +--- + +## 2. 変更・新規作成ファイル一覧 + +| ファイル | 種別 | 変更内容 | +|---|---|---| +| `crates/phantom-core/src/mysql.rs` | **新規** | `MysqlTrace`, `MysqlResponseKind`, `MysqlStore` | +| `crates/phantom-core/src/lib.rs` | 変更 | `pub mod mysql;` を追加 | +| `crates/phantom-storage/src/fjall_mysql.rs` | **新規** | `FjallMysqlStore` 実装 | +| `crates/phantom-storage/src/lib.rs` | 変更 | `FjallMysqlStore` をエクスポート | +| `crates/phantom-agent/src/lib.rs` | 変更 | `connect()` フック、`MysqlConnState`、MySQL 解析 | +| `crates/phantom-capture/src/ldpreload.rs` | 変更 | `start_mysql_aware()` 追加、`msg_type` 分岐 | +| `crates/phantom-tui/src/app.rs` | 変更 | MySQL タブ状態追加 | +| `crates/phantom-tui/src/ui.rs` | 変更 | MySQL タブ描画追加 | +| `crates/phantom-tui/src/lib.rs` | 変更 | `run_tui()` シグネチャ拡張 | +| `src/main.rs` | 変更 | MySQL ストア・チャネル配線 | + +--- + +## 3. 実装詳細 + +### Step 1: `phantom-core` — 型・トレイト定義 + +**`crates/phantom-core/src/mysql.rs`** を新規作成。 +**`crates/phantom-core/src/lib.rs`** に `pub mod mysql;` を追加。 + +インラインテスト: +- `test_mysql_response_kind_serde` — 3バリアント全てのシリアライズ往復テスト +- `test_mysql_trace_serde_roundtrip` — `MysqlTrace` 全体のシリアライズ往復テスト + +--- + +### Step 2: `phantom-storage` — Fjall ストア + +**`crates/phantom-storage/src/fjall_mysql.rs`** を新規作成。 + +```rust +pub struct FjallMysqlStore { + keyspace: Keyspace, + mysql_traces: PartitionHandle, // span_id (8B) → JSON + mysql_by_time: PartitionHandle, // timestamp_be (8B) ++ span_id (8B) → span_id (8B) +} +``` + +`FjallTraceStore` と同じパターン: +- `open(path)` で既存のキースペースに追加パーティションを開く +- `insert()` でバッチコミット(2パーティション同時書き込み) +- `list_recent()` は `mysql_by_time` を逆順スキャン → `mysql_traces` ルックアップ +- `search_by_query()` は `mysql_traces` 全件スキャンでSQL文字列マッチ + +インラインテスト: +- `test_mysql_insert_and_get` +- `test_mysql_list_recent_ordering` +- `test_mysql_search_by_query` + +--- + +### Step 3: `phantom-agent` — MySQL プロトコル解析 + +**`crates/phantom-agent/src/lib.rs`** に以下を追加: + +#### 3.1 connect() フック + +```rust +// ポート設定(環境変数、デフォルト3306) +static MYSQL_PORT: OnceLock = OnceLock::new(); + +fn mysql_port() -> u16 { + *MYSQL_PORT.get_or_init(|| { + std::env::var("PHANTOM_MYSQL_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3306) + }) +} + +redhook::hook! { + unsafe fn connect( + sockfd: c_int, + addr: *const libc::sockaddr, + addrlen: libc::socklen_t, + ) -> c_int => phantom_connect { + // 再入ガード適用 + // 実際の connect() を呼び出し + // AF_INET / AF_INET6 からポートを取得 + // ポート一致 → STATE_MAP に MysqlConnection を登録 + } +} +``` + +sockaddr パース(外部クレート不要、ポインタキャストで処理): +```rust +fn extract_port(addr: *const libc::sockaddr, addrlen: libc::socklen_t) -> Option { + unsafe { + if addrlen as usize >= std::mem::size_of::() { + let sa = &*(addr as *const libc::sockaddr_in); + if sa.sin_family as i32 == libc::AF_INET { + return Some(u16::from_be(sa.sin_port)); + } + } + if addrlen as usize >= std::mem::size_of::() { + let sa = &*(addr as *const libc::sockaddr_in6); + if sa.sin6_family as i32 == libc::AF_INET6 { + return Some(u16::from_be(sa.sin6_port)); + } + } + None + } +} +``` + +#### 3.2 FdState 拡張 + +```rust +enum FdState { + CollectingRequest { buf: Vec }, + CollectingResponse { ... }, + Http2(Box), + MysqlConnection(Box), // ← 新規 +} +``` + +#### 3.3 MysqlConnState ステートマシン + +```rust +struct MysqlConnState { + dest_addr: Option, + db_name: Option, + send_buf: Vec, // クライアント → サーバー + recv_buf: Vec, // サーバー → クライアント + // ハンドシェイク追跡 + handshake_phase: HandshakePhase, + // クエリ追跡 + query_state: MysqlQueryState, +} + +enum HandshakePhase { + WaitingGreeting, // サーバーの初期挨拶待ち + WaitingAuthOk, // 認証OK待ち + Done, // ハンドシェイク完了 +} + +enum MysqlQueryState { + Idle, + AwaitingResponse { query: String, started_at: Instant, timestamp_ms: u64 }, + ReadingResultSet { + query: String, started_at: Instant, timestamp_ms: u64, + column_count: u64, // 最初のパケットから取得 + row_count: u64, // データパケットをカウント + phase: ResultSetPhase, + }, +} + +enum ResultSetPhase { + ReadingColumns { cols_seen: u64 }, + ReadingRows, +} +``` + +#### 3.4 MySQL パケット解析 + +```rust +/// MySQL パケット: [3B length LE][1B seq_id][payload] +fn parse_mysql_packet(buf: &[u8]) -> Option<(usize, u8, &[u8])> { + if buf.len() < 4 { return None; } + let len = u32::from_le_bytes([buf[0], buf[1], buf[2], 0]) as usize; + if buf.len() < 4 + len { return None; } + Some((4 + len, buf[3], &buf[4..4 + len])) +} + +/// MySQL 可変長整数デコード +fn decode_lenenc_int(buf: &[u8]) -> Option<(u64, usize)> { + match buf.first()? { + n @ 0x00..=0xfb => Some((*n as u64, 1)), + 0xfc => { + if buf.len() < 3 { return None; } + Some((u16::from_le_bytes([buf[1], buf[2]]) as u64, 3)) + } + 0xfd => { + if buf.len() < 4 { return None; } + Some((u32::from_le_bytes([buf[1], buf[2], buf[3], 0]) as u64, 4)) + } + 0xfe => { + if buf.len() < 9 { return None; } + Some((u64::from_le_bytes(buf[1..9].try_into().ok()?), 9)) + } + _ => None, + } +} +``` + +処理フロー(`process_mysql_outgoing`): +1. `send_buf` に追記 +2. `parse_mysql_packet` で先頭パケットを取り出す +3. `HandshakePhase::Done` かつ `seq_id == 0` かつ `payload[0] == 0x03` → COM_QUERY 検出 +4. query text = `String::from_utf8_lossy(&payload[1..])` で取得 +5. `MysqlQueryState::AwaitingResponse` に遷移 + +処理フロー(`process_mysql_incoming`): +1. `recv_buf` に追記 +2. パケット取り出し +3. `HandshakePhase::WaitingGreeting`: seq_id=0, payload[0]=0x0a → `WaitingAuthOk` へ +4. `HandshakePhase::WaitingAuthOk`: payload[0]=0x00 かつ seq_id>=2 → `Done` へ +5. `MysqlQueryState::AwaitingResponse`: + - `0x00` → OK パケット(affected_rows/last_insert_id 解析) → emit → `Idle` + - `0xff` → ERR パケット(error_code/sql_state/message 解析) → emit → `Idle` + - それ以外 → ResultSet 開始(column_count を lenenc_int で解析)→ `ReadingResultSet` +6. `ReadingResultSet`: + - カラム定義フェーズ: `cols_seen < column_count` ならカラム定義パケットをスキップ + - EOF パケット(`0xfe`, < 9 bytes)でカラム定義終了 → `ReadingRows` へ + - 行フェーズ: `0xfe`(EOF)または `0x00`(CLIENT_DEPRECATE_EOF の OK)で終了 → emit + +#### 3.5 MysqlTraceMsg と emit + +```rust +#[derive(serde::Serialize)] +struct MysqlTraceMsg { + msg_type: &'static str, // = "mysql" + query: String, + duration_ms: u64, + timestamp_ms: u64, + dest_addr: Option, + db_name: Option, + // OK フィールド + affected_rows: Option, + last_insert_id: Option, + warnings: Option, + // ResultSet フィールド + column_count: Option, + row_count: Option, + // ERR フィールド + error_code: Option, + sql_state: Option, + error_message: Option, +} +``` + +同一の `emit_msg()` / Unix ソケットを使用(既存インフラを再利用)。 + +#### 3.6 既存 HTTP フック統合 + +`process_outgoing(key, data, tls)` を修正: +- `state_map().lock()` で状態を取得 +- `FdState::MysqlConnection` の場合 → `process_mysql_outgoing()` に分岐 + +`process_incoming(key, data, tls)` / `process_teardown(key)` も同様。 + +インラインテスト(`#[cfg(test)]` モジュール): +- `test_parse_mysql_packet_complete` / `test_parse_mysql_packet_incomplete` +- `test_decode_lenenc_int_all_forms` +- `test_mysql_handshake_detection` +- `test_mysql_state_ok_response` +- `test_mysql_state_err_response` +- `test_mysql_state_resultset` + +--- + +### Step 4: `phantom-capture/ldpreload.rs` — IPC 多重化 + +#### msg_type による分岐 + +```rust +// 現行: serde_json::from_slice::(&buf[..n]) +// 変更後: +let val: serde_json::Value = serde_json::from_slice(&buf[..n])?; +match val.get("msg_type").and_then(|v| v.as_str()) { + Some("mysql") => { /* MySQL パース */ } + _ => { /* HTTP パース(既存ロジック) */ } +} +``` + +既存の HTTP メッセージは `msg_type` フィールドがなくても `_` ブランチで処理される。 + +#### 新メソッド `start_mysql_aware()` + +```rust +pub fn start_mysql_aware( + &mut self, +) -> Result<(mpsc::Receiver, mpsc::Receiver), CaptureError> { + // ソケットバインド、チャネル生成 + // タスクスポーン: msg_type で HTTP/MySQL に分配 + Ok((http_rx, mysql_rx)) +} +``` + +既存 `CaptureBackend::start()` は `start_mysql_aware()` を呼び出し、MySQL 受信側を drop: +```rust +fn start(&mut self) -> Result, CaptureError> { + let (http_rx, _mysql_rx) = self.start_mysql_aware()?; + Ok(http_rx) +} +``` + +これにより既存のプロキシバックエンドとのインターフェース互換性を維持する。 + +--- + +### Step 5: `phantom-tui` — MySQL タブ + +#### `app.rs` 変更 + +```rust +#[derive(Debug, Default, PartialEq)] +pub enum ActiveTab { #[default] Http, Mysql } + +pub struct App { + // 既存フィールド... + pub mysql_traces: Vec, + pub mysql_selected_index: usize, + pub mysql_trace_count: u64, + pub active_tab: ActiveTab, +} + +impl App { + pub fn add_mysql_trace(&mut self, trace: MysqlTrace) { ... } + pub fn selected_mysql_trace(&self) -> Option<&MysqlTrace> { ... } + pub fn filtered_mysql_traces(&self) -> Vec<&MysqlTrace> { ... } + pub fn switch_tab(&mut self, tab: ActiveTab) { ... } +} +``` + +ナビゲーションキー(`j/k/↑/↓`)はアクティブタブの対応リストに作用。 + +#### `ui.rs` 変更 + +タブバー(メインエリア上部に追加): +``` + [1] HTTP (42) [2] MySQL (7) +``` + +MySQL リスト列: +``` +Time | Query (truncated 60 chars) | Result | Duration +12:34:56 | SELECT * FROM users WHERE id = 1 | 3 cols, 12 rows | 4ms +12:34:57 | INSERT INTO events (type) VALUES ('auth')| OK, 1 affected | 2ms +12:34:58 | SELECT * FROM nonexistent_table | ERR 1146 | 1ms +``` + +カラーコーディング: +- ResultSet → Green +- Ok → Cyan +- Err → Red + +MySQL 詳細パネル(右ペイン): +- クエリ全文(ラップ表示) +- レスポンス詳細(affected_rows / column_count+row_count / error message) +- タイムスタンプ、実行時間、接続先アドレス + +#### `lib.rs` 変更 + +```rust +pub async fn run_tui( + store: Arc, + mysql_store: Arc, + trace_rx: mpsc::Receiver, + mysql_rx: mpsc::Receiver, + backend_name: &str, +) -> std::io::Result<()> +``` + +起動時の既存トレース読み込み: +```rust +// HTTP(既存) +for trace in store.list_recent(1000, 0).unwrap_or_default() { + app.add_trace(trace); +} +// MySQL(新規) +for trace in mysql_store.list_recent(1000, 0).unwrap_or_default() { + app.add_mysql_trace(trace); +} +``` + +メインループでの `mysql_rx.try_recv()` 追記。 + +キーバインド追加: +- `1` → HTTP タブに切り替え +- `2` → MySQL タブに切り替え + +--- + +### Step 6: `src/main.rs` — 配線 + +```rust +// ストア(既存 FjallTraceStore の隣に開く) +let mysql_store = Arc::new(FjallMysqlStore::open(&data_dir)?); + +// バックエンド別チャネル +let (trace_rx, mysql_rx) = match cli.backend { + Backend::Proxy => { + let mut backend = ProxyCaptureBackend::new(cli.port); + let trace_rx = backend.start()?; + let (_tx, mysql_rx) = mpsc::channel::(1); // ダミー(送信なし) + (trace_rx, mysql_rx) + } + Backend::Ldpreload => { + let mut backend = /* 既存 */; + backend.start_mysql_aware()? + } +}; + +// TUI or JSONL +match cli.output { + OutputMode::Tui => { + phantom_tui::run_tui(store, mysql_store, trace_rx, mysql_rx, &backend.name()).await? + } + OutputMode::Jsonl => { + // mysql_rx からも受信して stdout に JSONL 出力 + run_jsonl_output(store, mysql_store, trace_rx, mysql_rx, None).await? + } +} +``` + +ステータスバーの更新: +``` +phantom v0.1.0 | HTTP: 42 | MySQL: 7 | Capturing via ldpreload +``` + +--- + +## 4. 実装順序(依存関係順) + +``` +Step 1: phantom-core (mysql.rs) ← すべての基盤 + ↓ +Step 2: phantom-storage (fjall_mysql.rs) ← core に依存 + ↓ +Step 3: phantom-agent (lib.rs) ← 独立(依存なし)、2と並行可 + ↓ +Step 4: phantom-capture (ldpreload.rs) ← core + agent IPC 形式に依存 + ↓ +Step 5: phantom-tui ← core に依存 + ↓ +Step 6: src/main.rs ← 全ての依存先 +``` + +--- + +## 5. テスト戦略 + +### ユニットテスト(インライン `#[cfg(test)]`) + +| ファイル | テスト名 | +|---|---| +| `phantom-core/src/mysql.rs` | `test_mysql_response_kind_serde`, `test_mysql_trace_serde_roundtrip` | +| `phantom-storage/src/fjall_mysql.rs` | `test_mysql_insert_and_get`, `test_mysql_list_recent_ordering`, `test_mysql_search_by_query` | +| `phantom-agent/src/lib.rs` | `test_parse_mysql_packet_*`, `test_decode_lenenc_int_*`, `test_mysql_state_machine_*` | + +### 統合テスト(Docker Compose) + +`compose.yaml` に MySQL サービスを追加し、LD_PRELOAD 経由でクエリを送信して +トレースが正しくキャプチャされるかを検証。 + +--- + +## 6. 既存コードへの影響 + +| 変更点 | 影響 | +|---|---| +| `TraceMsg` に `msg_type` フィールドなし | `ldpreload.rs` で `_` マッチに fallback するため後方互換 | +| `run_tui()` シグネチャ変更 | `main.rs` のみ変更(public API は crate 内のみ) | +| `FdState` に `MysqlConnection` 追加 | `phantom-agent` はワークスペース外なので影響なし | +| 新パーティション(`mysql_*`) | 既存のキースペースに追加、既存パーティションと競合なし | + +--- + +## 7. 環境変数 + +| 変数名 | 説明 | デフォルト | +|---|---|---| +| `PHANTOM_MYSQL_PORT` | MySQL 接続検出ポート | `3306` |