From f32156d246cea82008035b9128c0019ee98b4a1e Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 18 Feb 2026 12:50:01 -0800 Subject: [PATCH 1/2] fix: implement functional WebSocket heartbeat mechanism The WebSocket heartbeat task was broken - it would tick a timer and clone the message but never actually send it. The write handle was moved to the read loop, making it inaccessible to the heartbeat task. This commit introduces a channel-based architecture where: - A dedicated write task owns the WebSocket write handle - The heartbeat task sends messages through a channel to the write task - Pong responses also use the same channel for proper handling This eliminates the CPU-burning no-op loop and provides a clean separation of concerns between reading, writing, and heartbeat functionality. Fixes: td-fbb644 Co-authored-by: Cursor --- crates/nemo-data/src/sources/websocket.rs | 25 ++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/crates/nemo-data/src/sources/websocket.rs b/crates/nemo-data/src/sources/websocket.rs index d73382b..7ff36a2 100644 --- a/crates/nemo-data/src/sources/websocket.rs +++ b/crates/nemo-data/src/sources/websocket.rs @@ -130,16 +130,32 @@ impl DataSource for WebSocketSource { let (mut write, mut read) = ws_stream.split(); + // Channel for sending messages (heartbeats and pong responses) + let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::(10); + + // Spawn write task to handle all outgoing messages + let write_task = tokio::spawn(async move { + while let Some(msg) = write_rx.recv().await { + if write.send(msg).await.is_err() { + break; + } + } + }); + // Optional heartbeat task let heartbeat_task = if let Some(hb) = &config.heartbeat { let msg = hb.message.clone(); let interval = hb.interval; + let hb_tx = write_tx.clone(); + Some(tokio::spawn(async move { let mut timer = tokio::time::interval(interval); loop { timer.tick().await; - // Note: This won't work as write is moved, simplified for now - let _ = msg.clone(); + let message = Message::Text(msg.clone()); + if hb_tx.send(message).await.is_err() { + break; + } } })) } else { @@ -172,7 +188,7 @@ impl DataSource for WebSocketSource { } } Ok(Message::Ping(data)) => { - let _ = write.send(Message::Pong(data)).await; + let _ = write_tx.send(Message::Pong(data)).await; } Ok(Message::Close(_)) => { break; @@ -184,6 +200,9 @@ impl DataSource for WebSocketSource { } } + // Cleanup tasks + drop(write_tx); + write_task.abort(); if let Some(task) = heartbeat_task { task.abort(); } From ea03c7e4658c9e01740213daa338f697e90ac2c8 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 18 Feb 2026 15:31:03 -0800 Subject: [PATCH 2/2] fix: run cargo fmt --- crates/nemo-data/src/sources/websocket.rs | 2 +- crates/nemo/src/app.rs | 6 +++--- crates/nemo/src/components/realtime_chart.rs | 7 ++++++- plugins/streaming-stats/src/lib.rs | 9 +++++++-- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/crates/nemo-data/src/sources/websocket.rs b/crates/nemo-data/src/sources/websocket.rs index 7ff36a2..ff30052 100644 --- a/crates/nemo-data/src/sources/websocket.rs +++ b/crates/nemo-data/src/sources/websocket.rs @@ -147,7 +147,7 @@ impl DataSource for WebSocketSource { let msg = hb.message.clone(); let interval = hb.interval; let hb_tx = write_tx.clone(); - + Some(tokio::spawn(async move { let mut timer = tokio::time::interval(interval); loop { diff --git a/crates/nemo/src/app.rs b/crates/nemo/src/app.rs index a1e6dc7..c0d0f2f 100644 --- a/crates/nemo/src/app.rs +++ b/crates/nemo/src/app.rs @@ -21,9 +21,9 @@ use crate::components::{ Button, CandlestickChart, Checkbox, ClusteredBarChart, ClusteredColumnChart, CodeEditor, Collapsible, ColumnChart, DropdownButton, FunnelChart, HeatmapChart, Icon, Image, Label, LineChart, List, Modal, Notification, Panel, PieChart, Progress, PyramidChart, RadarChart, - Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack, StackedBarChart, - StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea, Toggle, Tooltip, - Tree, + Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack, + StackedBarChart, StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea, + Toggle, Tooltip, Tree, }; use crate::runtime::NemoRuntime; use crate::workspace::HeaderBar; diff --git a/crates/nemo/src/components/realtime_chart.rs b/crates/nemo/src/components/realtime_chart.rs index b227da5..5fcb377 100644 --- a/crates/nemo/src/components/realtime_chart.rs +++ b/crates/nemo/src/components/realtime_chart.rs @@ -115,7 +115,12 @@ impl Plot for MultiLineChartInner { i if i == data_len - 1 => TextAlign::Right, _ => TextAlign::Center, }; - AxisText::new(SharedString::from(label), x_tick, cx.theme().muted_foreground).align(align) + AxisText::new( + SharedString::from(label), + x_tick, + cx.theme().muted_foreground, + ) + .align(align) }) } else { None diff --git a/plugins/streaming-stats/src/lib.rs b/plugins/streaming-stats/src/lib.rs index 2778c5e..7aa8a7f 100644 --- a/plugins/streaming-stats/src/lib.rs +++ b/plugins/streaming-stats/src/lib.rs @@ -147,7 +147,9 @@ fn spawn_stats_thread(ctx: Arc) { if let Some(value) = ctx.get_data("metrics") { for (channel, val, unit) in extract_metrics(&value) { - let window = windows.entry(channel.clone()).or_insert_with(ChannelWindow::new); + let window = windows + .entry(channel.clone()) + .or_insert_with(ChannelWindow::new); window.push(now, val); channel_units.insert(channel.clone(), unit); snapshot.insert(channel, val); @@ -194,7 +196,10 @@ fn spawn_stats_thread(ctx: Arc) { // Build summary row for table display let mut row = IndexMap::new(); - row.insert("channel".to_string(), PluginValue::String((*channel).clone())); + row.insert( + "channel".to_string(), + PluginValue::String((*channel).clone()), + ); row.insert("mean".to_string(), PluginValue::Float(mean)); row.insert("min".to_string(), PluginValue::Float(min)); row.insert("max".to_string(), PluginValue::Float(max));