From 9d3008b8f200f799cba674865311f868316da0d1 Mon Sep 17 00:00:00 2001 From: appujet Date: Fri, 20 Mar 2026 23:26:28 +0530 Subject: [PATCH] feat: Enhance playback reliability with HLS fetch retries, improved Deezer error handling, and integrated track stuck detection. --- src/audio/mix/mixer.rs | 13 +++-- src/audio/playback/handle.rs | 2 +- src/audio/playback/mod.rs | 4 +- src/audio/playback/stuck_detector.rs | 61 ----------------------- src/player/manager/monitor.rs | 73 +++++++++++++++++----------- src/player/manager/start.rs | 13 ++--- src/protocol/codec/decode.rs | 5 +- src/sources/deezer/reader/mod.rs | 34 +++++++++---- src/sources/youtube/hls/fetcher.rs | 51 +++++++++++++------ 9 files changed, 119 insertions(+), 137 deletions(-) delete mode 100644 src/audio/playback/stuck_detector.rs diff --git a/src/audio/mix/mixer.rs b/src/audio/mix/mixer.rs index a7ec0652..776ce82b 100644 --- a/src/audio/mix/mixer.rs +++ b/src/audio/mix/mixer.rs @@ -15,7 +15,7 @@ use crate::{ buffer::PooledBuffer, constants::{MAX_LAYERS, MIXER_CHANNELS, TARGET_SAMPLE_RATE}, flow::FlowController, - playback::{StuckDetector, handle::PlaybackState}, + playback::handle::PlaybackState, }, config::player::PlayerConfig, }; @@ -49,7 +49,7 @@ impl AudioMixer { rx: Receiver, volume: f32, ) -> Result<(), &'static str> { - if self.layers.len() >= MAX_LAYERS { + if self.layers.len() >= self.max_layers { return Err("Maximum mix layers reached"); } self.layers @@ -100,11 +100,12 @@ pub struct Mixer { tracks: Vec, mix_buf: Vec, pub audio_mixer: AudioMixer, - opus_passthrough_track: Option, + opus_passthrough_track: Option, // index of track providing opus passthrough final_pcm_buf: Vec, - pub stuck_detector: Arc, } +// PassthroughTrack is now implicitly handled by MixerTrack via FlowController's latest_opus + struct MixerTrack { flow: FlowController, pending: Vec, @@ -125,7 +126,6 @@ impl Mixer { audio_mixer: AudioMixer::new(), opus_passthrough_track: None, final_pcm_buf: Vec::with_capacity(1920), - stuck_detector: Arc::new(StuckDetector::new(10_000)), // 10 second default threshold } } @@ -281,7 +281,6 @@ impl Mixer { .position .fetch_add(filled as u64 / MIXER_CHANNELS as u64, Ordering::Relaxed); track.is_buffering.store(false, Ordering::Release); - self.stuck_detector.record_frame_received(); } else if !track.finished { track.is_buffering.store(true, Ordering::Release); } @@ -325,4 +324,4 @@ impl Mixer { buf.copy_from_slice(&self.final_pcm_buf); has_audio } -} +} \ No newline at end of file diff --git a/src/audio/playback/handle.rs b/src/audio/playback/handle.rs index d926e89e..b68272f3 100644 --- a/src/audio/playback/handle.rs +++ b/src/audio/playback/handle.rs @@ -111,7 +111,7 @@ impl TrackHandle { pub fn get_position(&self) -> u64 { let samples = self.position.load(Ordering::Acquire); - (samples * 1000) / OPUS_SAMPLE_RATE + samples.saturating_mul(1000) / OPUS_SAMPLE_RATE } pub fn is_buffering(&self) -> bool { diff --git a/src/audio/playback/mod.rs b/src/audio/playback/mod.rs index a7fdd654..8dd66b10 100644 --- a/src/audio/playback/mod.rs +++ b/src/audio/playback/mod.rs @@ -1,5 +1,3 @@ pub mod handle; -pub mod stuck_detector; -pub use handle::{PlaybackState, TrackHandle}; -pub use stuck_detector::StuckDetector; +pub use handle::{PlaybackState, TrackHandle}; \ No newline at end of file diff --git a/src/audio/playback/stuck_detector.rs b/src/audio/playback/stuck_detector.rs deleted file mode 100644 index b89b01e8..00000000 --- a/src/audio/playback/stuck_detector.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; - -use crate::common::utils::now_nanos; - -pub struct StuckDetector { - last_frame_received_at_nanos: AtomicU64, - threshold_ms: AtomicU64, - stuck_event_sent: AtomicBool, -} - -impl StuckDetector { - pub fn new(threshold_ms: u64) -> Self { - Self { - last_frame_received_at_nanos: AtomicU64::new(now_nanos()), - threshold_ms: AtomicU64::new(threshold_ms), - stuck_event_sent: AtomicBool::new(false), - } - } - - pub fn record_frame_received(&self) { - let now_nanos = now_nanos(); - self.last_frame_received_at_nanos - .store(now_nanos, Ordering::Release); - } - - pub fn reset_stuck_flag(&self) { - self.stuck_event_sent.store(false, Ordering::Release); - } - - pub fn check_stuck(&self) -> bool { - if self.stuck_event_sent.load(Ordering::Acquire) { - return false; - } - - let now_nanos = now_nanos(); - let last_received = self.last_frame_received_at_nanos.load(Ordering::Acquire); - let elapsed_nanos = now_nanos.saturating_sub(last_received); - let threshold_nanos = self.threshold_ms.load(Ordering::Acquire) * 1_000_000; - - if elapsed_nanos >= threshold_nanos { - self.stuck_event_sent.store(true, Ordering::Release); - true - } else { - false - } - } - - pub fn threshold_ms(&self) -> u64 { - self.threshold_ms.load(Ordering::Acquire) - } - - pub fn set_threshold(&self, threshold_ms: u64) { - self.threshold_ms.store(threshold_ms, Ordering::Release); - } -} - -impl Default for StuckDetector { - fn default() -> Self { - Self::new(10_000) - } -} diff --git a/src/player/manager/monitor.rs b/src/player/manager/monitor.rs index df22bdfa..190eb3ac 100644 --- a/src/player/manager/monitor.rs +++ b/src/player/manager/monitor.rs @@ -4,8 +4,8 @@ use tracing::warn; use super::lyrics::sync_lyrics; use crate::{ - audio::playback::{PlaybackState, StuckDetector, TrackHandle}, - common::{types::GuildId, utils::now_ms}, + audio::playback::{PlaybackState, TrackHandle}, + common::types::GuildId, player::state::PlayerState, protocol::{ self, @@ -24,20 +24,21 @@ pub struct MonitorCtx { pub track: Track, pub stop_signal: Arc, pub ping: Arc, + pub stuck_threshold_ms: u64, pub update_every_n: u64, pub lyrics_subscribed: Arc, pub lyrics_data: Arc>>, pub last_lyric_index: Arc, pub end_time_ms: Option, - pub stuck_detector: Arc, } pub async fn monitor_loop(ctx: MonitorCtx) { let mut interval = tokio::time::interval(std::time::Duration::from_millis(500)); let mut tick: u64 = 0; - let mut last_pos = ctx.handle.get_position(); + let mut last_frame_received_at = std::time::Instant::now(); + let mut stuck_event_sent = false; - send_player_update(&ctx, last_pos); + send_player_update(&ctx, ctx.handle.get_position()); loop { interval.tick().await; @@ -65,17 +66,20 @@ pub async fn monitor_loop(ctx: MonitorCtx) { } if state == PlaybackState::Playing { - if cur_pos != last_pos { - ctx.stuck_detector.reset_stuck_flag(); - } - - if ctx.stuck_detector.check_stuck() { - send_stuck_event(&ctx, cur_pos); + if ctx.handle.is_buffering() { + if !stuck_event_sent { + check_stuck(&ctx, last_frame_received_at, cur_pos).await; + stuck_event_sent = true; + } + } else { + last_frame_received_at = std::time::Instant::now(); + stuck_event_sent = false; } + } else { + last_frame_received_at = std::time::Instant::now(); + stuck_event_sent = false; } - last_pos = cur_pos; - if tick.is_multiple_of(ctx.update_every_n) { send_player_update(&ctx, cur_pos); } @@ -154,23 +158,34 @@ async fn handle_track_end_marker(ctx: &MonitorCtx) { }); } -fn send_stuck_event(ctx: &MonitorCtx, cur_pos: u64) { - let threshold = ctx.stuck_detector.threshold_ms(); +async fn check_stuck( + ctx: &MonitorCtx, + last_frame_received_at: std::time::Instant, + cur_pos: u64, +) { + if ctx.handle.get_state() != PlaybackState::Playing { + return; + } - ctx.session.send_message(&protocol::OutgoingMessage::Event { - event: Box::new(RustalinkEvent::TrackStuck { - guild_id: ctx.guild_id.clone(), - track: ctx.track.clone(), - threshold_ms: threshold, - }), - }); + let elapsed_ms = last_frame_received_at.elapsed().as_millis() as u64; + let threshold = ctx.stuck_threshold_ms; - send_player_update(ctx, cur_pos); + if elapsed_ms >= threshold { + ctx.session.send_message(&protocol::OutgoingMessage::Event { + event: Box::new(RustalinkEvent::TrackStuck { + guild_id: ctx.guild_id.clone(), + track: ctx.track.clone(), + threshold_ms: threshold, + }), + }); - warn!( - "[{}] Track stuck: no frames received for >= {}ms", - ctx.guild_id, threshold - ); + send_player_update(ctx, cur_pos); + + warn!( + "[{}] Track stuck: no frames received for {}ms (threshold {}ms)", + ctx.guild_id, elapsed_ms, threshold + ); + } } fn send_player_update(ctx: &MonitorCtx, cur_pos: u64) { @@ -178,7 +193,7 @@ fn send_player_update(ctx: &MonitorCtx, cur_pos: u64) { .send_message(&protocol::OutgoingMessage::PlayerUpdate { guild_id: ctx.guild_id.clone(), state: PlayerState { - time: now_ms(), + time: crate::common::utils::now_ms(), position: cur_pos, connected: true, ping: ctx.ping.load(Ordering::Acquire), @@ -204,4 +219,4 @@ async fn clear_player_state(ctx: &MonitorCtx) { p.track_handle = None; } } -} +} \ No newline at end of file diff --git a/src/player/manager/start.rs b/src/player/manager/start.rs index 5227a1e6..056f4150 100644 --- a/src/player/manager/start.rs +++ b/src/player/manager/start.rs @@ -80,7 +80,7 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon send_load_failed( player, &config.session, - format!("Track resolution timed out: {}", identifier), + format!("Track resolution timed out: {identifier}"), ) .await; return; @@ -109,9 +109,6 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon is_buffering, player.config.clone(), ); - mixer - .stuck_detector - .set_threshold(player.config.stuck_threshold_ms); } player.track_handle = Some(handle.clone()); @@ -152,10 +149,6 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon player.guild_id.clone(), ); - let engine = player.engine.lock().await; - let mixer = engine.mixer.lock().await; - let stuck_detector = mixer.stuck_detector.clone(); - let ctx = MonitorCtx { guild_id: player.guild_id.clone(), handle: handle.clone(), @@ -164,12 +157,12 @@ pub async fn start_playback(player: &mut PlayerContext, config: PlaybackStartCon track: track_response, stop_signal: player.stop_signal.clone(), ping: player.ping.clone(), + stuck_threshold_ms: player.config.stuck_threshold_ms, update_every_n: (config.update_interval_secs * 2).max(1), lyrics_subscribed: player.lyrics_subscribed.clone(), lyrics_data: player.lyrics_data.clone(), last_lyric_index: player.last_lyric_index.clone(), end_time_ms: player.end_time, - stuck_detector, }; let track_task = tokio::spawn(monitor_loop(ctx)); @@ -217,4 +210,4 @@ async fn stop_current_track(player: &mut PlayerContext, session: &Session) { let engine = player.engine.lock().await; engine.mixer.lock().await.stop_all(); -} +} \ No newline at end of file diff --git a/src/protocol/codec/decode.rs b/src/protocol/codec/decode.rs index 369b167b..eaecaa0f 100644 --- a/src/protocol/codec/decode.rs +++ b/src/protocol/codec/decode.rs @@ -47,7 +47,6 @@ pub fn decode_track(encoded: &str) -> Result { }; let source_name = stream.read_string()?; - let position = stream.read_u64::()?; let user_data = stream.read_json().unwrap_or_else(|_| serde_json::json!({})); @@ -59,7 +58,7 @@ pub fn decode_track(encoded: &str) -> Result { author, length, is_stream, - position, + position: 0, title, uri, artwork_url, @@ -79,4 +78,4 @@ pub fn decode_playlist_info(encoded: &str) -> Result { name: stream.read_string()?, selected_track: stream.read_i32::()?, }) -} +} \ No newline at end of file diff --git a/src/sources/deezer/reader/mod.rs b/src/sources/deezer/reader/mod.rs index 19beba5c..9de92d39 100644 --- a/src/sources/deezer/reader/mod.rs +++ b/src/sources/deezer/reader/mod.rs @@ -5,7 +5,7 @@ pub mod remote_reader; use std::io::{Read, Seek, SeekFrom}; use symphonia::core::io::MediaSource; -use tracing::debug; +use tracing::{debug, warn}; use self::{ crypt::{CHUNK_SIZE, DeezerCrypt}, @@ -19,6 +19,7 @@ pub struct DeezerReader { raw_buf: Vec, ready_buf: Vec, skip_pending: usize, + decrypt_failures: u32, } impl DeezerReader { @@ -38,9 +39,10 @@ impl DeezerReader { source, crypt, pos: 0, - raw_buf: Vec::with_capacity(CHUNK_SIZE * 2), - ready_buf: Vec::with_capacity(CHUNK_SIZE * 2), + raw_buf: Vec::with_capacity(CHUNK_SIZE), + ready_buf: Vec::with_capacity(CHUNK_SIZE), skip_pending: 0, + decrypt_failures: 0, }) } } @@ -48,14 +50,12 @@ impl DeezerReader { impl Read for DeezerReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { loop { - // 1. Drain skip_pending if we have data if self.skip_pending > 0 && !self.ready_buf.is_empty() { let to_skip = std::cmp::min(self.skip_pending, self.ready_buf.len()); self.ready_buf.drain(..to_skip); self.skip_pending -= to_skip; } - // 2. Supply data from ready_buf if available if self.skip_pending == 0 && !self.ready_buf.is_empty() { let n = std::cmp::min(buf.len(), self.ready_buf.len()); buf[..n].copy_from_slice(&self.ready_buf[..n]); @@ -63,15 +63,19 @@ impl Read for DeezerReader { return Ok(n); } - // 3. Need more data from source let mut tmp = [0u8; CHUNK_SIZE]; - let n = self.source.read(&mut tmp)?; + let n = match self.source.read(&mut tmp) { + Ok(n) => n, + Err(e) => { + warn!("DeezerReader: failed to read from CDN: {}", e); + return Err(e); + } + }; if n == 0 { if self.raw_buf.is_empty() { return Ok(0); } - // Process remaining block even if it's smaller than CHUNK_SIZE let leftovers = self.raw_buf.clone(); let chunk_idx = self.pos / CHUNK_SIZE as u64; self.crypt @@ -83,12 +87,24 @@ impl Read for DeezerReader { self.raw_buf.extend_from_slice(&tmp[..n]); - // 4. Process all full chunks while self.raw_buf.len() >= CHUNK_SIZE { let chunk: Vec = self.raw_buf.drain(..CHUNK_SIZE).collect(); let chunk_idx = self.pos / CHUNK_SIZE as u64; + + let before_len = self.ready_buf.len(); self.crypt .decrypt_chunk(chunk_idx, &chunk, &mut self.ready_buf); + + if self.ready_buf.len() == before_len { + self.decrypt_failures += 1; + if self.decrypt_failures % 10 == 1 { + warn!( + "DeezerReader: {} decryption failures (chunk {}), track may be corrupted or key invalid", + self.decrypt_failures, chunk_idx + ); + } + } + self.pos += CHUNK_SIZE as u64; } } diff --git a/src/sources/youtube/hls/fetcher.rs b/src/sources/youtube/hls/fetcher.rs index e6b5f4c8..35dddd11 100644 --- a/src/sources/youtube/hls/fetcher.rs +++ b/src/sources/youtube/hls/fetcher.rs @@ -6,21 +6,44 @@ pub async fn fetch_segment_into( resource: &Resource, out: &mut Vec, ) -> AnyResult<()> { - let mut req = client.get(&resource.url).header("Accept", "*/*"); + for attempt in 0..3 { + let mut req = client.get(&resource.url).header("Accept", "*/*"); - if let Some(range) = &resource.range { - let end = range.offset + range.length - 1; - req = req.header("Range", format!("bytes={}-{}", range.offset, end)); - } - - let res = req.send().await?; + if let Some(range) = &resource.range { + let end = range.offset + range.length - 1; + req = req.header("Range", format!("bytes={}-{}", range.offset, end)); + } - if !res.status().is_success() { - return Err(format!("HLS fetch failed {}: {}", res.status(), resource.url).into()); + match req.timeout(std::time::Duration::from_secs(15)).send().await { + Ok(res) => { + if res.status().is_success() { + let bytes = res.bytes().await?; + out.extend_from_slice(&bytes); + return Ok(()); + } else { + if attempt < 2 { + tracing::warn!( + "HLS fetch attempt {} failed {}: {} - retrying...", + attempt + 1, + res.status(), + resource.url + ); + tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt + 1) as u64)).await; + } else { + return Err(format!("HLS fetch failed after 3 attempts {}: {}", res.status(), resource.url).into()); + } + } + } + Err(e) => { + if attempt < 2 { + tracing::warn!("HLS fetch attempt {} failed: {} - retrying...", attempt + 1, e); + tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt + 1) as u64)).await; + } else { + return Err(format!("HLS fetch failed after 3 attempts: {} - {}", e, resource.url).into()); + } + } + } } - let bytes = res.bytes().await?; - out.extend_from_slice(&bytes); - - Ok(()) -} + Err(format!("HLS fetch failed: all retry attempts exhausted for {}", resource.url).into()) +} \ No newline at end of file