diff --git a/crates/nemo-data/src/sources/websocket.rs b/crates/nemo-data/src/sources/websocket.rs index d73382b..ff30052 100644 --- a/crates/nemo-data/src/sources/websocket.rs +++ b/crates/nemo-data/src/sources/websocket.rs @@ -130,16 +130,32 @@ impl DataSource for WebSocketSource { let (mut write, mut read) = ws_stream.split(); + // Channel for sending messages (heartbeats and pong responses) + let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::(10); + + // Spawn write task to handle all outgoing messages + let write_task = tokio::spawn(async move { + while let Some(msg) = write_rx.recv().await { + if write.send(msg).await.is_err() { + break; + } + } + }); + // Optional heartbeat task let heartbeat_task = if let Some(hb) = &config.heartbeat { let msg = hb.message.clone(); let interval = hb.interval; + let hb_tx = write_tx.clone(); + Some(tokio::spawn(async move { let mut timer = tokio::time::interval(interval); loop { timer.tick().await; - // Note: This won't work as write is moved, simplified for now - let _ = msg.clone(); + let message = Message::Text(msg.clone()); + if hb_tx.send(message).await.is_err() { + break; + } } })) } else { @@ -172,7 +188,7 @@ impl DataSource for WebSocketSource { } } Ok(Message::Ping(data)) => { - let _ = write.send(Message::Pong(data)).await; + let _ = write_tx.send(Message::Pong(data)).await; } Ok(Message::Close(_)) => { break; @@ -184,6 +200,9 @@ impl DataSource for WebSocketSource { } } + // Cleanup tasks + drop(write_tx); + write_task.abort(); if let Some(task) = heartbeat_task { task.abort(); } diff --git a/crates/nemo/src/app.rs b/crates/nemo/src/app.rs index a1e6dc7..c0d0f2f 100644 --- a/crates/nemo/src/app.rs +++ b/crates/nemo/src/app.rs @@ -21,9 +21,9 @@ use crate::components::{ Button, CandlestickChart, Checkbox, ClusteredBarChart, ClusteredColumnChart, CodeEditor, Collapsible, ColumnChart, DropdownButton, FunnelChart, HeatmapChart, Icon, Image, Label, LineChart, List, Modal, Notification, Panel, PieChart, Progress, PyramidChart, RadarChart, - Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack, StackedBarChart, - StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea, Toggle, Tooltip, - Tree, + Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack, + StackedBarChart, StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea, + Toggle, Tooltip, Tree, }; use crate::runtime::NemoRuntime; use crate::workspace::HeaderBar; diff --git a/crates/nemo/src/components/realtime_chart.rs b/crates/nemo/src/components/realtime_chart.rs index b227da5..5fcb377 100644 --- a/crates/nemo/src/components/realtime_chart.rs +++ b/crates/nemo/src/components/realtime_chart.rs @@ -115,7 +115,12 @@ impl Plot for MultiLineChartInner { i if i == data_len - 1 => TextAlign::Right, _ => TextAlign::Center, }; - AxisText::new(SharedString::from(label), x_tick, cx.theme().muted_foreground).align(align) + AxisText::new( + SharedString::from(label), + x_tick, + cx.theme().muted_foreground, + ) + .align(align) }) } else { None diff --git a/plugins/streaming-stats/src/lib.rs b/plugins/streaming-stats/src/lib.rs index 2778c5e..7aa8a7f 100644 --- a/plugins/streaming-stats/src/lib.rs +++ b/plugins/streaming-stats/src/lib.rs @@ -147,7 +147,9 @@ fn spawn_stats_thread(ctx: Arc) { if let Some(value) = ctx.get_data("metrics") { for (channel, val, unit) in extract_metrics(&value) { - let window = windows.entry(channel.clone()).or_insert_with(ChannelWindow::new); + let window = windows + .entry(channel.clone()) + .or_insert_with(ChannelWindow::new); window.push(now, val); channel_units.insert(channel.clone(), unit); snapshot.insert(channel, val); @@ -194,7 +196,10 @@ fn spawn_stats_thread(ctx: Arc) { // Build summary row for table display let mut row = IndexMap::new(); - row.insert("channel".to_string(), PluginValue::String((*channel).clone())); + row.insert( + "channel".to_string(), + PluginValue::String((*channel).clone()), + ); row.insert("mean".to_string(), PluginValue::Float(mean)); row.insert("min".to_string(), PluginValue::Float(min)); row.insert("max".to_string(), PluginValue::Float(max));