diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 0cf294b054..00c7082942 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -383,6 +383,20 @@ async def cancel(self): """Stops the running pipeline immediately.""" await self._cancel() + async def force_cancel(self): + """Forcefully cancel the task and any running subtasks.""" + logger.debug(f"Force canceling task {self} triggered") + try: + await self.cancel() + except Exception as e: + logger.warning(f"Error calling pipecat task cancel {self}: {e}") + + for task in self._task_manager.current_tasks(): + try: + await self._task_manager.cancel_task(task) + except Exception as e: + logger.warning(f"Error canceling task {self}: {e}") + async def run(self, params: PipelineTaskParams): """Starts and manages the pipeline execution until completion or cancellation.""" if self.has_finished(): diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 40016aaa0d..13f90419f3 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -20,6 +20,7 @@ EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame, EndFrame, + ErrorFrame, Frame, FunctionCallCancelFrame, FunctionCallInProgressFrame, diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index 104e4b2c5d..b7c9a0add2 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -123,11 +123,60 @@ async def cancel(self, frame: CancelFrame): await self._disconnect() async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - # If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again - if not self._connection or self._connection.closed: + """Send an audio chunk for transcription, maintaining a resilient connection. + + The Cartesia WebSocket can be closed by the remote side at any time. In that + scenario, ``websockets`` raises a ``ConnectionClosed`` (or + ``ConnectionClosedOK``) exception on ``send``. Propagating that + exception up the stack aborts the whole Pipecat pipeline, resulting in + noisy logs like: + + websockets.exceptions.ConnectionClosedOK: sent 1000 (OK); then received 1000 (OK) + + Instead, we catch the closure here, perform a best-effort reconnect, and + silently drop the current audio buffer. The next audio buffer will be + delivered over the fresh connection. + """ + + try: + # If the processor is cancelling/end-of-pipeline we skip sending + # altogether because the connection is expected to be closed. + if self._cancelling: + # Consume the audio frame silently and exit early. + yield None + return + + # (Re)connect only when we are still active. + if not self._connection or self._connection.closed: + await self._connect() + + # Attempt to send the audio payload + await self._connection.send(audio) + except ( + websockets.exceptions.ConnectionClosedOK, + websockets.exceptions.ConnectionClosedError, + websockets.exceptions.ConnectionClosed, + ) as e: + # Connection closed gracefully or with an error – attempt to + # reconnect so we can continue processing without failing the + # entire pipeline. + logger.warning(f"{self}: WebSocket closed during send: {e}. Reconnecting.") + try: + await self._disconnect() + except Exception: + # Ignore additional errors while cleaning up + pass + await self._connect() + # We purposefully *do not* resend the audio buffer: by the time the + # reconnection succeeds the buffer would be outdated and resending + # could confuse the transcription stream. + except Exception as e: + # Catch-all to prevent the exception from bubbling up and killing + # the pipeline. We log the error and try to keep going. + logger.error(f"{self}: Unexpected error while sending audio: {e}") - await self._connection.send(audio) + # Always yield once so the caller can keep iterating over the generator yield None async def _connect(self): diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index da7ec535f3..7bb90c3a88 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -7,6 +7,8 @@ """Deepgram speech-to-text service implementation.""" from typing import AsyncGenerator, Dict, Optional +import asyncio +import contextlib from loguru import logger @@ -228,9 +230,35 @@ async def _connect(self): logger.error(f"{self}: unable to connect to Deepgram") async def _disconnect(self): - if self._connection.is_connected: - logger.debug("Disconnecting from Deepgram") - await self._connection.finish() + """Initiate a graceful disconnect but never block on Deepgram SDK internals. + + The Deepgram SDK occasionally hangs in `finish()` when the TCP socket has + already been torn down by the far-end. To ensure pipeline shutdown can + always proceed we fire-and-forget the `finish()` coroutine and return + control immediately. + """ + if not self._connection.is_connected: + return + + logger.debug("Disconnecting from Deepgram – v3 (non-blocking)") + + async def _finish_safe(): + try: + await asyncio.wait_for(self._connection.finish(), timeout=4) + logger.debug("Deepgram finish() completed") + except (asyncio.TimeoutError, asyncio.CancelledError): + logger.warning("Deepgram finish() did not complete in 4s – closing socket") + with contextlib.suppress(Exception): + await self._connection.close() + except Exception as e: + logger.warning(f"Deepgram finish() raised {e} – closing socket") + with contextlib.suppress(Exception): + await self._connection.close() + finally: + logger.debug("Deepgram disconnect task done") + + # Run in background – shutdown must not wait for this. + asyncio.create_task(_finish_safe()) async def start_metrics(self): """Start TTFB and processing metrics collection.""" diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index e502449862..d7f1afb11d 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -7,6 +7,7 @@ """Base classes for Text-to-speech services.""" import asyncio +import re from abc import abstractmethod from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Sequence, Tuple @@ -394,8 +395,15 @@ async def _push_tts_frames(self, text: str): await filter.reset_interruption() text = await filter.filter(text) - if text: - await self.process_generator(self.run_tts(text)) + if text and text.strip(): + # Additional check: ensure text contains at least one alphanumeric character + # This prevents sending text that's only punctuation (which Cartesia rejects) + if re.search(r'[a-zA-Z0-9]', text): + await self.process_generator(self.run_tts(text)) + else: + logger.debug(f"Skipping TTS for text with only punctuation/whitespace: '{text}'") + else: + logger.debug(f"Skipping TTS for empty/whitespace text after filtering: '{text}'") await self.stop_processing_metrics() diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 36d0536d79..af9c543634 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -22,6 +22,7 @@ BotStoppedSpeakingFrame, CancelFrame, EndFrame, + ErrorFrame, Frame, MixerControlFrame, OutputAudioRawFrame, diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 5ddaacff77..4c328a9bd5 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -71,23 +71,34 @@ def receive(self) -> typing.AsyncIterator[bytes | str]: return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text() async def send(self, data: str | bytes): - try: - if self._can_send(): - if self._is_binary: - await self._websocket.send_bytes(data) - else: - await self._websocket.send_text(data) - except Exception as e: - logger.error( - f"{self} exception sending data: {e.__class__.__name__} ({e}), application_state: {self._websocket.application_state}" - ) - # For some reason the websocket is disconnected, and we are not able to send data - # So let's properly handle it and disconnect the transport - if self._websocket.application_state == WebSocketState.DISCONNECTED: - logger.warning("Closing already disconnected websocket!") - self._closing = True - await self.trigger_client_disconnected() - + if self._can_send(): + if self._is_binary: + await self._websocket.send_bytes(data) + else: + await self._websocket.send_text(data) + + # NOTE: HL OVERRIDE, this Fix below from pipecat + # https://github.com/pipecat-ai/pipecat/commit/74280829fcf5e03754ba5263162ff70eaabcdc4a + # can catch the error but causes occasional stalling + # fall back using the HL Override below in FastAPIWebsocketOutputTransport + + # try: + # if self._can_send(): + # if self._is_binary: + # await self._websocket.send_bytes(data) + # else: + # await self._websocket.send_text(data) + # except Exception as e: + # logger.error( + # f"{self} exception sending data: {e.__class__.__name__} ({e}), application_state: {self._websocket.application_state}" + # ) + # # For some reason the websocket is disconnected, and we are not able to send data + # # So let's properly handle it and disconnect the transport + # if self._websocket.application_state == WebSocketState.DISCONNECTED: + # logger.warning("Closing already disconnected websocket!") + # self._closing = True + # await self.trigger_client_disconnected() + async def disconnect(self): self._leave_counter -= 1 if self._leave_counter > 0: @@ -311,7 +322,19 @@ async def _write_frame(self, frame: Frame): if payload: await self._client.send(payload) except Exception as e: + # NOTE: HL OVERRIDE, detect closed websocket and shut down pipeline + msg = str(e).lower() + if ( + ("cannot call" in msg and "once a close message has been sent" in msg) or + self._client._websocket.application_state == WebSocketState.DISCONNECTED + ): + logger.warning(f"{self} detected closed websocket, shutting down pipeline") + await self._client.disconnect() + await self.cleanup() + return + # Otherwise, log as before logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})") + async def _write_audio_sleep(self): # Simulate a clock. diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index 8445361867..4589bf8036 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -240,6 +240,12 @@ async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None) """ name = task.get_name() + + # Avoid awaiting on ourselves which would lead to a recursion error. + if task is asyncio.current_task(): + task.cancel() + return + task.cancel() try: # Make sure to reset watchdog if a task is cancelled.