diff --git a/crates/ark/src/comm_handler.rs b/crates/ark/src/comm_handler.rs index 0a5654246..564276df2 100644 --- a/crates/ark/src/comm_handler.rs +++ b/crates/ark/src/comm_handler.rs @@ -18,9 +18,12 @@ use serde::de::DeserializeOwned; use serde::Serialize; use stdext::result::ResultExt; +use crate::console::Console; + /// Context provided to `CommHandler` methods, giving access to the outgoing -/// channel and close-request mechanism. In the future, we'll provide access to -/// more of the Console state, such as the currently active environment. +/// channel, close-request mechanism, and the Console singleton (via +/// `console()`). Through the Console, handlers can reach runtime state such +/// as the graphics device context. #[derive(Debug)] pub struct CommHandlerContext { pub outgoing_tx: CommOutgoingTx, @@ -47,6 +50,20 @@ impl CommHandlerContext { pub fn is_closed(&self) -> bool { self.closed.get() } + + /// Send a serializable event as `CommMsg::Data` on the outgoing channel. + /// Serialization or send errors are logged and ignored. + pub fn send_event(&self, event: &T) { + let Some(json) = serde_json::to_value(event).log_err() else { + return; + }; + self.outgoing_tx.send(CommMsg::Data(json)).log_err(); + } + + /// Access the Console singleton (the R runtime). + pub(crate) fn console(&self) -> &Console { + Console::get() + } } /// Trait for comm handlers that run synchronously on the R thread. diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 05efb80f1..a92ff9215 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -128,7 +128,6 @@ pub(crate) use console_repl::ConsoleNotification; pub(crate) use console_repl::ConsoleOutputCapture; pub(crate) use console_repl::KernelInfo; use console_repl::PendingInputs; -use console_repl::PromptInfo; use console_repl::ReadConsolePendingAction; pub use console_repl::SessionMode; @@ -148,7 +147,7 @@ use crate::lsp::state_handlers::ConsoleInputs; use crate::modules; use crate::modules::ARK_ENVS; use crate::plots::graphics_device; -use crate::plots::graphics_device::GraphicsDeviceNotification; +use crate::plots::graphics_device::DeviceContext; use crate::r_task; use crate::r_task::BoxFuture; use crate::r_task::RTask; @@ -166,8 +165,6 @@ use crate::srcref::ns_populate_srcref; use crate::srcref::resource_loaded_namespaces; use crate::startup; use crate::sys::console::console_to_utf8; -use crate::ui::UiCommMessage; -use crate::ui::UiCommSender; use crate::url::UrlId; thread_local! { @@ -224,9 +221,9 @@ pub(crate) struct Console { tasks_idle_any_rx: Receiver, pending_futures: HashMap, RTaskStartInfo, Option)>, - /// Channel to communicate requests and events to the frontend - /// by forwarding them through the UI comm. Optional, and really Positron specific. - ui_comm_tx: Option, + /// Comm ID of the currently connected UI comm, if any. + /// The handler lives in `self.comms`; this is just an index into it. + ui_comm_id: Option, /// Error captured by our global condition handler during the last iteration /// of the REPL. @@ -337,4 +334,7 @@ pub(crate) struct Console { /// Comm handlers registered on the R thread (keyed by comm ID). comms: HashMap, + + /// Graphics device state (plot recording, rendering, comm management). + device_context: DeviceContext, } diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index f10fbf754..ef8d1d6fb 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -3,11 +3,11 @@ // // Copyright (C) 2026 Posit Software, PBC. All rights reserved. // -// use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommEvent; use amalthea::socket::comm::CommInitiator; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::comm::CommSocket; use stdext::result::ResultExt; use uuid::Uuid; @@ -17,6 +17,7 @@ use crate::comm_handler::CommHandlerContext; use crate::comm_handler::ConsoleComm; use crate::comm_handler::EnvironmentChanged; use crate::console::Console; +use crate::ui::UI_COMM_NAME; impl Console { pub(super) fn comm_handle_msg(&mut self, comm_id: &str, msg: CommMsg) { @@ -29,6 +30,10 @@ impl Console { } pub(super) fn comm_handle_close(&mut self, comm_id: &str) { + if self.ui_comm_id.as_deref() == Some(comm_id) { + self.ui_comm_id = None; + } + let Some(mut reg) = self.comms.remove(comm_id) else { log::warn!("Received close for unknown registered comm {comm_id}"); return; @@ -40,7 +45,7 @@ impl Console { /// /// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`, /// sends `CommEvent::Opened` to amalthea, and returns the comm ID. - pub(crate) fn comm_register( + pub(crate) fn comm_open_backend( &mut self, comm_name: &str, mut handler: Box, @@ -67,6 +72,35 @@ impl Console { Ok(comm_id) } + /// Register a frontend-initiated comm on the R thread. + /// + /// Unlike `comm_open_backend` (which is for backend-initiated comms and + /// sends `CommEvent::Opened`), this is called when the frontend opened the + /// comm. The `CommSocket` already exists in amalthea's open_comms list, so + /// we only need to register the handler and call `handle_open`. + pub(super) fn comm_open_frontend( + &mut self, + comm_id: String, + comm_name: &str, + outgoing_tx: CommOutgoingTx, + mut handler: Box, + ) { + let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone()); + handler.handle_open(&ctx); + + if comm_name == UI_COMM_NAME { + if let Some(old_id) = self.ui_comm_id.take() { + log::info!("Replacing an existing UI comm."); + if let Some(mut old) = self.comms.remove(&old_id) { + old.handler.handle_close(&old.ctx); + } + } + self.ui_comm_id = Some(comm_id.clone()); + } + + self.comms.insert(comm_id, ConsoleComm { handler, ctx }); + } + pub(super) fn comm_notify_environment_changed(&mut self, event: EnvironmentChanged) { for (_, reg) in self.comms.iter_mut() { reg.handler.handle_environment(event, ®.ctx); @@ -84,6 +118,11 @@ impl Console { .collect(); for comm_id in closed_ids { + // We're not expecting the UI comm to close itself but we handle the + // case explicitly to be defensive + if self.ui_comm_id.as_deref() == Some(comm_id.as_str()) { + self.ui_comm_id = None; + } if let Some(reg) = self.comms.remove(&comm_id) { self.comm_notify_closed(&comm_id, ®); } diff --git a/crates/ark/src/console/console_integration.rs b/crates/ark/src/console/console_integration.rs index 274357083..52d61e463 100644 --- a/crates/ark/src/console/console_integration.rs +++ b/crates/ark/src/console/console_integration.rs @@ -11,129 +11,22 @@ use super::*; /// UI comm integration. impl Console { - pub(super) fn handle_establish_ui_comm_channel( - &mut self, - ui_comm_tx: Sender, - info: &PromptInfo, - ) { - if self.ui_comm_tx.is_some() { - log::info!("Replacing an existing UI comm channel."); - } - - // Create and store the sender channel - self.ui_comm_tx = Some(UiCommSender::new(ui_comm_tx)); - - // Go ahead and do an initial refresh - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - } - pub(crate) fn session_mode(&self) -> SessionMode { self.session_mode } - pub(crate) fn get_ui_comm_tx(&self) -> Option<&UiCommSender> { - self.ui_comm_tx.as_ref() - } - - fn get_mut_ui_comm_tx(&mut self) -> Option<&mut UiCommSender> { - self.ui_comm_tx.as_mut() - } - - pub(super) fn with_ui_comm_tx(&self, f: F) - where - F: FnOnce(&UiCommSender), - { - match self.get_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } + pub(crate) fn ui_comm(&self) -> Option> { + let comm = self.comms.get(self.ui_comm_id.as_deref()?)?; + Some(UiCommRef { + comm, + originator: self.active_request.as_ref().map(|r| &r.originator), + stdin_request_tx: &self.stdin_request_tx, + }) } - pub(super) fn with_mut_ui_comm_tx(&mut self, mut f: F) - where - F: FnMut(&mut UiCommSender), - { - match self.get_mut_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } - } - - pub(crate) fn is_ui_comm_connected(&self) -> bool { - self.get_ui_comm_tx().is_some() - } - - pub(crate) fn call_frontend_method( - &self, - request: UiFrontendRequest, - ) -> anyhow::Result { - log::trace!("Calling frontend method {request:?}"); - - let ui_comm_tx = self.get_ui_comm_tx().ok_or_else(|| { - anyhow::anyhow!("UI comm is not connected. Can't execute request {request:?}") - })?; - - let (reply_tx, reply_rx) = bounded(1); - - let Some(req) = &self.active_request else { - return Err(anyhow::anyhow!( - "No active request. Can't execute request {request:?}" - )); - }; - - // Forward request to UI comm - ui_comm_tx.send_request(UiCommFrontendRequest { - originator: req.originator.clone(), - reply_tx, - request: request.clone(), - }); - - // Block for reply - let reply = reply_rx.recv().unwrap(); - - log::trace!("Got reply from frontend method: {reply:?}"); - - match reply { - StdInRpcReply::Reply(reply) => match reply { - JsonRpcReply::Result(reply) => { - // Deserialize to Rust first to verify the OpenRPC contract. - // Errors are propagated to R. - if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { - return Err(anyhow::anyhow!( - "Can't deserialize RPC reply for {request:?}:\n{err:?}" - )); - } - - // Now deserialize to an R object - Ok(RObject::try_from(reply.result)?) - }, - JsonRpcReply::Error(reply) => { - let message = reply.error.message; - - return Err(anyhow::anyhow!( - "While calling frontend method:\n\ - {message}", - )); - }, - }, - // If an interrupt was signalled, return `NULL`. This should not be - // visible to the caller since `r_unwrap()` (called e.g. by - // `harp::register`) will trigger an interrupt jump right away. - StdInRpcReply::Interrupt => Ok(RObject::null()), - } + pub(crate) fn try_ui_comm(&self) -> anyhow::Result> { + self.ui_comm() + .ok_or_else(|| anyhow!("UI comm is not connected")) } } @@ -270,3 +163,83 @@ impl Console { self.lsp_virtual_documents.get(uri).cloned() } } + +/// Reference to the UI comm. Returned by `Console::ui_comm()`. +/// +/// Existence of this value guarantees the comm is connected. +pub(crate) struct UiCommRef<'a> { + comm: &'a ConsoleComm, + originator: Option<&'a Originator>, + stdin_request_tx: &'a Sender, +} + +impl UiCommRef<'_> { + pub(crate) fn send_event(&self, event: &UiFrontendEvent) { + self.comm.ctx.send_event(event); + } + + pub(crate) fn busy(&self, busy: bool) { + self.send_event(&UiFrontendEvent::Busy(BusyParams { busy })); + } + + pub(crate) fn show_message(&self, message: String) { + self.send_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message })); + } + + pub(crate) fn call_frontend_method( + &self, + request: UiFrontendRequest, + ) -> anyhow::Result { + log::trace!("Calling frontend method {request:?}"); + + let (reply_tx, reply_rx) = bounded(1); + + let Some(originator) = self.originator else { + return Err(anyhow!( + "No active request. Can't execute request {request:?}" + )); + }; + + // Forward request directly to the stdin channel + let comm_msg = StdInRequest::Comm(UiCommFrontendRequest { + originator: originator.clone(), + reply_tx, + request: request.clone(), + }); + self.stdin_request_tx.send(comm_msg)?; + + // Block for reply + let reply = reply_rx.recv()?; + + log::trace!("Got reply from frontend method: {reply:?}"); + + match reply { + StdInRpcReply::Reply(reply) => match reply { + JsonRpcReply::Result(reply) => { + // Deserialize to Rust first to verify the OpenRPC contract. + // Errors are propagated to R. + if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { + return Err(anyhow!( + "Can't deserialize RPC reply for {request:?}:\n{err:?}" + )); + } + + // Now deserialize to an R object + Ok(RObject::try_from(reply.result)?) + }, + JsonRpcReply::Error(reply) => { + let message = reply.error.message; + + return Err(anyhow!( + "While calling frontend method:\n\ + {message}", + )); + }, + }, + // If an interrupt was signalled, return `NULL`. This should not be + // visible to the caller since `r_unwrap()` (called e.g. by + // `harp::register`) will trigger an interrupt jump right away. + StdInRpcReply::Interrupt => Ok(RObject::null()), + } + } +} diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index 167da981d..f500c7d7f 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -212,15 +212,6 @@ pub(super) struct PromptInfo { /// case of a browser prompt or a readline prompt. pub(super) input_prompt: String, - /// The continuation prompt string when user supplies incomplete - /// inputs. This always corresponds to `getOption("continue")`. We send - /// it to frontends along with `prompt` because some frontends such as - /// Positron do not send incomplete inputs to Ark and take charge of - /// continuation prompts themselves. For frontends that can send - /// incomplete inputs to Ark, like Jupyter Notebooks, we immediately - /// error on them rather than requesting that this be shown. - pub(super) continuation_prompt: String, - /// The kind of prompt we're handling. pub(super) kind: PromptKind, } @@ -335,7 +326,6 @@ impl Console { dap: Arc>, session_mode: SessionMode, default_repos: DefaultRepos, - graphics_device_rx: AsyncUnboundedReceiver, console_notification_rx: AsyncUnboundedReceiver, ) { // Set the main thread ID. @@ -490,17 +480,16 @@ impl Console { } }); - // Initialize the GD context on this thread. + // Perform R-side graphics device initialization (register as + // interactive, spawn notification listener). The `DeviceContext` + // itself is already created as part of `Console::new()`. + // // Note that we do it after init is complete to avoid deadlocking // integration tests by spawning an async task. The deadlock is caused // by https://github.com/posit-dev/ark/blob/bd827e735970ca17102aeddfbe2c3ccf26950a36/crates/ark/src/r_task.rs#L261. // We should be able to remove this escape hatch in `r_task()` by // instantiating an `Console` in unit tests as well. - graphics_device::init_graphics_device( - console.comm_event_tx.clone(), - console.iopub_tx().clone(), - graphics_device_rx, - ); + graphics_device::init_graphics_device(); // Now that R has started and libr and ark have fully initialized, run site and user // level R profiles, in that order @@ -598,6 +587,8 @@ impl Console { dap: Arc>, session_mode: SessionMode, ) -> Self { + let device_context = DeviceContext::new(iopub_tx.clone()); + Self { r_request_rx, comm_event_tx, @@ -608,7 +599,7 @@ impl Console { active_request: None, execution_count: 0, autoprint_output: String::new(), - ui_comm_tx: None, + ui_comm_id: None, last_error: None, help_event_tx: None, help_port: None, @@ -643,6 +634,7 @@ impl Console { read_console_shutdown: Cell::new(false), debug_filter: ConsoleFilter::new(), comms: HashMap::new(), + device_context, } } @@ -694,6 +686,10 @@ impl Console { &self.comm_event_tx } + pub(crate) fn device_context(&self) -> &DeviceContext { + &self.device_context + } + /// Run a closure while capturing console output. /// Returns the closure's result paired with any captured output. pub(crate) fn with_capture(f: impl FnOnce() -> T) -> (T, String) { @@ -881,7 +877,7 @@ impl Console { self.pending_inputs = None; // Reply to active request with error, then fall through to event loop - self.handle_active_request(&info, ConsoleValue::Error(exception)); + self.handle_active_request(ConsoleValue::Error(exception)); } else if matches!(info.kind, PromptKind::InputRequest) { // Request input from the frontend and return it to R return self.handle_input_request(&info, buf, buflen); @@ -892,7 +888,7 @@ impl Console { // Otherwise reply to active request with accumulated result, then // fall through to event loop let result = self.take_result(); - self.handle_active_request(&info, ConsoleValue::Success(result)); + self.handle_active_request(ConsoleValue::Success(result)); } // In the future we'll also send browser information, see @@ -1022,7 +1018,7 @@ impl Console { // We've got a kernel request i if i == kernel_request_index => { let req = oper.recv(&kernel_request_rx).unwrap(); - self.handle_kernel_request(req, &info); + self.handle_kernel_request(req); }, // An interrupt task woke us up @@ -1064,10 +1060,6 @@ impl Console { let prompt_slice = unsafe { CStr::from_ptr(prompt_c) }; let prompt = prompt_slice.to_string_lossy().into_owned(); - // Sent to the frontend after each top-level command so users can - // customise their prompts - let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); - // Detect browser prompt by matching the prompt string // https://github.com/posit-dev/positron/issues/4742. // There are ways to break this detection, for instance setting @@ -1089,7 +1081,6 @@ impl Console { return PromptInfo { input_prompt: prompt, - continuation_prompt, kind, }; } @@ -1197,7 +1188,7 @@ impl Console { Some(exception) } - fn handle_active_request(&mut self, info: &PromptInfo, value: ConsoleValue) { + fn handle_active_request(&mut self, value: ConsoleValue) { self.reset_global_env_rdebug(); // If we get here we finished evaluating all pending inputs. Check if we @@ -1209,17 +1200,6 @@ impl Console { return; }; - // Perform a refresh of the frontend state (Prompts, working - // directory, etc) - // TODO: Once the UI comm is migrated to the `CommHandler` path, this - // becomes a `handle_environment` impl reacting to `Execution`. - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - // Check for pending graphics updates // (Important that this occurs while in the "busy" state of this ExecuteRequest // so that the `parent` message is set correctly in any Jupyter messages) @@ -1348,7 +1328,7 @@ impl Console { } else { // Otherwise we got an empty input, e.g. `""` and there's // nothing to do. Close active request. - self.handle_active_request(info, ConsoleValue::Success(Default::default())); + self.handle_active_request(ConsoleValue::Success(Default::default())); // And return to event loop None @@ -1835,12 +1815,19 @@ impl Console { } } - fn handle_kernel_request(&mut self, req: KernelRequest, info: &PromptInfo) { + fn handle_kernel_request(&mut self, req: KernelRequest) { log::trace!("Received kernel request {req:?}"); match req { - KernelRequest::EstablishUiCommChannel(ref ui_comm_tx) => { - self.handle_establish_ui_comm_channel(ui_comm_tx.clone(), info) + KernelRequest::CommOpen { + comm_id, + comm_name, + outgoing_tx, + handler, + done_tx, + } => { + self.comm_open_frontend(comm_id, &comm_name, outgoing_tx, handler); + done_tx.send(()).log_err(); }, KernelRequest::CommMsg { comm_id, @@ -2182,9 +2169,9 @@ impl Console { let busy = which != 0; // Send updated state to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::Busy(BusyParams { busy })); - }); + if let Some(ui) = self.ui_comm() { + ui.busy(busy); + } } /// Invoked by R to show a message to the user. @@ -2192,10 +2179,12 @@ impl Console { let message = unsafe { CStr::from_ptr(buf) }; let message = message.to_str().unwrap().to_string(); - // Deliver message to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::ShowMessage(ShowMessageParams { message })) - }); + if let Some(ui) = self.ui_comm() { + ui.show_message(message); + } else { + // Should we emit the message in the Console? + log::info!("`show_message`: {message}") + } } /// Invoked by the R event loop @@ -2251,13 +2240,6 @@ impl Console { // might end up being executed on the LSP thread. // https://github.com/rstudio/positron/issues/431 unsafe { R_RunPendingFinalizers() }; - - // Check for Positron render requests. - // - // TODO: This should move to a spawned task that'd be woken up by - // incoming messages on plot comms. This way we'll prevent the delays - // introduced by timeout-based event polling. - graphics_device::on_process_idle_events(); } pub(super) fn eval_env(&self) -> RObject { diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 559261e22..b7570389d 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -1213,7 +1213,7 @@ pub unsafe extern "C-unwind" fn ps_view_data_frame( }; let explorer = RDataExplorer::new(title, x, env_info)?; - Console::get_mut().comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; + Console::get_mut().comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; Ok(R_NilValue) } diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index f7b745bc3..2c26a70cd 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -12,8 +12,6 @@ use std::sync::Arc; use amalthea::comm::server_comm::ServerStartMessage; use amalthea::comm::server_comm::ServerStartedMessage; -use amalthea::comm::ui_comm::ShowMessageParams as UiShowMessageParams; -use amalthea::comm::ui_comm::UiFrontendEvent; use anyhow::Context; use crossbeam::channel::Sender; use serde_json::Value; @@ -107,13 +105,11 @@ fn report_crash() { "with full logs (see https://positron.posit.co/troubleshooting.html#python-and-r-logs)." ); + // NOTE: This is a legit use of interrupt-time task. No R access here, and + // we need to go through Console since it owns the UI comm. r_task(|| { - let event = UiFrontendEvent::ShowMessage(UiShowMessageParams { - message: String::from(user_message), - }); - - if let Some(ui_comm_tx) = Console::get().get_ui_comm_tx() { - ui_comm_tx.send_event(event); + if let Some(ui) = Console::get().ui_comm() { + ui.show_message(String::from(user_message)); } }); } diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 96b99512a..85fc1bdd9 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -15,7 +15,6 @@ use std::io::BufReader; use std::io::Read; use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::event::CommEvent; use amalthea::comm::plot_comm::PlotBackendReply; use amalthea::comm::plot_comm::PlotBackendRequest; use amalthea::comm::plot_comm::PlotFrontendEvent; @@ -27,8 +26,7 @@ use amalthea::comm::plot_comm::PlotRenderSettings; use amalthea::comm::plot_comm::PlotResult; use amalthea::comm::plot_comm::PlotSize; use amalthea::comm::plot_comm::UpdateParams; -use amalthea::socket::comm::CommInitiator; -use amalthea::socket::comm::CommSocket; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::iopub::IOPubMessage; use amalthea::wire::display_data::DisplayData; use amalthea::wire::execute_request::CodeLocation; @@ -38,7 +36,6 @@ use anyhow::anyhow; use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; -use crossbeam::channel::Select; use crossbeam::channel::Sender; use harp::exec::RFunction; use harp::exec::RFunctionExt; @@ -50,65 +47,24 @@ use libr::SEXP; use serde_json::json; use stdext::result::ResultExt; use stdext::unwrap; -use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver; use uuid::Uuid; +use crate::comm_handler::handle_rpc_request; +use crate::comm_handler::CommHandler; +use crate::comm_handler::CommHandlerContext; use crate::console::Console; use crate::console::SessionMode; use crate::modules::ARK_ENVS; -use crate::r_task; -#[derive(Debug)] -pub(crate) enum GraphicsDeviceNotification { - DidChangePlotRenderSettings(PlotRenderSettings), -} - -thread_local! { - // Safety: Set once by `Console` on initialization - static DEVICE_CONTEXT: RefCell = panic!("Must access `DEVICE_CONTEXT` from the R thread"); -} - -const POSITRON_PLOT_CHANNEL_ID: &str = "positron.plot"; - -// Expose thread initialization via function so we can keep the structs private. -// Must be called from the main R thread. -pub(crate) fn init_graphics_device( - comm_event_tx: Sender, - iopub_tx: Sender, - graphics_device_rx: AsyncUnboundedReceiver, -) { - DEVICE_CONTEXT.set(DeviceContext::new(comm_event_tx, iopub_tx)); +pub const PLOT_COMM_NAME: &str = "positron.plot"; +/// Perform R-side initialization of the graphics device. +/// Must be called from the main R thread after Console is initialized. +pub(crate) fn init_graphics_device() { // Declare our graphics device as interactive if let Err(err) = RFunction::from(".ps.graphics.register_as_interactive").call() { log::error!("Failed to register Ark graphics device as interactive: {err:?}"); }; - - // Launch an R thread task to process messages from the frontend - r_task::spawn_interrupt(async move || process_notifications(graphics_device_rx).await); -} - -async fn process_notifications( - mut graphics_device_rx: AsyncUnboundedReceiver, -) { - log::trace!("Now listening for graphics device notifications"); - - loop { - while let Some(notification) = graphics_device_rx.recv().await { - log::trace!("Got graphics device notification: {notification:#?}"); - - match notification { - GraphicsDeviceNotification::DidChangePlotRenderSettings(plot_render_settings) => { - // Safety: Note that `DEVICE_CONTEXT` is accessed at - // interrupt time. Other methods in this file should be - // written in accordance and avoid causing R interrupt - // checks while they themselves access the device. - DEVICE_CONTEXT - .with_borrow(|ctx| ctx.prerender_settings.replace(plot_render_settings)); - }, - } - } - } } /// Wrapped callbacks of the original graphics device we shadow @@ -133,10 +89,7 @@ struct ExecutionContext { code_location: Option, } -struct DeviceContext { - /// Channel for sending [CommEvent]s to Positron when plot events occur - comm_event_tx: Sender, - +pub(crate) struct DeviceContext { /// Channel for sending [IOPubMessage::DisplayData] and /// [IOPubMessage::UpdateDisplayData] to Jupyter frontends when plot events occur iopub_tx: Sender, @@ -179,9 +132,9 @@ struct DeviceContext { /// device specifications (i.e. for Positron's Plots pane). id: RefCell, - /// Mapping of plot ID to the communication socket used for communicating its - /// rendered results to the frontend. - sockets: RefCell>, + /// Mapping of `PlotId` to comm ID, used for sending update events to + /// existing plot comms via `CommOutgoingTx`. + comm_ids: RefCell>, /// Mapping of plot ID to its metadata (captured at creation time) metadata: RefCell>, @@ -213,16 +166,15 @@ struct DeviceContext { } impl DeviceContext { - fn new(comm_event_tx: Sender, iopub_tx: Sender) -> Self { + pub fn new(iopub_tx: Sender) -> Self { Self { - comm_event_tx, iopub_tx, has_changes: Cell::new(false), is_new_page: Cell::new(true), is_drawing: Cell::new(false), should_render: Cell::new(true), id: RefCell::new(Self::new_id()), - sockets: RefCell::new(HashMap::new()), + comm_ids: RefCell::new(HashMap::new()), metadata: RefCell::new(HashMap::new()), kind_counters: RefCell::new(HashMap::new()), wrapped_callbacks: WrappedDeviceCallbacks::default(), @@ -240,6 +192,10 @@ impl DeviceContext { } } + pub fn set_prerender_settings(&self, settings: PlotRenderSettings) { + self.prerender_settings.replace(settings); + } + /// Set the current execution context (called when an execute request starts) fn set_execution_context( &self, @@ -301,7 +257,7 @@ impl DeviceContext { /// [SessionMode::Console] mode. fn should_use_dynamic_plots(&self) -> bool { let console = Console::get(); - console.is_ui_comm_connected() && console.session_mode() == SessionMode::Console + console.ui_comm().is_some() && console.session_mode() == SessionMode::Console } /// Deactivation hook @@ -335,9 +291,15 @@ impl DeviceContext { #[tracing::instrument(level = "trace", skip_all, fields(level = %level))] fn hook_holdflush(&self, level: i32) { + let was_held = !self.should_render.get(); // Be extra safe and check `level <= 0` rather than just `level == 0` in case // our shadowed device returns a negative `level` self.should_render.replace(level <= 0); + + // Flush deferred changes on hold→release transition + if was_held && self.should_render.get() { + self.process_changes(); + } } #[tracing::instrument(level = "trace", skip_all, fields(mode = %mode))] @@ -474,111 +436,6 @@ impl DeviceContext { format!("{} {}", kind, counter) } - /// Process outstanding RPC requests received from Positron - /// - /// At idle time we loop through our set of plot channels and check if Positron has - /// responded on any of them stating that it is ready for us to replay and render - /// the actual plot, and then send back the bytes that represent that plot. - /// - /// Note that we only send back rendered plots at idle time. This means that if you - /// do something like: - /// - /// ```r - /// for (i in 1:5) { - /// plot(i) - /// Sys.sleep(1) - /// } - /// ``` - /// - /// Then it goes something like this: - /// - At each new page event we tell Positron there we have a new plot for it - /// - Positron sets up 5 blank plot windows and sends back an RPC requesting the plot - /// data - /// - AFTER the entire for loop has finished and we hit idle time, we drop into - /// `process_rpc_requests()` and render all 5 plots at once - /// - /// Practically this seems okay, it is just something to keep in mind. - #[tracing::instrument(level = "trace", skip_all)] - fn process_rpc_requests(&self) { - // Don't try to render a plot if we're currently drawing. - if self.is_drawing.get() { - log::trace!("Refusing to render due to `is_drawing`"); - return; - } - - // Don't try to render a plot if someone is asking us not to, i.e. `dev.hold()` - if !self.should_render.get() { - log::trace!("Refusing to render due to `should_render`"); - return; - } - - // Collect existing sockets into a vector of tuples. - // Necessary for handling Select in a clean way. - let sockets = { - // Refcell Safety: Clone the hashmap so we don't hold a reference for too long - let sockets = self.sockets.borrow().clone(); - sockets.into_iter().collect::>() - }; - - // Dynamically load all incoming channels within the sockets into a single `Select` - let mut select = Select::new(); - for (_id, sockets) in sockets.iter() { - select.recv(&sockets.incoming_rx); - } - - // Check for incoming plot render requests. - // Totally possible to have >1 requests pending, especially if we've plotted - // multiple things in a single chunk of R code. The `Err` case is likely just - // that no channels have any messages, so we don't log in that case. - while let Ok(selection) = select.try_select() { - let socket = sockets - .get(selection.index()) - .expect("Socket should exist for the selection index"); - let id = &socket.0; - let socket = &socket.1; - - // Receive on the "selected" channel - let message = match selection.recv(&socket.incoming_rx) { - Ok(message) => message, - Err(error) => { - // If the channel is disconnected, log and remove it so we don't try - // and `recv()` on it ever again - log::error!("{error:?}"); - // Refcell Safety: Short borrows in the file. - self.sockets.borrow_mut().remove(id); - - // Process remaining messages. Safe to do because we have - // removed the `DeviceContext`'s copy off the sockets but we - // are working through our own copy of them. - continue; - }, - }; - - match message { - CommMsg::Rpc { .. } => { - log::trace!("Handling `RPC` for plot `id` {id}"); - socket.handle_request(message, |req| self.handle_rpc(req, id)); - }, - - // Note that ideally this handler should be invoked before we - // check for `should_render`. I.e. we should acknowledge a plot - // has been closed on the frontend side even when `dev.hold()` - // is active. Doing so would require some more careful - // bookkeeping of the state though, and since this is a very - // unlikely sequence of action nothing really bad happens with - // the current approach, we decided to keep handling here. - CommMsg::Close => { - log::trace!("Handling `Close` for plot `id` {id}"); - self.close_plot(id) - }, - - message => { - log::error!("Received unexpected comm message for plot `id` {id}: {message:?}") - }, - } - } - } - #[tracing::instrument(level = "trace", skip_all, fields(id = %id))] fn handle_rpc( &self, @@ -630,7 +487,7 @@ impl DeviceContext { format: plot_meta.format, }; - let data = self.render_plot(&id, &settings)?; + let data = self.render_plot(id, &settings)?; let mime_type = Self::get_mime_type(&plot_meta.format); Ok(PlotBackendReply::RenderReply(PlotResult { @@ -643,16 +500,10 @@ impl DeviceContext { } #[tracing::instrument(level = "trace", skip(self))] - fn close_plot(&self, id: &PlotId) { - // RefCell safety: Short borrows in the file - self.sockets.borrow_mut().remove(id); - - // Remove metadata for this plot + fn on_plot_closed(&self, id: &PlotId) { + self.comm_ids.borrow_mut().remove(id); self.metadata.borrow_mut().remove(id); - // The plot data is stored at R level. Assumes we're called on the R - // thread at idle time so there's no race issues (see - // `on_process_idle_events()`). if let Err(err) = RFunction::from("remove_recording") .param("id", id) .call_in(ARK_ENVS.positron_ns) @@ -685,11 +536,18 @@ impl DeviceContext { fn process_changes(&self) { let id = self.id(); - if !self.has_changes.replace(false) { + if !self.has_changes.get() { log::trace!("No changes to process for plot `id` {id}"); return; } + if !self.should_render.get() { + log::trace!("Deferring changes for plot `id` {id} (rendering held)"); + return; + } + + self.has_changes.replace(false); + log::trace!("Processing changes for plot `id` {id}"); // Record the changes so we can replay them when Positron asks us for them. @@ -749,18 +607,9 @@ impl DeviceContext { origin, }); - // Let Positron know that we just created a new plot. - let socket = CommSocket::new( - CommInitiator::BackEnd, - id.to_string(), - POSITRON_PLOT_CHANNEL_ID.to_string(), - self.iopub_tx.clone(), - ); - let settings = self.prerender_settings.get(); - // Prepare a pre-rendering of the plot so Positron has something to display immediately - let data = match self.render_plot(id, &settings) { + let open_data = match self.render_plot(id, &settings) { Ok(pre_render) => { let mime_type = Self::get_mime_type(&PlotRenderFormat::Png); @@ -778,14 +627,19 @@ impl DeviceContext { }, }; - let event = CommEvent::Opened(socket.clone(), data); - if let Err(error) = self.comm_event_tx.send(event) { - log::error!("{error:?}"); - } + let plot_comm = PlotComm { + id: id.clone(), + open_data, + }; - // Save our new socket. - // Refcell Safety: Short borrows in the file. - self.sockets.borrow_mut().insert(id.clone(), socket); + match Console::get_mut().comm_open_backend(PLOT_COMM_NAME, Box::new(plot_comm)) { + Ok(comm_id) => { + self.comm_ids.borrow_mut().insert(id.clone(), comm_id); + }, + Err(err) => { + log::error!("Failed to register plot comm: {err:?}"); + }, + } } #[tracing::instrument(level = "trace", skip_all, fields(id = %id))] @@ -845,17 +699,14 @@ impl DeviceContext { fn process_update_plot_positron(&self, id: &PlotId) { log::trace!("Notifying Positron of plot update"); - // Refcell Safety: Make sure not to call other methods from this whole block. - let sockets = self.sockets.borrow(); - - // Find our socket - let socket = unwrap!(sockets.get(id), None => { - // If socket doesn't exist, bail, nothing to update (should be rare, likely a bug?) - log::error!("Can't find socket to update with id: {id}."); - return; - }); + let comm_id = match self.comm_ids.borrow().get(id).cloned() { + Some(id) => id, + None => { + log::error!("Can't find comm to update with id: {id}."); + return; + }, + }; - // Create a pre-rendering of the updated plot let settings = self.prerender_settings.get(); let update_params = match self.render_plot(id, &settings) { Ok(pre_render) => { @@ -879,11 +730,10 @@ impl DeviceContext { let value = serde_json::to_value(PlotFrontendEvent::Update(update_params)).unwrap(); - // Tell Positron we have an updated plot with optional pre-rendering - socket - .outgoing_tx + let outgoing_tx = CommOutgoingTx::new(comm_id, self.iopub_tx.clone()); + outgoing_tx .send(CommMsg::Data(value)) - .context("Failed to send update message for id {id}.") + .context(format!("Failed to send update message for id {id}.")) .log_err(); } @@ -939,36 +789,24 @@ impl DeviceContext { fn render_plot(&self, id: &PlotId, settings: &PlotRenderSettings) -> anyhow::Result { log::trace!("Rendering plot"); - let image_path = r_task(|| unsafe { - RFunction::from(".ps.graphics.render_plot_from_recording") - .param("id", id) - .param("width", RObject::try_from(settings.size.width)?) - .param("height", RObject::try_from(settings.size.height)?) - .param("pixel_ratio", settings.pixel_ratio) - .param("format", settings.format.to_string()) - .call()? - .to::() - }); - - let image_path = match image_path { - Ok(image_path) => image_path, - Err(error) => { - return Err(anyhow::anyhow!( - "Failed to render plot with `id` {id} due to: {error}." - )) - }, - }; + let image_path: String = RFunction::from(".ps.graphics.render_plot_from_recording") + .param("id", id) + .param("width", RObject::try_from(settings.size.width)?) + .param("height", RObject::try_from(settings.size.height)?) + .param("pixel_ratio", settings.pixel_ratio) + .param("format", settings.format.to_string()) + .call()? + .try_into() + .map_err(|err: harp::Error| anyhow!("Failed to render plot with `id` {id}: {err:?}"))?; log::trace!("Rendered plot to {image_path}"); - // Read contents into bytes. let conn = File::open(image_path)?; let mut reader = BufReader::new(conn); let mut buffer = vec![]; reader.read_to_end(&mut buffer)?; - // what an odd interface let data = general_purpose::STANDARD_NO_PAD.encode(buffer); Ok(data) @@ -995,6 +833,31 @@ impl DeviceContext { } } +/// Per-plot comm handler registered in Console's comm table. +/// Delegates RPC handling and lifecycle events to the shared `DeviceContext`. +#[derive(Debug)] +struct PlotComm { + id: PlotId, + open_data: serde_json::Value, +} + +impl CommHandler for PlotComm { + fn open_metadata(&self) -> serde_json::Value { + self.open_data.clone() + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + let dc = ctx.console().device_context(); + handle_rpc_request(&ctx.outgoing_tx, PLOT_COMM_NAME, msg, |req| { + dc.handle_rpc(req, &self.id) + }); + } + + fn handle_close(&mut self, ctx: &CommHandlerContext) { + ctx.console().device_context().on_plot_closed(&self.id); + } +} + // TODO: This macro needs to be updated every time we introduce support // for a new graphics device. Is there a better way? macro_rules! with_device { @@ -1047,15 +910,6 @@ impl From<&PlotId> for RObject { } } -/// Hook applied at idle time (`R_ProcessEvents()` time) to process any outstanding -/// RPC requests from Positron -/// -/// This is called a lot, so we don't trace log each entry -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn on_process_idle_events() { - DEVICE_CONTEXT.with_borrow(|cell| cell.process_rpc_requests()); -} - /// Hook applied when an execute request starts /// /// Pushes the execution context (execution_id, code, code_location) to the graphics device @@ -1070,8 +924,9 @@ pub(crate) fn on_execute_request( code_location: Option, ) { log::trace!("Entering on_execute_request"); - DEVICE_CONTEXT - .with_borrow(|cell| cell.set_execution_context(execution_id, code, code_location)); + Console::get() + .device_context() + .set_execution_context(execution_id, code, code_location); } /// Hook applied after a code chunk has finished executing @@ -1094,11 +949,10 @@ pub(crate) fn on_execute_request( #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn on_did_execute_request() { log::trace!("Entering on_did_execute_request"); - DEVICE_CONTEXT.with_borrow(|cell| { - cell.process_changes(); - cell.clear_execution_context(); - cell.clear_pending_origin(); - }); + let dc = Console::get().device_context(); + dc.process_changes(); + dc.clear_execution_context(); + dc.clear_pending_origin(); } /// Activation callback @@ -1111,11 +965,10 @@ pub(crate) fn on_did_execute_request() { unsafe extern "C-unwind" fn callback_activate(dev: pDevDesc) { log::trace!("Entering callback_activate"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.activate.get() { - callback(dev); - } - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.activate.get() { + callback(dev); + } } /// Deactivation callback @@ -1126,42 +979,40 @@ unsafe extern "C-unwind" fn callback_activate(dev: pDevDesc) { unsafe extern "C-unwind" fn callback_deactivate(dev: pDevDesc) { log::trace!("Entering callback_deactivate"); - DEVICE_CONTEXT.with_borrow(|cell| { - // We run our hook first to record before we deactivate the underlying device, - // in case device deactivation messes with the display list - cell.hook_deactivate(); - if let Some(callback) = cell.wrapped_callbacks.deactivate.get() { - callback(dev); - } - }); + let dc = Console::get().device_context(); + // We run our hook first to record before we deactivate the underlying device, + // in case device deactivation messes with the display list + dc.hook_deactivate(); + if let Some(callback) = dc.wrapped_callbacks.deactivate.get() { + callback(dev); + } } #[tracing::instrument(level = "trace", skip_all, fields(level_delta = %level_delta))] unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) -> i32 { log::trace!("Entering callback_holdflush"); - DEVICE_CONTEXT.with_borrow(|cell| { - // If our wrapped device has a `holdflush()` method, we rely on it to apply - // the `level_delta` (typically `+1` or `-1`) and return the new level. Otherwise - // we follow the lead of `devholdflush()` in R and use a resolved `level` of `0`. - // Notably, `grDevices::png()` with a Cairo backend does not have a holdflush - // hook. - // https://github.com/wch/r-source/blob/8cebcc0a5d99890839e5171f398da643d858dcca/src/library/grDevices/src/devices.c#L129-L138 - let level = match cell.wrapped_callbacks.holdflush.get() { - Some(callback) => { - let level = callback(dev, level_delta); - log::trace!("Using resolved holdflush level from wrapped callback: {level}"); - level - }, - None => { - let level = 0; - log::trace!("Using default holdflush level: {level}"); - level - }, - }; - cell.hook_holdflush(level); - level - }) + let dc = Console::get().device_context(); + // If our wrapped device has a `holdflush()` method, we rely on it to apply + // the `level_delta` (typically `+1` or `-1`) and return the new level. Otherwise + // we follow the lead of `devholdflush()` in R and use a resolved `level` of `0`. + // Notably, `grDevices::png()` with a Cairo backend does not have a holdflush + // hook. + // https://github.com/wch/r-source/blob/8cebcc0a5d99890839e5171f398da643d858dcca/src/library/grDevices/src/devices.c#L129-L138 + let level = match dc.wrapped_callbacks.holdflush.get() { + Some(callback) => { + let level = callback(dev, level_delta); + log::trace!("Using resolved holdflush level from wrapped callback: {level}"); + level + }, + None => { + let level = 0; + log::trace!("Using default holdflush level: {level}"); + level + }, + }; + dc.hook_holdflush(level); + level } // mode = 0, graphics off @@ -1171,24 +1022,22 @@ unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) unsafe extern "C-unwind" fn callback_mode(mode: i32, dev: pDevDesc) { log::trace!("Entering callback_mode"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.mode.get() { - callback(mode, dev); - } - cell.hook_mode(mode); - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.mode.get() { + callback(mode, dev); + } + dc.hook_mode(mode); } #[tracing::instrument(level = "trace", skip_all)] unsafe extern "C-unwind" fn callback_new_page(dd: pGEcontext, dev: pDevDesc) { log::trace!("Entering callback_new_page"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.newPage.get() { - callback(dd, dev); - } - cell.hook_new_page(); - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.newPage.get() { + callback(dd, dev); + } + dc.hook_new_page(); } unsafe fn ps_graphics_device_impl() -> anyhow::Result { @@ -1211,26 +1060,24 @@ unsafe fn ps_graphics_device_impl() -> anyhow::Result { with_device!(ge_device, |ge_device, device| { (*ge_device).displayListOn = 1; - DEVICE_CONTEXT.with_borrow(|cell| { - let wrapped_callbacks = &cell.wrapped_callbacks; + let wrapped_callbacks = &Console::get().device_context().wrapped_callbacks; - // Safety: The callbacks are stored in simple cells. + // Safety: The callbacks are stored in simple cells. - wrapped_callbacks.activate.replace((*device).activate); - (*device).activate = Some(callback_activate); + wrapped_callbacks.activate.replace((*device).activate); + (*device).activate = Some(callback_activate); - wrapped_callbacks.deactivate.replace((*device).deactivate); - (*device).deactivate = Some(callback_deactivate); + wrapped_callbacks.deactivate.replace((*device).deactivate); + (*device).deactivate = Some(callback_deactivate); - wrapped_callbacks.holdflush.replace((*device).holdflush); - (*device).holdflush = Some(callback_holdflush); + wrapped_callbacks.holdflush.replace((*device).holdflush); + (*device).holdflush = Some(callback_holdflush); - wrapped_callbacks.mode.replace((*device).mode); - (*device).mode = Some(callback_mode); + wrapped_callbacks.mode.replace((*device).mode); + (*device).mode = Some(callback_mode); - wrapped_callbacks.newPage.replace((*device).newPage); - (*device).newPage = Some(callback_new_page); - }); + wrapped_callbacks.newPage.replace((*device).newPage); + (*device).newPage = Some(callback_new_page); }); Ok(R_NilValue) @@ -1273,11 +1120,9 @@ unsafe extern "C-unwind" fn ps_graphics_device() -> anyhow::Result { unsafe extern "C-unwind" fn ps_graphics_before_plot_new(_name: SEXP) -> anyhow::Result { log::trace!("Entering ps_graphics_before_plot_new"); - DEVICE_CONTEXT.with_borrow(|cell| { - // Process changes related to the last plot before opening a new page. - // Particularly important if we make multiple plots in a single chunk. - cell.process_changes(); - }); + // Process changes related to the last plot before opening a new page. + // Particularly important if we make multiple plots in a single chunk. + Console::get().device_context().process_changes(); Ok(harp::r_null()) } @@ -1292,38 +1137,36 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result let id_str: String = RObject::view(id).try_into()?; let plot_id = PlotId(id_str); - DEVICE_CONTEXT.with_borrow(|cell| { - let metadata = cell.metadata.borrow(); - match metadata.get(&plot_id) { - Some(info) => { - let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); - - // Create a list with the metadata values - let values: Vec = vec![ - RObject::from(info.name.as_str()), - RObject::from(info.kind.as_str()), - RObject::from(info.execution_id.as_str()), - RObject::from(info.code.as_str()), - RObject::from(origin_uri), - ]; - let list = RObject::try_from(values)?; - - // Set the names attribute - let names: Vec = vec![ - "name".to_string(), - "kind".to_string(), - "execution_id".to_string(), - "code".to_string(), - "origin_uri".to_string(), - ]; - let names = RObject::from(names); - libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); - - Ok(list.sexp) - }, - None => Ok(harp::r_null()), - } - }) + let metadata = Console::get().device_context().metadata.borrow(); + match metadata.get(&plot_id) { + Some(info) => { + let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); + + // Create a list with the metadata values + let values: Vec = vec![ + RObject::from(info.name.as_str()), + RObject::from(info.kind.as_str()), + RObject::from(info.execution_id.as_str()), + RObject::from(info.code.as_str()), + RObject::from(origin_uri), + ]; + let list = RObject::try_from(values)?; + + // Set the names attribute + let names: Vec = vec![ + "name".to_string(), + "kind".to_string(), + "execution_id".to_string(), + "code".to_string(), + "origin_uri".to_string(), + ]; + let names = RObject::from(names); + libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); + + Ok(list.sexp) + }, + None => Ok(harp::r_null()), + } } /// Push a source file URI onto the source context stack. @@ -1331,7 +1174,7 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result #[harp::register] unsafe extern "C-unwind" fn ps_graphics_push_source_context(uri: SEXP) -> anyhow::Result { let uri_str: String = RObject::view(uri).try_into()?; - DEVICE_CONTEXT.with_borrow(|cell| cell.push_source_context(uri_str)); + Console::get().device_context().push_source_context(uri_str); Ok(harp::r_null()) } @@ -1339,6 +1182,6 @@ unsafe extern "C-unwind" fn ps_graphics_push_source_context(uri: SEXP) -> anyhow /// Called from the `source()` hook when leaving a sourced file. #[harp::register] unsafe extern "C-unwind" fn ps_graphics_pop_source_context() -> anyhow::Result { - DEVICE_CONTEXT.with_borrow(|cell| cell.pop_source_context()); + Console::get().device_context().pop_source_context(); Ok(harp::r_null()) } diff --git a/crates/ark/src/request.rs b/crates/ark/src/request.rs index 0f7390de8..6d3370207 100644 --- a/crates/ark/src/request.rs +++ b/crates/ark/src/request.rs @@ -6,12 +6,13 @@ // use amalthea::comm::comm_channel::CommMsg; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::wire::execute_reply::ExecuteReply; use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::originator::Originator; use crossbeam::channel::Sender; -use crate::ui::UiCommMessage; +use crate::comm_handler::CommHandler; /// Represents requests to the primary R execution thread. #[derive(Debug, Clone)] @@ -53,8 +54,15 @@ pub fn debug_request_command(req: DebugRequest) -> String { /// Represents requests to the kernel. #[derive(Debug)] pub enum KernelRequest { - /// Establish a channel to the UI comm which forwards messages to the frontend - EstablishUiCommChannel(Sender), + /// Register a frontend-initiated comm handler on the R thread. + /// The handler is constructed on the Shell thread and sent here for registration. + CommOpen { + comm_id: String, + comm_name: String, + outgoing_tx: CommOutgoingTx, + handler: Box, + done_tx: Sender<()>, + }, /// Deliver an incoming comm message to the R thread CommMsg { diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 3089105e5..24ad342ef 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -10,7 +10,6 @@ use amalthea::comm::comm_channel::CommMsg; use amalthea::language::shell_handler::CommHandled; use amalthea::language::shell_handler::ShellHandler; use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; use amalthea::wire::complete_reply::CompleteReply; use amalthea::wire::complete_request::CompleteRequest; use amalthea::wire::execute_reply::ExecuteReply; @@ -41,30 +40,26 @@ use harp::ParseResult; use log::*; use serde_json::json; use stdext::unwrap; -use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::ark_comm::ArkComm; use crate::console::Console; -use crate::console::ConsoleNotification; use crate::console::KernelInfo; use crate::data_explorer::r_data_explorer::DATA_EXPLORER_COMM_NAME; use crate::help::r_help::RHelp; use crate::help_proxy; -use crate::plots::graphics_device::GraphicsDeviceNotification; +use crate::plots::graphics_device::PLOT_COMM_NAME; use crate::r_task; use crate::request::KernelRequest; use crate::request::RRequest; use crate::ui::UiComm; +use crate::ui::UI_COMM_NAME; use crate::variables::r_variables::RVariables; pub struct Shell { r_request_tx: Sender, - stdin_request_tx: Sender, kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, - graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -76,20 +71,14 @@ impl Shell { /// Creates a new instance of the shell message handler. pub(crate) fn new( r_request_tx: Sender, - stdin_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, ) -> Self { Self { r_request_tx, - stdin_request_tx, kernel_request_tx, kernel_init_rx, kernel_info: None, - graphics_device_tx, - console_notification_tx, } } @@ -247,13 +236,7 @@ impl ShellHandler for Shell { async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> amalthea::Result { match target { Comm::Variables => handle_comm_open_variables(comm), - Comm::Ui => handle_comm_open_ui( - comm, - self.stdin_request_tx.clone(), - self.kernel_request_tx.clone(), - self.graphics_device_tx.clone(), - self.console_notification_tx.clone(), - ), + Comm::Ui => handle_comm_open_ui(comm, self.kernel_request_tx.clone()), Comm::Help => handle_comm_open_help(comm), Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm), _ => Ok(false), @@ -267,7 +250,7 @@ impl ShellHandler for Shell { msg: CommMsg, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg { comm_id: comm_id.to_string(), msg, @@ -285,7 +268,7 @@ impl ShellHandler for Shell { comm_name: &str, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommClose { comm_id: comm_id.to_string(), done_tx, @@ -324,20 +307,23 @@ fn handle_comm_open_variables(comm: CommSocket) -> amalthea::Result { fn handle_comm_open_ui( comm: CommSocket, - stdin_request_tx: Sender, kernel_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, - _console_notification_tx: AsyncUnboundedSender, ) -> amalthea::Result { - // Create a frontend to wrap the comm channel we were just given. This starts - // a thread that proxies messages to the frontend. - let ui_comm_tx = UiComm::start(comm, stdin_request_tx, graphics_device_tx); + let handler = UiComm::new(); - // Send the frontend event channel to the execution thread so it can emit - // events to the frontend. - if let Err(err) = kernel_request_tx.send(KernelRequest::EstablishUiCommChannel(ui_comm_tx)) { - log::error!("Could not deliver UI comm channel to execution thread: {err:?}"); - }; + let (done_tx, done_rx) = bounded(0); + kernel_request_tx + .send(KernelRequest::CommOpen { + comm_id: comm.comm_id.clone(), + comm_name: comm.comm_name.clone(), + outgoing_tx: comm.outgoing_tx.clone(), + handler: Box::new(handler), + done_tx, + }) + .map_err(|err| amalthea::Error::SendError(err.to_string()))?; + done_rx + .recv() + .map_err(|err| amalthea::Error::ReceiveError(err.to_string()))?; Ok(true) } diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index 2c3cbf5b1..d900f8855 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -25,7 +25,6 @@ use crate::console::SessionMode; use crate::control::Control; use crate::dap; use crate::lsp; -use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::repos::DefaultRepos; use crate::request::KernelRequest; use crate::request::RRequest; @@ -79,20 +78,12 @@ pub fn start_kernel( // StdIn socket thread let (stdin_request_tx, stdin_request_rx) = bounded::(1); - // Communication channel between the graphics device (running on the R - // thread) and the shell thread - let (graphics_device_tx, graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - // Create the shell. let kernel_init_rx = kernel_init_tx.add_rx(); let shell = Box::new(Shell::new( r_request_tx.clone(), - stdin_request_tx.clone(), kernel_init_rx, kernel_request_tx, - graphics_device_tx, - console_notification_tx, )); // Create the control handler; this is used to handle shutdown/interrupt and @@ -152,7 +143,6 @@ pub fn start_kernel( dap, session_mode, default_repos, - graphics_device_rx, console_notification_rx, ) } diff --git a/crates/ark/src/ui/events.rs b/crates/ark/src/ui/events.rs index 241ac48f5..52828911e 100644 --- a/crates/ark/src/ui/events.rs +++ b/crates/ark/src/ui/events.rs @@ -14,7 +14,6 @@ use amalthea::comm::ui_comm::OpenWorkspaceParams; use amalthea::comm::ui_comm::Position; use amalthea::comm::ui_comm::Range; use amalthea::comm::ui_comm::SetEditorSelectionsParams; -use amalthea::comm::ui_comm::ShowMessageParams; use amalthea::comm::ui_comm::ShowUrlParams; use amalthea::comm::ui_comm::UiFrontendEvent; use harp::object::RObject; @@ -25,16 +24,9 @@ use crate::console::Console; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_show_message(message: SEXP) -> anyhow::Result { - let params = ShowMessageParams { - message: RObject::view(message).try_into()?, - }; - - let event = UiFrontendEvent::ShowMessage(params); + let message: String = RObject::view(message).try_into()?; - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_show_message"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.show_message(message); Ok(R_NilValue) } @@ -51,10 +43,7 @@ pub unsafe extern "C-unwind" fn ps_ui_open_workspace( let event = UiFrontendEvent::OpenWorkspace(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_open_workspace"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -79,10 +68,7 @@ pub unsafe extern "C-unwind" fn ps_ui_navigate_to_file( let event = UiFrontendEvent::OpenEditor(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_navigate_to_file"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -94,10 +80,7 @@ pub unsafe extern "C-unwind" fn ps_ui_set_selection_ranges(ranges: SEXP) -> anyh let event = UiFrontendEvent::SetEditorSelections(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_set_selection_ranges"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -109,10 +92,7 @@ pub fn send_show_url_event(url: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::ShowUrl(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("show_url"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -130,10 +110,7 @@ pub fn send_open_with_system_event(path: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::OpenWithSystem(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("open_with_system"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -160,7 +137,3 @@ pub fn ps_ui_robj_as_ranges(ranges: SEXP) -> anyhow::Result> { .collect(); Ok(selections) } - -fn ui_comm_not_connected(name: &str) -> anyhow::Error { - anyhow::anyhow!("UI comm not connected, can't run `{name}`.") -} diff --git a/crates/ark/src/ui/methods.rs b/crates/ark/src/ui/methods.rs index 5e9778c24..3036077c5 100644 --- a/crates/ark/src/ui/methods.rs +++ b/crates/ark/src/ui/methods.rs @@ -25,7 +25,9 @@ use crate::ui::events::ps_ui_robj_as_ranges; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_last_active_editor_context() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; Ok(out.sexp) } @@ -43,14 +45,17 @@ pub unsafe extern "C-unwind" fn ps_ui_modify_editor_selections( } let params = ModifyEditorSelectionsParams { selections, values }; - let out = - Console::get().call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; Ok(out.sexp) } #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_workspace_folder() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; Ok(out.sexp) } @@ -64,7 +69,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_dialog( message: RObject::view(message).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowDialog(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowDialog(params))?; Ok(out.sexp) } @@ -90,7 +97,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_question( }, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; Ok(out.sexp) } @@ -120,7 +129,9 @@ pub extern "C-unwind" fn ps_ui_show_prompt( timeout: timeout_secs, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; Ok(out.sexp) } @@ -130,7 +141,9 @@ pub unsafe extern "C-unwind" fn ps_ui_ask_for_password(prompt: SEXP) -> anyhow:: prompt: RObject::view(prompt).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::AskForPassword(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::AskForPassword(params))?; Ok(out.sexp) } @@ -144,7 +157,9 @@ pub unsafe extern "C-unwind" fn ps_ui_new_document( language_id: RObject::view(language_id).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::NewDocument(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::NewDocument(params))?; Ok(out.sexp) } @@ -154,7 +169,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_command(command: SEXP) -> anyhow:: command: RObject::view(command).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; Ok(out.sexp) } @@ -170,7 +187,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_code( allow_incomplete: false, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; Ok(out.sexp) } @@ -182,7 +201,9 @@ pub unsafe extern "C-unwind" fn ps_ui_evaluate_when_clause( when_clause: RObject::view(when_clause).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; Ok(out.sexp) } @@ -192,6 +213,8 @@ pub unsafe extern "C-unwind" fn ps_ui_debug_sleep(ms: SEXP) -> anyhow::Result` that communicates -/// messages to the `UiComm` -/// -/// Adds convenience methods for sending `Event`s and `Request`s. -/// -/// Manages a bit of state for performing a state refresh -/// (the `working_directory`). -pub struct UiCommSender { - ui_comm_tx: Sender, - working_directory: PathBuf, -} - -impl UiCommSender { - pub fn new(ui_comm_tx: Sender) -> Self { - // Empty path buf will get updated on first directory refresh - let working_directory = PathBuf::new(); - - Self { - ui_comm_tx, - working_directory, - } - } - - pub fn send_event(&self, event: UiFrontendEvent) { - self.send(UiCommMessage::Event(event)) - } - - pub fn send_request(&self, request: UiCommFrontendRequest) { - self.send(UiCommMessage::Request(request)) - } - - fn send(&self, msg: UiCommMessage) { - log::info!("Sending message to UI comm: {msg:?}"); - - if let Err(err) = self.ui_comm_tx.send(msg) { - log::error!("Error sending message to UI comm: {err:?}"); - - // TODO: Something is wrong with the UI thread, we should - // disconnect to avoid more errors but then we need a mutable self - // self.ui_comm_tx = None; - } - } - - pub fn send_refresh(&mut self, input_prompt: String, continuation_prompt: String) { - self.refresh_prompt_info(input_prompt, continuation_prompt); - - if let Err(err) = self.refresh_working_directory() { - log::error!("Can't refresh working directory: {err:?}"); - } - } - - fn refresh_prompt_info(&self, input_prompt: String, continuation_prompt: String) { - self.send_event(UiFrontendEvent::PromptState(PromptStateParams { - input_prompt, - continuation_prompt, - })); - } - - /// Checks for changes to the working directory, and sends an event to the - /// frontend if the working directory has changed. - fn refresh_working_directory(&mut self) -> anyhow::Result<()> { - // Get the current working directory - let mut new_working_directory = std::env::current_dir()?; - - // If it isn't the same as the last working directory, send an event - if new_working_directory != self.working_directory { - self.working_directory = new_working_directory.clone(); - - // Attempt to alias the directory, if it's within the home directory - if let Some(home_dir) = home::home_dir() { - if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { - let mut new_path = PathBuf::from("~"); - new_path.push(stripped_dir); - new_working_directory = new_path; - } - } - - // Deliver event to client - self.send_event(UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { - directory: new_working_directory.to_string_lossy().to_string(), - })); - }; - - Ok(()) - } -} diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs deleted file mode 100644 index 9fb1a26ac..000000000 --- a/crates/ark/src/ui/ui.rs +++ /dev/null @@ -1,536 +0,0 @@ -// -// ui.rs -// -// Copyright (C) 2023-2026 by Posit Software, PBC -// -// - -use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::ui_comm::CallMethodParams; -use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; -use amalthea::comm::ui_comm::EditorContextChangedParams; -use amalthea::comm::ui_comm::EvalResult; -use amalthea::comm::ui_comm::EvaluateCodeParams; -use amalthea::comm::ui_comm::UiBackendReply; -use amalthea::comm::ui_comm::UiBackendRequest; -use amalthea::comm::ui_comm::UiFrontendEvent; -use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; -use amalthea::wire::input_request::UiCommFrontendRequest; -use crossbeam::channel::Receiver; -use crossbeam::channel::Sender; -use crossbeam::select; -use harp::eval::parse_eval_global; -use harp::exec::RFunction; -use harp::exec::RFunctionExt; -use harp::object::RObject; -use serde_json::Value; -use stdext::spawn; -use stdext::unwrap; -use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; - -use crate::console::Console; -use crate::console::ConsoleOutputCapture; -use crate::plots::graphics_device::GraphicsDeviceNotification; -use crate::r_task; - -#[derive(Debug)] -pub enum UiCommMessage { - Event(UiFrontendEvent), - Request(UiCommFrontendRequest), -} - -/// UiComm is a wrapper around a comm channel whose lifetime matches -/// that of the Positron UI frontend. It is used to perform communication with the -/// frontend that isn't scoped to any particular view. -pub struct UiComm { - comm: CommSocket, - ui_comm_rx: Receiver, - stdin_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, -} - -impl UiComm { - pub(crate) fn start( - comm: CommSocket, - stdin_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, - ) -> Sender { - // Create a sender-receiver pair for Positron global events - let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::(); - - spawn!("ark-comm-ui", move || { - let frontend = Self { - comm, - ui_comm_rx, - stdin_request_tx, - graphics_device_tx, - }; - frontend.execution_thread(); - }); - - ui_comm_tx - } - - fn execution_thread(&self) { - loop { - // Wait for an event on either the event channel (which forwards - // Positron events to the frontend) or the comm channel (which - // receives requests from the frontend) - select! { - recv(&self.ui_comm_rx) -> msg => { - let msg = unwrap!(msg, Err(err) => { - log::error!( - "Error receiving Positron event; closing event listener: {err:?}" - ); - // Most likely the channel was closed, so we should stop the thread - break; - }); - match msg { - UiCommMessage::Event(event) => self.dispatch_event(&event), - UiCommMessage::Request(request) => self.call_frontend_method(request).unwrap(), - } - }, - - recv(&self.comm.incoming_rx) -> msg => { - match msg { - Ok(msg) => { - if !self.handle_comm_message(msg) { - log::info!("UI comm {} closing by request from frontend.", self.comm.comm_id); - break; - } - }, - Err(err) => { - log::error!("Error receiving message from frontend: {:?}", err); - break; - }, - } - }, - } - } - } - - fn dispatch_event(&self, event: &UiFrontendEvent) { - let json = serde_json::to_value(event).unwrap(); - - // Deliver the event to the frontend over the comm channel - if let Err(err) = self.comm.outgoing_tx.send(CommMsg::Data(json)) { - log::error!("Error sending UI event to frontend: {}", err); - }; - } - - /** - * Handles a comm message from the frontend. - * - * Returns true if the thread should continue, false if it should exit. - */ - fn handle_comm_message(&self, message: CommMsg) -> bool { - if let CommMsg::Close = message { - // The frontend has closed the connection; let the - // thread exit. - return false; - } - - if self - .comm - .handle_request(message.clone(), |req| self.handle_backend_method(req)) - { - return true; - } - - // We don't really expect to receive data messages from the - // frontend; they are events - log::warn!("Unexpected data message from frontend: {message:?}"); - true - } - - /** - * Handles an RPC request from the frontend. - */ - fn handle_backend_method( - &self, - request: UiBackendRequest, - ) -> anyhow::Result { - match request { - UiBackendRequest::CallMethod(request) => self.handle_call_method(request), - UiBackendRequest::DidChangePlotsRenderSettings(params) => { - self.handle_did_change_plot_render_settings(params) - }, - UiBackendRequest::EditorContextChanged(params) => { - self.handle_editor_context_changed(params) - }, - UiBackendRequest::EvaluateCode(params) => self.handle_evaluate_code(params), - } - } - - fn handle_call_method( - &self, - request: CallMethodParams, - ) -> anyhow::Result { - log::trace!("Handling '{}' frontend RPC method", request.method); - - // Today, all RPCs are fulfilled by R directly. Check to see if an R - // method of the appropriate name is defined. - // - // Consider: In the future, we may want to allow requests to be - // fulfilled here on the Rust side, with only some requests forwarded to - // R; Rust methods may wish to establish their own RPC handlers. - - // The method name is prefixed with ".ps.rpc.", by convention - let method = format!(".ps.rpc.{}", request.method); - - // Use the `exists` function to see if the method exists - let exists = r_task(|| unsafe { - let exists = RFunction::from("exists") - .param("x", method.clone()) - .call()?; - RObject::to::(exists) - })?; - - if !exists { - anyhow::bail!("No such method: {}", request.method); - } - - // Form an R function call from the request - let result = r_task(|| { - let mut call = RFunction::from(method); - for param in request.params.iter() { - let p = RObject::try_from(param.clone())?; - call.add(p); - } - let result = call.call()?; - Value::try_from(result) - })?; - - Ok(UiBackendReply::CallMethodReply(result)) - } - - fn handle_did_change_plot_render_settings( - &self, - params: DidChangePlotsRenderSettingsParams, - ) -> anyhow::Result { - // The frontend shoudn't send invalid sizes but be defensive. Sometimes - // the plot container is in a strange state when it's hidden. - if params.settings.size.height <= 0 || params.settings.size.width <= 0 { - return Err(anyhow::anyhow!( - "Got invalid plot render size: {size:?}", - size = params.settings.size, - )); - } - - self.graphics_device_tx - .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( - params.settings, - )) - .unwrap(); - - Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) - } - - fn handle_editor_context_changed( - &self, - params: EditorContextChangedParams, - ) -> anyhow::Result { - log::trace!( - "Editor context changed: document_uri={}, is_execution_source={}", - params.document_uri, - params.is_execution_source - ); - Ok(UiBackendReply::EditorContextChangedReply()) - } - - fn handle_evaluate_code( - &self, - params: EvaluateCodeParams, - ) -> anyhow::Result { - log::trace!("Evaluating code: {}", params.code); - - let result = r_task(|| { - let mut capture = if Console::is_initialized() { - Console::get_mut().start_capture() - } else { - ConsoleOutputCapture::dummy() - }; - - // Evaluate the user's code - let eval_result = parse_eval_global(¶ms.code); - - // Take captured output before dropping the capture guard - let output = capture.take(); - drop(capture); - - // Now handle the eval result - let evaluated = eval_result?; - let value = Value::try_from(evaluated)?; - - Ok((value, output)) - }); - - match result { - Ok((value, output)) => Ok(UiBackendReply::EvaluateCodeReply(EvalResult { - result: value, - output, - })), - Err(err) => { - let message = match err { - harp::Error::TryCatchError { message, .. } => message, - harp::Error::ParseError { message, .. } => message, - harp::Error::ParseSyntaxError { message } => message, - _ => format!("{err}"), - }; - Err(anyhow::anyhow!("{message}")) - }, - } - } - - /** - * Send an RPC request to the frontend. - */ - fn call_frontend_method(&self, request: UiCommFrontendRequest) -> anyhow::Result<()> { - let comm_msg = StdInRequest::Comm(request); - self.stdin_request_tx.send(comm_msg)?; - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use amalthea::comm::base_comm::JsonRpcError; - use amalthea::comm::comm_channel::CommMsg; - use amalthea::comm::ui_comm::BusyParams; - use amalthea::comm::ui_comm::CallMethodParams; - use amalthea::comm::ui_comm::EvalResult; - use amalthea::comm::ui_comm::EvaluateCodeParams; - use amalthea::comm::ui_comm::UiBackendReply; - use amalthea::comm::ui_comm::UiBackendRequest; - use amalthea::comm::ui_comm::UiFrontendEvent; - use amalthea::socket::comm::CommInitiator; - use amalthea::socket::comm::CommSocket; - use amalthea::socket::iopub::IOPubMessage; - use amalthea::socket::stdin::StdInRequest; - use ark_test::dummy_jupyter_header; - use ark_test::IOPubReceiverExt; - use crossbeam::channel::bounded; - use crossbeam::channel::Sender; - use harp::exec::RFunction; - use harp::exec::RFunctionExt; - use harp::object::RObject; - use serde_json::Value; - - use crate::plots::graphics_device::GraphicsDeviceNotification; - use crate::r_task::r_task; - use crate::ui::UiComm; - use crate::ui::UiCommMessage; - - #[test] - fn test_ui_comm() { - // Create a dummy iopub channel to receive responses. - let (iopub_tx, iopub_rx) = bounded::(10); - - // Create a sender/receiver pair for the comm channel. - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-ui-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - - // Communication channel between the main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - - // Create a frontend instance, get access to the sender channel - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); - - // Get the current console width - let old_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Send a message to the frontend - let id = String::from("test-id-1"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("setConsoleWidth"), - params: vec![Value::from(123)], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - // Wait for the reply; this should be a FrontendRpcResult. - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); - let result = serde_json::from_value::(data).unwrap(); - assert_eq!(id, "test-id-1"); - // This RPC should return the old width - assert_eq!( - result, - UiBackendReply::CallMethodReply(Value::from(old_width)) - ); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Get the new console width - let new_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Assert that the console width changed - assert_eq!(new_width, 123); - - // Now try to invoke an RPC that doesn't exist - let id = String::from("test-id-2"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("thisRpcDoesNotExist"), - params: vec![], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - // Wait for the reply - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); - let _reply = serde_json::from_value::(data).unwrap(); - // Ensure that the error code is -32601 (method not found) - assert_eq!(id, "test-id-2"); - - // TODO: This should normally throw a `MethodNotFound` but - // that's currently a bit hard because of the nested method - // call. One way to solve this would be for RPC handler - // functions to return a typed JSON-RPC error instead of a - // `anyhow::Result`. Then we could return a `MethodNotFound` from - // `callMethod()`. - // - // assert_eq!(reply.error.code, JsonRpcErrorCode::MethodNotFound); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Mark not busy (this prevents the frontend comm from being closed due to - // the Sender being dropped) - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); - } - - /// Helper to set up a UiComm and return the pieces needed for testing - fn setup_ui_comm() -> ( - CommSocket, - crossbeam::channel::Receiver, - Sender, - ) { - let (iopub_tx, iopub_rx) = bounded::(10); - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-eval-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); - (comm_socket, iopub_rx, ui_comm_tx) - } - - /// Send an evaluate_code RPC and return the reply - fn send_evaluate_code( - comm_socket: &CommSocket, - iopub_rx: &crossbeam::channel::Receiver, - id: &str, - code: &str, - ) -> UiBackendReply { - let request = UiBackendRequest::EvaluateCode(EvaluateCodeParams { - code: String::from(code), - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id: String::from(id), - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { data, .. } => serde_json::from_value::(data).unwrap(), - _ => panic!("Unexpected response: {:?}", response), - } - } - - #[test] - fn test_evaluate_code() { - let (comm_socket, iopub_rx, ui_comm_tx) = setup_ui_comm(); - - // Test 1: Pure result with no output (e.g. 1 + 1) - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-1", "1 + 1"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(2.0), - output: String::from(""), - }) - ); - - // Test 2: Code that returns a value - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-2", "isTRUE(cat('oatmeal'))"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(false), - // Output capture relies on Console::start_capture(), which is - // not available in unit tests (Console is not initialized). - // Output capture is exercised in integration tests instead. - output: String::from(""), - }) - ); - - // Test 3: Code that only prints, with an invisible NULL result - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-3", "cat('hello\\nworld')"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::Null, - output: String::from(""), - }) - ); - - // Keep the comm alive - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); - } -} diff --git a/crates/ark/src/ui/ui_comm.rs b/crates/ark/src/ui/ui_comm.rs new file mode 100644 index 000000000..cfcd9f02d --- /dev/null +++ b/crates/ark/src/ui/ui_comm.rs @@ -0,0 +1,363 @@ +// +// ui.rs +// +// Copyright (C) 2023-2026 by Posit Software, PBC +// +// + +use std::path::PathBuf; + +use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::ui_comm::CallMethodParams; +use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; +use amalthea::comm::ui_comm::EditorContextChangedParams; +use amalthea::comm::ui_comm::EvalResult; +use amalthea::comm::ui_comm::EvaluateCodeParams; +use amalthea::comm::ui_comm::PromptStateParams; +use amalthea::comm::ui_comm::UiBackendReply; +use amalthea::comm::ui_comm::UiBackendRequest; +use amalthea::comm::ui_comm::UiFrontendEvent; +use amalthea::comm::ui_comm::WorkingDirectoryParams; +use harp::eval::parse_eval_global; +use harp::exec::RFunction; +use harp::exec::RFunctionExt; +use harp::object::RObject; +use serde_json::Value; +use stdext::result::ResultExt; + +use crate::comm_handler::handle_rpc_request; +use crate::comm_handler::CommHandler; +use crate::comm_handler::CommHandlerContext; +use crate::comm_handler::EnvironmentChanged; +use crate::console::Console; +use crate::console::ConsoleOutputCapture; + +pub const UI_COMM_NAME: &str = "positron.ui"; + +/// Comm handler for the Positron UI comm. +#[derive(Debug)] +pub struct UiComm { + working_directory: PathBuf, +} + +impl CommHandler for UiComm { + fn handle_open(&mut self, ctx: &CommHandlerContext) { + self.refresh(ctx); + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + handle_rpc_request(&ctx.outgoing_tx, UI_COMM_NAME, msg, |req| { + self.handle_rpc(req) + }); + } + + fn handle_environment(&mut self, event: EnvironmentChanged, ctx: &CommHandlerContext) { + let EnvironmentChanged::Execution = event else { + return; + }; + self.refresh(ctx); + } +} + +impl UiComm { + pub(crate) fn new() -> Self { + Self { + working_directory: PathBuf::new(), + } + } + + fn handle_rpc(&mut self, request: UiBackendRequest) -> anyhow::Result { + match request { + UiBackendRequest::CallMethod(params) => self.handle_call_method(params), + UiBackendRequest::DidChangePlotsRenderSettings(params) => { + self.handle_did_change_plot_render_settings(params) + }, + UiBackendRequest::EditorContextChanged(params) => { + self.handle_editor_context_changed(params) + }, + UiBackendRequest::EvaluateCode(params) => self.handle_evaluate_code(params), + } + } + + fn handle_call_method(&self, request: CallMethodParams) -> anyhow::Result { + log::trace!("Handling '{}' frontend RPC method", request.method); + + // Today, all RPCs are fulfilled by R directly. Check to see if an R + // method of the appropriate name is defined. + // + // Consider: In the future, we may want to allow requests to be + // fulfilled here on the Rust side, with only some requests forwarded to + // R; Rust methods may wish to establish their own RPC handlers. + + let method = format!(".ps.rpc.{}", request.method); + + let exists_obj = RFunction::from("exists") + .param("x", method.clone()) + .call()?; + let exists: bool = exists_obj.try_into()?; + + if !exists { + let method = &request.method; + return Err(anyhow::anyhow!("No such method: {method}")); + } + + let mut call = RFunction::from(method); + for param in request.params.iter() { + let p = RObject::try_from(param.clone())?; + call.add(p); + } + let result = call.call()?; + let result = Value::try_from(result)?; + + Ok(UiBackendReply::CallMethodReply(result)) + } + + fn handle_did_change_plot_render_settings( + &self, + params: DidChangePlotsRenderSettingsParams, + ) -> anyhow::Result { + // The frontend shouldn't send invalid sizes but be defensive. Sometimes + // the plot container is in a strange state when it's hidden. + if params.settings.size.height <= 0 || params.settings.size.width <= 0 { + return Err(anyhow::anyhow!( + "Got invalid plot render size: {size:?}", + size = params.settings.size, + )); + } + + Console::get() + .device_context() + .set_prerender_settings(params.settings); + + Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) + } + + fn handle_editor_context_changed( + &self, + params: EditorContextChangedParams, + ) -> anyhow::Result { + log::trace!( + "Editor context changed: document_uri={}, is_execution_source={}", + params.document_uri, + params.is_execution_source + ); + Ok(UiBackendReply::EditorContextChangedReply()) + } + + fn handle_evaluate_code(&self, params: EvaluateCodeParams) -> anyhow::Result { + log::trace!("Evaluating code: {}", params.code); + + let mut capture = if Console::is_initialized() { + Console::get_mut().start_capture() + } else { + ConsoleOutputCapture::dummy() + }; + + let value = parse_eval_global(¶ms.code); + + let output = capture.take(); + drop(capture); + + match value { + Ok(evaluated) => { + let result = Value::try_from(evaluated)?; + Ok(UiBackendReply::EvaluateCodeReply(EvalResult { + result, + output, + })) + }, + Err(err) => { + let message = match err { + harp::Error::TryCatchError { message, .. } => message, + harp::Error::ParseError { message, .. } => message, + harp::Error::ParseSyntaxError { message } => message, + _ => format!("{err}"), + }; + Err(anyhow::anyhow!("{message}")) + }, + } + } + + fn refresh(&mut self, ctx: &CommHandlerContext) { + self.refresh_prompt_info(ctx); + self.refresh_working_directory(ctx).log_err(); + } + + fn refresh_prompt_info(&self, ctx: &CommHandlerContext) { + let input_prompt: String = harp::get_option("prompt") + .try_into() + .unwrap_or_else(|_| String::from("> ")); + let continuation_prompt: String = harp::get_option("continue") + .try_into() + .unwrap_or_else(|_| String::from("+ ")); + + ctx.send_event(&UiFrontendEvent::PromptState(PromptStateParams { + input_prompt, + continuation_prompt, + })); + } + + /// Checks for changes to the working directory, and sends an event to the + /// frontend if the working directory has changed. + fn refresh_working_directory(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<()> { + let mut new_working_directory = std::env::current_dir()?; + + if new_working_directory != self.working_directory { + self.working_directory = new_working_directory.clone(); + + // Attempt to alias the directory, if it's within the home directory + if let Some(home_dir) = home::home_dir() { + if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { + let mut new_path = PathBuf::from("~"); + new_path.push(stripped_dir); + new_working_directory = new_path; + } + } + + ctx.send_event(&UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { + directory: new_working_directory.to_string_lossy().to_string(), + })); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use amalthea::comm::base_comm::JsonRpcError; + use amalthea::comm::comm_channel::CommMsg; + use amalthea::comm::event::CommEvent; + use amalthea::comm::ui_comm::CallMethodParams; + use amalthea::comm::ui_comm::EvalResult; + use amalthea::comm::ui_comm::EvaluateCodeParams; + use amalthea::comm::ui_comm::UiBackendReply; + use amalthea::comm::ui_comm::UiBackendRequest; + use amalthea::socket::comm::CommOutgoingTx; + use amalthea::socket::iopub::IOPubMessage; + use ark_test::dummy_jupyter_header; + use ark_test::IOPubReceiverExt; + use crossbeam::channel::bounded; + use serde_json::Value; + + use super::*; + use crate::comm_handler::CommHandlerContext; + use crate::r_task; + + fn setup_ui_comm( + iopub_tx: crossbeam::channel::Sender, + ) -> (UiComm, CommHandlerContext) { + let comm_id = uuid::Uuid::new_v4().to_string(); + + let outgoing_tx = CommOutgoingTx::new(comm_id, iopub_tx); + let (comm_event_tx, _) = bounded::(10); + let ctx = CommHandlerContext::new(outgoing_tx, comm_event_tx); + + let handler = UiComm::new(); + (handler, ctx) + } + + #[test] + fn test_ui_comm() { + let (iopub_tx, iopub_rx) = bounded::(10); + + let old_width = r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); + + // Get the current console width + let old_width: i32 = harp::get_option("width").try_into().unwrap(); + + // Send a setConsoleWidth RPC + let msg = CommMsg::Rpc { + id: String::from("test-id-1"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("setConsoleWidth"), + params: vec![Value::from(123)], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + + // Assert that the console width changed + let new_width: i32 = harp::get_option("width").try_into().unwrap(); + assert_eq!(new_width, 123); + + // Now try to invoke an RPC that doesn't exist + let msg = CommMsg::Rpc { + id: String::from("test-id-2"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("thisRpcDoesNotExist"), + params: vec![], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + + old_width + }); + + // Check first response (setConsoleWidth) + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { id, data, .. } => { + let result = serde_json::from_value::(data).unwrap(); + assert_eq!(id, "test-id-1"); + assert_eq!( + result, + UiBackendReply::CallMethodReply(Value::from(old_width)) + ); + }, + _ => panic!("Unexpected response: {:?}", response), + } + + // Check second response (non-existent method error) + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { id, data, .. } => { + let _reply = serde_json::from_value::(data).unwrap(); + assert_eq!(id, "test-id-2"); + }, + _ => panic!("Unexpected response: {:?}", response), + } + } + + #[test] + fn test_evaluate_code() { + let (iopub_tx, iopub_rx) = bounded::(10); + + r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); + + // Pure result with no output (e.g. 1 + 1) + let msg = CommMsg::Rpc { + id: String::from("eval-1"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::EvaluateCode(EvaluateCodeParams { + code: String::from("1 + 1"), + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + }); + + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { data, .. } => { + let result = serde_json::from_value::(data).unwrap(); + assert_eq!( + result, + UiBackendReply::EvaluateCodeReply(EvalResult { + result: Value::from(2.0), + // Output capture relies on Console::start_capture(), which is + // not available in unit tests (Console is not initialized). + // Output capture is exercised in integration tests instead. + output: String::from(""), + }) + ); + }, + _ => panic!("Unexpected response: {:?}", response), + } + } +} diff --git a/crates/ark/src/variables/r_variables.rs b/crates/ark/src/variables/r_variables.rs index e3e0f973b..9af576f43 100644 --- a/crates/ark/src/variables/r_variables.rs +++ b/crates/ark/src/variables/r_variables.rs @@ -361,7 +361,7 @@ impl RVariables { let explorer = RDataExplorer::new(name.clone(), obj, Some(binding)) .map_err(|err| harp::Error::Anyhow(err))?; let viewer_id = Console::get_mut() - .comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) + .comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) .map_err(|err| harp::Error::Anyhow(err))?; Ok(Some(viewer_id)) }) diff --git a/crates/ark/src/viewer.rs b/crates/ark/src/viewer.rs index e6de45c55..351ffa698 100644 --- a/crates/ark/src/viewer.rs +++ b/crates/ark/src/viewer.rs @@ -112,11 +112,7 @@ pub unsafe extern "C-unwind" fn ps_html_viewer( // TODO: What's the right thing to do in `Console` mode when // we aren't connected to Positron? Right now we error. - let ui_comm_tx = console - .get_ui_comm_tx() - .ok_or_else(|| anyhow::anyhow!("UI comm not connected."))?; - - ui_comm_tx.send_event(event); + console.try_ui_comm()?.send_event(&event); }, } }, diff --git a/crates/ark/tests/evaluate-code.rs b/crates/ark/tests/evaluate-code.rs index 939a6bc7d..f3e4a8cb7 100644 --- a/crates/ark/tests/evaluate-code.rs +++ b/crates/ark/tests/evaluate-code.rs @@ -21,14 +21,11 @@ fn evaluate_code(frontend: &DummyArkFrontend, comm_id: &str, code: &str) -> UiBa }); frontend.send_shell_comm_msg(String::from(comm_id), data); - - // The shell routes the message to the UI comm thread and goes busy/idle. - // The RPC reply is sent from the UI comm thread and can arrive on IOPub - // either before or after the shell's Idle status. frontend.recv_iopub_busy(); - let reply = frontend.recv_iopub_comm_msg_and_idle(); + let reply = frontend.recv_iopub_comm_msg(); assert_eq!(reply.comm_id, comm_id); + frontend.recv_iopub_idle(); serde_json::from_value::(reply.data).unwrap() } diff --git a/crates/ark/tests/plots.rs b/crates/ark/tests/plots.rs index 9b2f204cb..89a79713b 100644 --- a/crates/ark/tests/plots.rs +++ b/crates/ark/tests/plots.rs @@ -583,3 +583,71 @@ fn test_plot_source_context_stacking() { file_a.uri_id, ); } + +/// Test that `dev.hold()` suppresses intermediate plot output. +/// +/// Without hold, each `plot()` call emits a separate `display_data`. +/// With hold active, intermediate plots are suppressed and only the +/// final state after `dev.flush()` is emitted. +#[test] +fn test_dev_hold_suppresses_intermediate_plots() { + let frontend = DummyArkFrontend::lock(); + + // Activate the graphics device + frontend.send_execute_request("plot(1:10)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Hold, draw two intermediate plots, then flush. + // Only the final plot should produce output. + let code = r#" +invisible(dev.hold()) +plot(1:5) +plot(1:3) +invisible(dev.flush()) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Test that `dev.hold()` persists across execute requests. +/// +/// A hold started in one request should suppress output until +/// `dev.flush()` is called in a subsequent request. +#[test] +fn test_dev_hold_across_execute_requests() { + let frontend = DummyArkFrontend::lock(); + + // Activate the graphics device + frontend.send_execute_request("plot(1:10)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Hold and plot without flushing. No display_data should appear. + frontend.send_execute_request( + "invisible(dev.hold())\nplot(1:5)", + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Flush in a separate request. The held plot should now appear. + frontend.send_execute_request("invisible(dev.flush())", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 09edb1814..743fd0be0 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -636,36 +636,6 @@ impl DummyArkFrontend { } } - /// Receive a CommMsg and Idle from IOPub in either order. - /// - /// Some comm RPC replies race with the shell's Idle status because the - /// reply is sent from a separate thread (e.g. the UI comm thread). This - /// helper accepts both orderings and returns the CommMsg content. - #[track_caller] - pub fn recv_iopub_comm_msg_and_idle(&self) -> amalthea::wire::comm_msg::CommWireMsg { - let first = self.recv_iopub_next(); - let second = self.recv_iopub_next(); - - let (comm_msg, idle) = match (first, second) { - (Message::CommMsg(comm), Message::Status(status)) => (comm, status), - (Message::Status(status), Message::CommMsg(comm)) => (comm, status), - (a, b) => panic!( - "Expected CommMsg and Idle in either order, got {:?} and {:?}", - a, b - ), - }; - - assert_eq!( - idle.content.execution_state, - amalthea::wire::status::ExecutionState::Idle, - "Expected Idle status" - ); - - self.flush_streams_at_boundary(); - - comm_msg.content - } - /// Receive from IOPub and assert CommOpen message. /// Automatically skips any Stream messages. #[track_caller] @@ -1094,54 +1064,28 @@ impl DummyArkFrontend { data: serde_json::json!({}), }); - // The comm_open triggers busy/idle on the shell thread and also - // sends an EstablishUiCommChannel kernel request to the console. - // The UI comm then sends events (prompt_state, working_directory) - // as CommMsg on IOPub. These can arrive in any order relative to - // the busy/idle. We wait for the prompt_state CommMsg as evidence - // that the UI comm has been established, draining everything. - let deadline = Instant::now() + RECV_TIMEOUT; - let mut got_prompt_state = false; - let mut idle_count = 0u32; + // The UI comm runs on the R thread via CommHandler. The comm_open + // blocks Shell while the handler's `handle_open()` runs, so events + // arrive deterministically within the Busy/Idle window. + self.recv_iopub_busy(); - // We need to see the prompt_state AND at least one idle - while !got_prompt_state || idle_count == 0 { - let remaining = deadline.saturating_duration_since(Instant::now()); - let Some(msg) = self.recv_iopub_with_timeout(remaining) else { - panic!( - "Timed out waiting for UI comm (got_prompt_state={got_prompt_state}, \ - idle_count={idle_count})" - ); - }; - match &msg { - Message::CommMsg(data) => { - if let Some(method) = data.content.data.get("method").and_then(|v| v.as_str()) { - if method == "prompt_state" { - got_prompt_state = true; - } - } - }, - Message::Status(ref data) - if data.content.execution_state == - amalthea::wire::status::ExecutionState::Idle => - { - idle_count += 1; - }, - Message::Stream(ref data) => { - self.buffer_stream(&data.content); - }, - _ => {}, - } - } + // `handle_open()` calls `refresh()` which sends prompt_state then + // working_directory. + let prompt_state = self.recv_iopub_comm_msg(); + assert_eq!(prompt_state.comm_id, comm_id); + assert_eq!( + prompt_state.data.get("method").and_then(|v| v.as_str()), + Some("prompt_state") + ); - // The UI comm sends events asynchronously. Some may arrive after - // idle. Drain any stragglers with a short timeout. - while let Some(msg) = self.recv_iopub_with_timeout(Duration::from_millis(200)) { - if let Message::Stream(ref data) = msg { - self.buffer_stream(&data.content); - } - // Discard late comm messages and other events - } + let working_dir = self.recv_iopub_comm_msg(); + assert_eq!(working_dir.comm_id, comm_id); + assert_eq!( + working_dir.data.get("method").and_then(|v| v.as_str()), + Some("working_directory") + ); + + self.recv_iopub_idle(); comm_id }