Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/audio/mix/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
buffer::PooledBuffer,
constants::{MAX_LAYERS, MIXER_CHANNELS, TARGET_SAMPLE_RATE},
flow::FlowController,
playback::{StuckDetector, handle::PlaybackState},
playback::handle::PlaybackState,
},
config::player::PlayerConfig,
};
Expand Down Expand Up @@ -49,7 +49,7 @@ impl AudioMixer {
rx: Receiver<PooledBuffer>,
volume: f32,
) -> Result<(), &'static str> {
if self.layers.len() >= MAX_LAYERS {
if self.layers.len() >= self.max_layers {
return Err("Maximum mix layers reached");
}
self.layers
Expand Down Expand Up @@ -100,11 +100,12 @@ pub struct Mixer {
tracks: Vec<MixerTrack>,
mix_buf: Vec<i32>,
pub audio_mixer: AudioMixer,
opus_passthrough_track: Option<usize>,
opus_passthrough_track: Option<usize>, // index of track providing opus passthrough
final_pcm_buf: Vec<i16>,
pub stuck_detector: Arc<StuckDetector>,
}

// PassthroughTrack is now implicitly handled by MixerTrack via FlowController's latest_opus

struct MixerTrack {
flow: FlowController,
pending: Vec<i16>,
Expand All @@ -125,7 +126,6 @@ impl Mixer {
audio_mixer: AudioMixer::new(),
opus_passthrough_track: None,
final_pcm_buf: Vec::with_capacity(1920),
stuck_detector: Arc::new(StuckDetector::new(10_000)), // 10 second default threshold
}
}

Expand Down Expand Up @@ -281,7 +281,6 @@ impl Mixer {
.position
.fetch_add(filled as u64 / MIXER_CHANNELS as u64, Ordering::Relaxed);
track.is_buffering.store(false, Ordering::Release);
self.stuck_detector.record_frame_received();
} else if !track.finished {
track.is_buffering.store(true, Ordering::Release);
}
Expand Down Expand Up @@ -325,4 +324,4 @@ impl Mixer {
buf.copy_from_slice(&self.final_pcm_buf);
has_audio
}
}
}
2 changes: 1 addition & 1 deletion src/audio/playback/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl TrackHandle {

pub fn get_position(&self) -> u64 {
let samples = self.position.load(Ordering::Acquire);
(samples * 1000) / OPUS_SAMPLE_RATE
samples.saturating_mul(1000) / OPUS_SAMPLE_RATE
}

pub fn is_buffering(&self) -> bool {
Expand Down
4 changes: 1 addition & 3 deletions src/audio/playback/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pub mod handle;
pub mod stuck_detector;

pub use handle::{PlaybackState, TrackHandle};
pub use stuck_detector::StuckDetector;
pub use handle::{PlaybackState, TrackHandle};
61 changes: 0 additions & 61 deletions src/audio/playback/stuck_detector.rs

This file was deleted.

73 changes: 44 additions & 29 deletions src/player/manager/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use tracing::warn;

use super::lyrics::sync_lyrics;
use crate::{
audio::playback::{PlaybackState, StuckDetector, TrackHandle},
common::{types::GuildId, utils::now_ms},
audio::playback::{PlaybackState, TrackHandle},
common::types::GuildId,
player::state::PlayerState,
protocol::{
self,
Expand All @@ -24,20 +24,21 @@ pub struct MonitorCtx {
pub track: Track,
pub stop_signal: Arc<std::sync::atomic::AtomicBool>,
pub ping: Arc<std::sync::atomic::AtomicI64>,
pub stuck_threshold_ms: u64,
pub update_every_n: u64,
pub lyrics_subscribed: Arc<std::sync::atomic::AtomicBool>,
pub lyrics_data: Arc<tokio::sync::Mutex<Option<LyricsData>>>,
pub last_lyric_index: Arc<std::sync::atomic::AtomicI64>,
pub end_time_ms: Option<u64>,
pub stuck_detector: Arc<StuckDetector>,
}

pub async fn monitor_loop(ctx: MonitorCtx) {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
let mut tick: u64 = 0;
let mut last_pos = ctx.handle.get_position();
let mut last_frame_received_at = std::time::Instant::now();
let mut stuck_event_sent = false;

send_player_update(&ctx, last_pos);
send_player_update(&ctx, ctx.handle.get_position());

loop {
interval.tick().await;
Expand Down Expand Up @@ -65,17 +66,20 @@ pub async fn monitor_loop(ctx: MonitorCtx) {
}

if state == PlaybackState::Playing {
if cur_pos != last_pos {
ctx.stuck_detector.reset_stuck_flag();
}

if ctx.stuck_detector.check_stuck() {
send_stuck_event(&ctx, cur_pos);
if ctx.handle.is_buffering() {
if !stuck_event_sent {
check_stuck(&ctx, last_frame_received_at, cur_pos).await;
stuck_event_sent = true;
}
} else {
last_frame_received_at = std::time::Instant::now();
stuck_event_sent = false;
}
} else {
last_frame_received_at = std::time::Instant::now();
stuck_event_sent = false;
}

last_pos = cur_pos;

if tick.is_multiple_of(ctx.update_every_n) {
send_player_update(&ctx, cur_pos);
}
Expand Down Expand Up @@ -154,31 +158,42 @@ async fn handle_track_end_marker(ctx: &MonitorCtx) {
});
}

fn send_stuck_event(ctx: &MonitorCtx, cur_pos: u64) {
let threshold = ctx.stuck_detector.threshold_ms();
async fn check_stuck(
ctx: &MonitorCtx,
last_frame_received_at: std::time::Instant,
cur_pos: u64,
) {
if ctx.handle.get_state() != PlaybackState::Playing {
return;
}

ctx.session.send_message(&protocol::OutgoingMessage::Event {
event: Box::new(RustalinkEvent::TrackStuck {
guild_id: ctx.guild_id.clone(),
track: ctx.track.clone(),
threshold_ms: threshold,
}),
});
let elapsed_ms = last_frame_received_at.elapsed().as_millis() as u64;
let threshold = ctx.stuck_threshold_ms;

send_player_update(ctx, cur_pos);
if elapsed_ms >= threshold {
ctx.session.send_message(&protocol::OutgoingMessage::Event {
event: Box::new(RustalinkEvent::TrackStuck {
guild_id: ctx.guild_id.clone(),
track: ctx.track.clone(),
threshold_ms: threshold,
}),
});

warn!(
"[{}] Track stuck: no frames received for >= {}ms",
ctx.guild_id, threshold
);
send_player_update(ctx, cur_pos);

warn!(
"[{}] Track stuck: no frames received for {}ms (threshold {}ms)",
ctx.guild_id, elapsed_ms, threshold
);
}
}

fn send_player_update(ctx: &MonitorCtx, cur_pos: u64) {
ctx.session
.send_message(&protocol::OutgoingMessage::PlayerUpdate {
guild_id: ctx.guild_id.clone(),
state: PlayerState {
time: now_ms(),
time: crate::common::utils::now_ms(),
position: cur_pos,
connected: true,
ping: ctx.ping.load(Ordering::Acquire),
Expand All @@ -204,4 +219,4 @@ async fn clear_player_state(ctx: &MonitorCtx) {
p.track_handle = None;
}
}
}
}
13 changes: 3 additions & 10 deletions src/player/manager/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon
send_load_failed(
player,
&config.session,
format!("Track resolution timed out: {}", identifier),
format!("Track resolution timed out: {identifier}"),
)
.await;
return;
Expand Down Expand Up @@ -109,9 +109,6 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon
is_buffering,
player.config.clone(),
);
mixer
.stuck_detector
.set_threshold(player.config.stuck_threshold_ms);
}

player.track_handle = Some(handle.clone());
Expand Down Expand Up @@ -152,10 +149,6 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon
player.guild_id.clone(),
);

let engine = player.engine.lock().await;
let mixer = engine.mixer.lock().await;
let stuck_detector = mixer.stuck_detector.clone();

let ctx = MonitorCtx {
guild_id: player.guild_id.clone(),
handle: handle.clone(),
Expand All @@ -164,12 +157,12 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon
track: track_response,
stop_signal: player.stop_signal.clone(),
ping: player.ping.clone(),
stuck_threshold_ms: player.config.stuck_threshold_ms,
update_every_n: (config.update_interval_secs * 2).max(1),
lyrics_subscribed: player.lyrics_subscribed.clone(),
lyrics_data: player.lyrics_data.clone(),
last_lyric_index: player.last_lyric_index.clone(),
end_time_ms: player.end_time,
stuck_detector,
};

let track_task = tokio::spawn(monitor_loop(ctx));
Expand Down Expand Up @@ -217,4 +210,4 @@ async fn stop_current_track(player: &mut PlayerContext, session: &Session) {

let engine = player.engine.lock().await;
engine.mixer.lock().await.stop_all();
}
}
5 changes: 2 additions & 3 deletions src/protocol/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub fn decode_track(encoded: &str) -> Result<Track, CodecError> {
};

let source_name = stream.read_string()?;
let position = stream.read_u64::<BigEndian>()?;

let user_data = stream.read_json().unwrap_or_else(|_| serde_json::json!({}));

Expand All @@ -59,7 +58,7 @@ pub fn decode_track(encoded: &str) -> Result<Track, CodecError> {
author,
length,
is_stream,
position,
position: 0,
title,
uri,
artwork_url,
Expand All @@ -79,4 +78,4 @@ pub fn decode_playlist_info(encoded: &str) -> Result<PlaylistInfo, CodecError> {
name: stream.read_string()?,
selected_track: stream.read_i32::<BigEndian>()?,
})
}
}
Loading