From aef7e8fc9018b48f39956a707be4f73b8b18cffd Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 2 Apr 2026 22:00:14 +0000 Subject: [PATCH] qemu: Add async QMP client, always-on QMP monitor, and NetworkMode::None QMP is useful for many things around dynamic control of the VM. Let's add a QMP channel by default. This adds some basic infrastructure around hotplugging virtio-serial channels, which I'm thinking about using for dynamic host <-> VM communications. Also add NetworkMode::None for fully isolated VMs where all communication happens over virtio-serial. Assisted-by: OpenCode (Claude Opus 4) Signed-off-by: Colin Walters --- Cargo.lock | 2 + crates/bcvk-qemu/Cargo.toml | 6 +- crates/bcvk-qemu/src/lib.rs | 5 +- crates/bcvk-qemu/src/qemu.rs | 114 +++++++++- crates/bcvk-qemu/src/qmp.rs | 398 +++++++++++++++++++++++++++++++++++ 5 files changed, 519 insertions(+), 6 deletions(-) create mode 100644 crates/bcvk-qemu/src/qmp.rs diff --git a/Cargo.lock b/Cargo.lock index a9680fa..09ffd7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,8 @@ dependencies = [ "libc", "nix", "rustix", + "serde_json", + "tempfile", "tokio", "tracing", "vsock", diff --git a/crates/bcvk-qemu/Cargo.toml b/crates/bcvk-qemu/Cargo.toml index f4dbe36..8aa3e08 100644 --- a/crates/bcvk-qemu/Cargo.toml +++ b/crates/bcvk-qemu/Cargo.toml @@ -14,9 +14,13 @@ data-encoding = "2.9" libc = "0.2" nix = { version = "0.29", features = ["socket"] } rustix = { version = "1", features = ["pipe", "process"] } -tokio = { version = "1", features = ["fs", "process", "time", "macros"] } +serde_json = "1" +tokio = { version = "1", features = ["fs", "io-util", "net", "process", "rt", "time", "macros"] } tracing = { workspace = true } vsock = "=0.5.1" +[dev-dependencies] +tempfile = "3" + [lints] workspace = true diff --git a/crates/bcvk-qemu/src/lib.rs b/crates/bcvk-qemu/src/lib.rs index 91cbffe..1550988 100644 --- a/crates/bcvk-qemu/src/lib.rs +++ b/crates/bcvk-qemu/src/lib.rs @@ -44,6 +44,8 @@ mod credentials; mod qemu; +/// Minimal QMP (QEMU Machine Protocol) client for runtime VM control. +pub mod qmp; mod virtiofsd; pub use credentials::{ @@ -54,7 +56,8 @@ pub use credentials::{ pub use qemu::{ BootMode, DiskFormat, DisplayMode, MachineType, NetworkMode, QemuConfig, ResourceLimits, - RunningQemu, VirtioBlkDevice, VirtioSerialOut, VirtiofsMount, VHOST_VSOCK, + RunningQemu, VirtioBlkDevice, VirtioSerialBidir, VirtioSerialOut, VirtiofsMount, + QMP_SOCKET_PATH, VHOST_VSOCK, }; pub use virtiofsd::{spawn_virtiofsd_async, validate_virtiofsd_config, VirtiofsConfig}; diff --git a/crates/bcvk-qemu/src/qemu.rs b/crates/bcvk-qemu/src/qemu.rs index 5c88e07..383a154 100644 --- a/crates/bcvk-qemu/src/qemu.rs +++ b/crates/bcvk-qemu/src/qemu.rs @@ -27,6 +27,9 @@ use crate::VirtiofsConfig; /// The device path for vsock allocation. pub const VHOST_VSOCK: &str = "/dev/vhost-vsock"; +/// Default path for the QMP (QEMU Machine Protocol) Unix socket. +pub const QMP_SOCKET_PATH: &str = "/run/bcvk-qmp.sock"; + /// VirtIO-FS mount point configuration. #[derive(Debug, Clone)] pub struct VirtiofsMount { @@ -48,6 +51,23 @@ pub struct VirtioSerialOut { pub append: bool, } +/// Bidirectional VirtIO-Serial device backed by a Unix socket. +/// +/// Unlike [`VirtioSerialOut`] which uses a write-only `chardev file` backend, +/// this uses a `chardev socket` backend for full bidirectional communication. +/// The guest sees `/dev/virtio-ports/{name}` as a read-write character device. +/// The host connects to the Unix socket to exchange data in both directions. +/// +/// Used by the shell relay feature to provide interactive shell access to +/// VMs without requiring SSH. +#[derive(Debug, Clone)] +pub struct VirtioSerialBidir { + /// Device name (becomes /dev/virtio-ports/{name}). + pub name: String, + /// Unix socket path on the host for bidirectional communication. + pub socket_path: String, +} + /// Disk image format for virtio-blk devices. #[derive(Debug, Clone, Copy, Default)] pub enum DiskFormat { @@ -100,6 +120,9 @@ pub enum NetworkMode { /// Port forwarding rules: "tcp::2222-:22" format. hostfwd: Vec, }, + /// No network device. Useful for fully isolated VMs where + /// all host-guest communication happens over virtio-serial. + None, } impl Default for NetworkMode { @@ -202,8 +225,10 @@ pub struct QemuConfig { fdset: Vec>, /// Additional VirtIO-FS mounts. pub additional_mounts: Vec, - /// Virtio-serial output devices. + /// Virtio-serial output devices (unidirectional, guest -> host). pub virtio_serial_devices: Vec, + /// Virtio-serial bidirectional devices (backed by Unix sockets). + pub virtio_serial_bidir_devices: Vec, /// Virtio-blk block devices. pub virtio_blk_devices: Vec, /// Display/console mode. @@ -229,6 +254,12 @@ pub struct QemuConfig { /// fw_cfg entries for passing config files to the guest fw_cfg_entries: Vec<(String, Utf8PathBuf)>, + + /// Path for the QMP (QEMU Machine Protocol) Unix socket. + /// + /// QMP is always enabled. If not explicitly set, defaults to + /// [`QMP_SOCKET_PATH`] at spawn time. + pub qmp_socket_path: Option, } impl QemuConfig { @@ -442,6 +473,24 @@ impl QemuConfig { Ok(read_fd) } + /// Add a bidirectional virtio-serial device backed by a Unix socket. + /// + /// QEMU creates a listening Unix socket at `socket_path`. The guest sees + /// the device as `/dev/virtio-ports/{name}` and can read/write it. + /// The host connects to the socket for bidirectional byte-stream + /// communication with the guest. + /// + /// Unlike [`add_virtio_serial_out`] (which uses a write-only `chardev file`), + /// this uses `chardev socket` with `server=on,wait=off` so QEMU doesn't + /// block waiting for a host-side connection at startup. + pub fn add_virtio_serial_bidir(&mut self, name: &str, socket_path: String) -> &mut Self { + self.virtio_serial_bidir_devices.push(VirtioSerialBidir { + name: name.to_owned(), + socket_path, + }); + self + } + /// Add SMBIOS credential for systemd credential passing. pub fn add_smbios_credential(&mut self, credential: String) -> &mut Self { self.smbios_credentials.push(credential); @@ -676,7 +725,24 @@ fn spawn( ]); } - // Configure network (only User mode supported now) + // Add bidirectional virtio-serial devices (chardev socket backend) + for (idx, bidir_device) in config.virtio_serial_bidir_devices.iter().enumerate() { + let char_id = format!("bidir_char{}", idx); + cmd.args([ + "-chardev", + &format!( + "socket,id={},path={},server=on,wait=off", + char_id, bidir_device.socket_path + ), + "-device", + &format!( + "virtserialport,chardev={},name={}", + char_id, bidir_device.name + ), + ]); + } + + // Configure network match &config.network_mode { NetworkMode::User { hostfwd } => { let mut netdev_parts = vec!["user".to_string(), "id=net0".to_string()]; @@ -694,6 +760,10 @@ fn spawn( "virtio-net-pci,netdev=net0", ]); } + NetworkMode::None => { + // No network device at all. Host-guest communication + // happens exclusively over virtio-serial. + } } // No GUI; serial console either to a log file or disabled. @@ -704,10 +774,19 @@ fn spawn( } cmd.args(["-nographic", "-display", "none"]); + // QMP socket for runtime control (hot-plug, etc.) -- always enabled. + let qmp_path = config.qmp_socket_path.as_deref().unwrap_or(QMP_SOCKET_PATH); + cmd.args([ + "-chardev", + &format!("socket,id=qmp0,path={qmp_path},server=on,wait=off"), + "-mon", + "chardev=qmp0,mode=control", + ]); + match &config.display_mode { DisplayMode::None => { - // Disable monitor in non-console mode - cmd.args(["-monitor", "none"]); + // QMP monitor is already configured above; no need for + // an additional human monitor. } DisplayMode::Console => { cmd.args(["-device", "virtconsole,chardev=console0"]); @@ -801,6 +880,8 @@ pub struct RunningQemu { pub virtiofsd_processes: Vec>>>>, #[allow(dead_code)] sd_notification: Option, + /// Path to the QMP socket (always available). + pub qmp_socket_path: String, } impl std::fmt::Debug for RunningQemu { @@ -968,6 +1049,11 @@ impl RunningQemu { }) .unwrap_or_default(); + let qmp_socket_path = config + .qmp_socket_path + .clone() + .unwrap_or_else(|| QMP_SOCKET_PATH.to_string()); + // Spawn QEMU process with additional VSOCK credential if needed let qemu_process = spawn(&config, &creds, vsockdata)?; @@ -975,6 +1061,7 @@ impl RunningQemu { qemu_process, virtiofsd_processes, sd_notification, + qmp_socket_path, }) } @@ -1029,6 +1116,25 @@ mod tests { assert_eq!(DiskFormat::Qcow2.as_str(), "qcow2"); } + #[test] + fn test_virtio_serial_bidir_device_creation() { + let mut config = QemuConfig::new_direct_boot( + 1024, + 1, + "/test/kernel".to_string(), + "/test/initramfs".to_string(), + "/test/socket".into(), + ); + config.add_virtio_serial_bidir("org.bcvk.ssh.0", "/run/bcvk-ssh-0.sock".to_string()); + + assert_eq!(config.virtio_serial_bidir_devices.len(), 1); + assert_eq!(config.virtio_serial_bidir_devices[0].name, "org.bcvk.ssh.0"); + assert_eq!( + config.virtio_serial_bidir_devices[0].socket_path, + "/run/bcvk-ssh-0.sock" + ); + } + #[test] fn test_fw_cfg_entry() { let mut config = QemuConfig::new_direct_boot( diff --git a/crates/bcvk-qemu/src/qmp.rs b/crates/bcvk-qemu/src/qmp.rs new file mode 100644 index 0000000..02b2013 --- /dev/null +++ b/crates/bcvk-qemu/src/qmp.rs @@ -0,0 +1,398 @@ +//! Async QMP (QEMU Machine Protocol) client. +//! +//! Provides the subset of QMP commands needed for hot-plugging virtio-serial +//! ports at runtime. This enables multiple concurrent SSH sessions to a VM +//! by dynamically adding and removing virtio-serial channels. +//! +//! The protocol is JSON-over-Unix-socket. After connecting, the client must +//! negotiate capabilities before issuing commands. +//! +//! Reference: + +use color_eyre::eyre::{eyre, Context}; +use color_eyre::Result; +use serde_json::{json, Value}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; +use tracing::debug; + +/// A connected QMP client. +#[derive(Debug)] +pub struct QmpClient { + reader: BufReader, + writer: tokio::net::unix::OwnedWriteHalf, +} + +impl QmpClient { + /// Connect to a QMP socket and negotiate capabilities. + /// + /// After connecting, reads the QMP greeting and sends + /// `qmp_capabilities` to enter command mode. + pub async fn connect(socket_path: &str) -> Result { + let stream = UnixStream::connect(socket_path) + .await + .with_context(|| format!("connecting to QMP socket at {socket_path}"))?; + + let (read_half, write_half) = stream.into_split(); + let mut client = Self { + reader: BufReader::new(read_half), + writer: write_half, + }; + + // Read and discard the QMP greeting + let greeting = client.read_line().await?; + debug!("QMP greeting: {}", greeting.trim()); + + // Negotiate capabilities + client.execute("qmp_capabilities", json!({})).await?; + debug!("QMP capabilities negotiated"); + + Ok(client) + } + + /// Hot-plug a virtio-serial port backed by a Unix socket chardev. + /// + /// Creates a `chardev-socket` listening at `socket_path` and attaches + /// a `virtserialport` device named `port_name` to the VM's existing + /// virtio-serial controller. The guest sees the port as + /// `/dev/virtio-ports/{port_name}`. + /// + /// `device_id` is used as both the chardev ID and device ID prefix + /// (chardev: `{device_id}_char`, device: `{device_id}_dev`). + pub async fn hotplug_virtio_serial( + &mut self, + device_id: &str, + port_name: &str, + socket_path: &str, + ) -> Result<()> { + let char_id = format!("{device_id}_char"); + let dev_id = format!("{device_id}_dev"); + + // Step 1: Add the chardev (Unix socket, server mode, non-blocking) + self.execute( + "chardev-add", + json!({ + "id": char_id, + "backend": { + "type": "socket", + "data": { + "addr": { + "type": "unix", + "data": { "path": socket_path } + }, + "server": true, + "wait": false + } + } + }), + ) + .await + .with_context(|| format!("adding chardev {char_id} at {socket_path}"))?; + + // Step 2: Add the virtserialport device + self.execute( + "device_add", + json!({ + "driver": "virtserialport", + "chardev": char_id, + "name": port_name, + "id": dev_id, + }), + ) + .await + .with_context(|| format!("adding virtserialport {dev_id}"))?; + + debug!( + "Hot-plugged virtio-serial port '{}' (chardev at {})", + port_name, socket_path + ); + Ok(()) + } + + /// Hot-unplug a virtio-serial port previously added with + /// [`hotplug_virtio_serial`](Self::hotplug_virtio_serial). + /// + /// Sends `device_del` for the port device, waits for the + /// `DEVICE_DELETED` event (up to 5 seconds), then removes the chardev. + pub async fn hot_unplug_virtio_serial(&mut self, device_id: &str) -> Result<()> { + let char_id = format!("{device_id}_char"); + let dev_id = format!("{device_id}_dev"); + + // Step 1: Request device removal + self.execute("device_del", json!({ "id": dev_id })) + .await + .with_context(|| format!("removing device {dev_id}"))?; + + // Step 2: Wait for DEVICE_DELETED event (the guest must cooperate) + self.wait_for_event("DEVICE_DELETED") + .await + .with_context(|| format!("waiting for {dev_id} to be removed"))?; + + // Step 3: Remove the chardev + self.execute("chardev-remove", json!({ "id": char_id })) + .await + .with_context(|| format!("removing chardev {char_id}"))?; + + debug!("Hot-unplugged virtio-serial device '{}'", device_id); + Ok(()) + } + + /// Execute a QMP command and return the result. + pub async fn execute(&mut self, command: &str, arguments: Value) -> Result { + let request = if arguments.is_null() + || (arguments.is_object() && arguments.as_object().map_or(true, |m| m.is_empty())) + { + json!({ "execute": command }) + } else { + json!({ "execute": command, "arguments": arguments }) + }; + + let mut request_str = serde_json::to_string(&request)?; + request_str.push('\n'); + self.writer + .write_all(request_str.as_bytes()) + .await + .with_context(|| format!("writing QMP command {command}"))?; + + // Read response, skipping any async events + loop { + let line = self.read_line().await?; + let response: Value = serde_json::from_str(&line) + .with_context(|| format!("parsing QMP response: {line}"))?; + + if response.get("return").is_some() { + return Ok(response["return"].clone()); + } + if let Some(error) = response.get("error") { + return Err(eyre!("QMP command '{}' failed: {}", command, error)); + } + // Otherwise it's an async event -- skip and read next line + if let Some(event) = response.get("event") { + debug!("QMP event (while waiting for response): {}", event); + } + } + } + + /// Wait for a specific QMP event, discarding other events. + async fn wait_for_event(&mut self, event_name: &str) -> Result { + loop { + let line = self + .read_line() + .await + .with_context(|| format!("reading while waiting for {event_name} event"))?; + let msg: Value = serde_json::from_str(&line) + .with_context(|| format!("parsing QMP message: {line}"))?; + + if let Some(event) = msg.get("event").and_then(|v| v.as_str()) { + if event == event_name { + return Ok(msg); + } + debug!("QMP event (not {event_name}): {event}"); + } + } + } + + /// Read a single line from the QMP socket. + async fn read_line(&mut self) -> Result { + let mut line = String::new(); + let n = self + .reader + .read_line(&mut line) + .await + .with_context(|| "reading from QMP socket")?; + if n == 0 { + return Err(eyre!("QMP socket closed unexpectedly")); + } + Ok(line) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use tokio::net::UnixListener; + + /// A minimal mock QMP server for testing. + /// + /// Accepts one connection, sends a greeting, and responds to each + /// command with `{"return": {}}`. Async events can be injected by + /// the test before a command response. + struct MockQmpServer { + listener: UnixListener, + socket_path: std::path::PathBuf, + } + + impl MockQmpServer { + /// Create a mock server on a temporary Unix socket. + fn new() -> Self { + let dir = tempfile::tempdir().expect("tempdir"); + let socket_path = dir.path().join("qmp.sock"); + let listener = UnixListener::bind(&socket_path).expect("bind mock QMP socket"); + // Leak the tempdir so the socket stays alive for the test duration + std::mem::forget(dir); + Self { + listener, + socket_path, + } + } + + /// Run the mock server, handling one client connection. + /// + /// `handler` receives each command as a `Value` and returns a + /// response `Value` (or multiple values to inject events before + /// the response). + async fn run(self, handler: F) + where + F: Fn(Value) -> Vec, + { + let (stream, _) = self.listener.accept().await.expect("accept"); + let (read_half, mut write_half) = stream.into_split(); + let mut reader = BufReader::new(read_half); + + // Send QMP greeting + let greeting = json!({ + "QMP": { + "version": {"qemu": {"micro": 0, "minor": 0, "major": 9}}, + "capabilities": ["oob"] + } + }); + write_half + .write_all(format!("{}\n", greeting).as_bytes()) + .await + .unwrap(); + + // Process commands + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap() > 0 { + let cmd: Value = serde_json::from_str(line.trim()).expect("parse command"); + let responses = handler(cmd); + for resp in responses { + write_half + .write_all(format!("{}\n", resp).as_bytes()) + .await + .unwrap(); + } + line.clear(); + } + } + } + + #[tokio::test] + async fn test_connect_and_negotiate() { + let server = MockQmpServer::new(); + let path = server.socket_path.to_str().unwrap().to_string(); + + let server_handle = tokio::spawn(server.run(|_cmd| vec![json!({"return": {}})])); + + let client = QmpClient::connect(&path).await; + assert!(client.is_ok(), "connect failed: {:?}", client.err()); + drop(client); + let _ = server_handle.await; + } + + #[tokio::test] + async fn test_execute_returns_result() { + let server = MockQmpServer::new(); + let path = server.socket_path.to_str().unwrap().to_string(); + + let server_handle = tokio::spawn(server.run(|cmd| { + let command = cmd["execute"].as_str().unwrap_or(""); + match command { + "qmp_capabilities" => vec![json!({"return": {}})], + "query-version" => { + vec![json!({"return": {"qemu": {"major": 9}}})] + } + _ => vec![json!({"return": {}})], + } + })); + + let mut client = QmpClient::connect(&path).await.unwrap(); + let result = client.execute("query-version", json!({})).await.unwrap(); + assert_eq!(result["qemu"]["major"], 9); + + drop(client); + let _ = server_handle.await; + } + + #[tokio::test] + async fn test_execute_handles_error() { + let server = MockQmpServer::new(); + let path = server.socket_path.to_str().unwrap().to_string(); + + let server_handle = tokio::spawn(server.run(|cmd| { + let command = cmd["execute"].as_str().unwrap_or(""); + match command { + "qmp_capabilities" => vec![json!({"return": {}})], + _ => vec![json!({ + "error": {"class": "GenericError", "desc": "device not found"} + })], + } + })); + + let mut client = QmpClient::connect(&path).await.unwrap(); + let result = client.execute("device_del", json!({"id": "nope"})).await; + assert!(result.is_err()); + assert!(format!("{:?}", result.unwrap_err()).contains("device not found"),); + + drop(client); + let _ = server_handle.await; + } + + #[tokio::test] + async fn test_execute_skips_async_events() { + let server = MockQmpServer::new(); + let path = server.socket_path.to_str().unwrap().to_string(); + + let server_handle = tokio::spawn(server.run(|cmd| { + let command = cmd["execute"].as_str().unwrap_or(""); + match command { + "qmp_capabilities" => vec![json!({"return": {}})], + "device_del" => { + // Inject an async event before the response + vec![ + json!({"event": "SOME_OTHER_EVENT", "timestamp": {}}), + json!({"return": {}}), + ] + } + _ => vec![json!({"return": {}})], + } + })); + + let mut client = QmpClient::connect(&path).await.unwrap(); + // Should succeed despite the injected event + let result = client.execute("device_del", json!({"id": "dev0"})).await; + assert!(result.is_ok()); + + drop(client); + let _ = server_handle.await; + } + + #[tokio::test] + async fn test_hotplug_sends_chardev_and_device() { + let server = MockQmpServer::new(); + let path = server.socket_path.to_str().unwrap().to_string(); + + let commands_seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let commands_clone = commands_seen.clone(); + + let server_handle = tokio::spawn(server.run(move |cmd| { + let command = cmd["execute"].as_str().unwrap_or("").to_string(); + commands_clone.lock().unwrap().push(command); + vec![json!({"return": {}})] + })); + + let mut client = QmpClient::connect(&path).await.unwrap(); + client + .hotplug_virtio_serial("test0", "org.bcvk.ssh.0", "/tmp/test.sock") + .await + .unwrap(); + + drop(client); + let _ = server_handle.await; + + let seen = commands_seen.lock().unwrap(); + assert_eq!(*seen, vec!["qmp_capabilities", "chardev-add", "device_add"]); + } +}