Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions crates/nemo-data/src/sources/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,32 @@ impl DataSource for WebSocketSource {

let (mut write, mut read) = ws_stream.split();

// Channel for sending messages (heartbeats and pong responses)
let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<Message>(10);

// Spawn write task to handle all outgoing messages
let write_task = tokio::spawn(async move {
while let Some(msg) = write_rx.recv().await {
if write.send(msg).await.is_err() {
break;
}
}
});

// Optional heartbeat task
let heartbeat_task = if let Some(hb) = &config.heartbeat {
let msg = hb.message.clone();
let interval = hb.interval;
let hb_tx = write_tx.clone();

Some(tokio::spawn(async move {
let mut timer = tokio::time::interval(interval);
loop {
timer.tick().await;
// Note: This won't work as write is moved, simplified for now
let _ = msg.clone();
let message = Message::Text(msg.clone());
if hb_tx.send(message).await.is_err() {
break;
}
}
}))
} else {
Expand Down Expand Up @@ -172,7 +188,7 @@ impl DataSource for WebSocketSource {
}
}
Ok(Message::Ping(data)) => {
let _ = write.send(Message::Pong(data)).await;
let _ = write_tx.send(Message::Pong(data)).await;
}
Ok(Message::Close(_)) => {
break;
Expand All @@ -184,6 +200,9 @@ impl DataSource for WebSocketSource {
}
}

// Cleanup tasks
drop(write_tx);
write_task.abort();
if let Some(task) = heartbeat_task {
task.abort();
}
Expand Down
6 changes: 3 additions & 3 deletions crates/nemo/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use crate::components::{
Button, CandlestickChart, Checkbox, ClusteredBarChart, ClusteredColumnChart, CodeEditor,
Collapsible, ColumnChart, DropdownButton, FunnelChart, HeatmapChart, Icon, Image, Label,
LineChart, List, Modal, Notification, Panel, PieChart, Progress, PyramidChart, RadarChart,
Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack, StackedBarChart,
StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea, Toggle, Tooltip,
Tree,
Radio, RealtimeChart, ScatterChart, Select, SidenavBar, Slider, Spinner, Stack,
StackedBarChart, StackedColumnChart, Switch, Table, Tabs, Tag, Text, TextEditor, Textarea,
Toggle, Tooltip, Tree,
};
use crate::runtime::NemoRuntime;
use crate::workspace::HeaderBar;
Expand Down
7 changes: 6 additions & 1 deletion crates/nemo/src/components/realtime_chart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ impl Plot for MultiLineChartInner {
i if i == data_len - 1 => TextAlign::Right,
_ => TextAlign::Center,
};
AxisText::new(SharedString::from(label), x_tick, cx.theme().muted_foreground).align(align)
AxisText::new(
SharedString::from(label),
x_tick,
cx.theme().muted_foreground,
)
.align(align)
})
} else {
None
Expand Down
9 changes: 7 additions & 2 deletions plugins/streaming-stats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ fn spawn_stats_thread(ctx: Arc<dyn PluginContext>) {

if let Some(value) = ctx.get_data("metrics") {
for (channel, val, unit) in extract_metrics(&value) {
let window = windows.entry(channel.clone()).or_insert_with(ChannelWindow::new);
let window = windows
.entry(channel.clone())
.or_insert_with(ChannelWindow::new);
window.push(now, val);
channel_units.insert(channel.clone(), unit);
snapshot.insert(channel, val);
Expand Down Expand Up @@ -194,7 +196,10 @@ fn spawn_stats_thread(ctx: Arc<dyn PluginContext>) {

// Build summary row for table display
let mut row = IndexMap::new();
row.insert("channel".to_string(), PluginValue::String((*channel).clone()));
row.insert(
"channel".to_string(),
PluginValue::String((*channel).clone()),
);
row.insert("mean".to_string(), PluginValue::Float(mean));
row.insert("min".to_string(), PluginValue::Float(min));
row.insert("max".to_string(), PluginValue::Float(max));
Expand Down