Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,20 @@ async def cancel(self):
"""Stops the running pipeline immediately."""
await self._cancel()

async def force_cancel(self):
    """Forcefully cancel the task and any running subtasks.

    First requests a regular cancellation through ``cancel()``, then asks the
    task manager to cancel every task it still reports as current. A failure
    at either step is logged as a warning and does not prevent the remaining
    tasks from being cancelled.
    """
    logger.debug(f"Force canceling task {self} triggered")
    try:
        await self.cancel()
    except Exception as e:
        logger.warning(f"Error calling pipecat task cancel {self}: {e}")

    # Iterate over a snapshot: cancelling presumably removes entries from the
    # manager's own collection (TODO confirm), and mutating a collection while
    # iterating it would raise.
    for task in list(self._task_manager.current_tasks()):
        try:
            await self._task_manager.cancel_task(task)
        except Exception as e:
            # Report the subtask that failed, not the pipeline task itself.
            logger.warning(f"Error canceling task {task.get_name()}: {e}")

async def run(self, params: PipelineTaskParams):
"""Starts and manages the pipeline execution until completion or cancellation."""
if self.has_finished():
Expand Down
1 change: 1 addition & 0 deletions src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
EmulateUserStartedSpeakingFrame,
EmulateUserStoppedSpeakingFrame,
EndFrame,
ErrorFrame,
Frame,
FunctionCallCancelFrame,
FunctionCallInProgressFrame,
Expand Down
55 changes: 52 additions & 3 deletions src/pipecat/services/cartesia/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,60 @@ async def cancel(self, frame: CancelFrame):
await self._disconnect()

async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
    """Send an audio chunk for transcription, maintaining a resilient connection.

    The Cartesia WebSocket can be closed by the remote side at any time. In that
    scenario, ``websockets`` raises a ``ConnectionClosed`` (or one of its
    subclasses, ``ConnectionClosedOK`` / ``ConnectionClosedError``) exception
    on ``send``. Propagating that exception up the stack aborts the whole
    Pipecat pipeline, resulting in noisy logs like:

        websockets.exceptions.ConnectionClosedOK: sent 1000 (OK); then received 1000 (OK)

    Instead, we catch the closure here, perform a best-effort reconnect, and
    drop the current audio buffer. The next audio buffer will be delivered
    over the fresh connection.

    Args:
        audio: The audio payload to forward over the WebSocket.

    Yields:
        ``None`` exactly once, so the caller can keep iterating over the
        generator; no frame is produced directly by this method.
    """
    try:
        # If the processor is cancelling/end-of-pipeline we skip sending
        # altogether because the connection is expected to be closed.
        if self._cancelling:
            # Consume the audio frame silently and exit early.
            yield None
            return

        # (Re)connect only when we are still active.
        if not self._connection or self._connection.closed:
            await self._connect()

        # The audio is sent exactly once, inside the guarded region.
        await self._connection.send(audio)
    except websockets.exceptions.ConnectionClosed as e:
        # ConnectionClosed is the base class of ConnectionClosedOK and
        # ConnectionClosedError, so this single clause covers both graceful
        # and abnormal closures.
        logger.warning(f"{self}: WebSocket closed during send: {e}. Reconnecting.")
        try:
            await self._disconnect()
        except Exception as cleanup_error:
            # Cleanup is best-effort; record the failure instead of hiding it.
            logger.debug(f"{self}: error while cleaning up closed connection: {cleanup_error}")

        # A failure here is raised from inside an except clause, so the
        # catch-all below would not see it; guard it explicitly to keep the
        # pipeline alive. The next call retries the connection.
        try:
            await self._connect()
        except Exception as reconnect_error:
            logger.error(f"{self}: Unable to reconnect after WebSocket closure: {reconnect_error}")
        # We purposefully *do not* resend the audio buffer: by the time the
        # reconnection succeeds the buffer would be outdated and resending
        # could confuse the transcription stream.
    except Exception as e:
        # Catch-all to prevent the exception from bubbling up and killing
        # the pipeline. We log the error and try to keep going.
        logger.error(f"{self}: Unexpected error while sending audio: {e}")

    # Always yield once so the caller can keep iterating over the generator
    yield None

async def _connect(self):
Expand Down
34 changes: 31 additions & 3 deletions src/pipecat/services/deepgram/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"""Deepgram speech-to-text service implementation."""

from typing import AsyncGenerator, Dict, Optional
import asyncio
import contextlib

from loguru import logger

Expand Down Expand Up @@ -228,9 +230,35 @@ async def _connect(self):
logger.error(f"{self}: unable to connect to Deepgram")

async def _disconnect(self):
    """Initiate a graceful disconnect but never block on Deepgram SDK internals.

    The Deepgram SDK occasionally hangs in `finish()` when the TCP socket has
    already been torn down by the far-end. To ensure pipeline shutdown can
    always proceed we run `finish()` in a background task (bounded by a
    4 second timeout) and return control immediately. If `finish()` times
    out, is cancelled, or raises, the socket is closed on a best-effort basis.
    """
    if not self._connection.is_connected:
        return

    logger.debug("Disconnecting from Deepgram – v3 (non-blocking)")

    # Capture the connection now: this method returns before the background
    # task runs, so `self._connection` may be replaced in the meantime
    # (presumably by a reconnect — TODO confirm) and must not be the target.
    connection = self._connection

    async def _close_socket():
        # Best-effort close; the socket may already be gone.
        with contextlib.suppress(Exception):
            await connection.close()

    async def _finish_safe():
        try:
            await asyncio.wait_for(connection.finish(), timeout=4)
            logger.debug("Deepgram finish() completed")
        except asyncio.TimeoutError:
            logger.warning("Deepgram finish() did not complete in 4s – closing socket")
            await _close_socket()
        except asyncio.CancelledError:
            logger.warning("Deepgram finish() was cancelled – closing socket")
            await _close_socket()
            # Re-raise so the task is correctly reported as cancelled.
            raise
        except Exception as e:
            logger.warning(f"Deepgram finish() raised {e} – closing socket")
            await _close_socket()
        finally:
            logger.debug("Deepgram disconnect task done")

    # Run in background – shutdown must not wait for this. The event loop only
    # keeps weak references to tasks, so hold a strong reference until the
    # task finishes to stop it being garbage-collected mid-execution.
    if not hasattr(self, "_disconnect_tasks"):
        self._disconnect_tasks = set()
    finish_task = asyncio.create_task(_finish_safe())
    self._disconnect_tasks.add(finish_task)
    finish_task.add_done_callback(self._disconnect_tasks.discard)

async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
Expand Down
12 changes: 10 additions & 2 deletions src/pipecat/services/tts_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""Base classes for Text-to-speech services."""

import asyncio
import re
from abc import abstractmethod
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Sequence, Tuple

Expand Down Expand Up @@ -394,8 +395,15 @@ async def _push_tts_frames(self, text: str):
await filter.reset_interruption()
text = await filter.filter(text)

if text:
await self.process_generator(self.run_tts(text))
if text and text.strip():
# Additional check: ensure text contains at least one alphanumeric character
# This prevents sending text that's only punctuation (which Cartesia rejects)
if re.search(r'[a-zA-Z0-9]', text):
await self.process_generator(self.run_tts(text))
else:
logger.debug(f"Skipping TTS for text with only punctuation/whitespace: '{text}'")
else:
logger.debug(f"Skipping TTS for empty/whitespace text after filtering: '{text}'")

await self.stop_processing_metrics()

Expand Down
1 change: 1 addition & 0 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
MixerControlFrame,
OutputAudioRawFrame,
Expand Down
57 changes: 40 additions & 17 deletions src/pipecat/transports/network/fastapi_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,34 @@ def receive(self) -> typing.AsyncIterator[bytes | str]:
return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text()

async def send(self, data: str | bytes):
    """Send a single message to the client over the websocket.

    Nothing is sent when ``_can_send()`` reports that sending is not
    currently possible. Otherwise the payload goes out with ``send_bytes``
    when the client is in binary mode and with ``send_text`` when it is not.

    Errors raised by the websocket are deliberately NOT caught here; they
    propagate to the caller.

    Args:
        data: The payload to send.
    """
    # NOTE: HL OVERRIDE. The upstream pipecat fix
    # https://github.com/pipecat-ai/pipecat/commit/74280829fcf5e03754ba5263162ff70eaabcdc4a
    # wrapped this send in a try/except that logged the error and, when the
    # websocket was already DISCONNECTED, set `_closing` and triggered the
    # client-disconnected handling. It catches the error but causes
    # occasional stalling, so it is not applied here; we fall back to the HL
    # override in FastAPIWebsocketOutputTransport instead.
    if not self._can_send():
        return

    if self._is_binary:
        await self._websocket.send_bytes(data)
    else:
        await self._websocket.send_text(data)

async def disconnect(self):
self._leave_counter -= 1
if self._leave_counter > 0:
Expand Down Expand Up @@ -311,7 +322,19 @@ async def _write_frame(self, frame: Frame):
if payload:
await self._client.send(payload)
except Exception as e:
# NOTE: HL OVERRIDE, detect closed websocket and shut down pipeline
msg = str(e).lower()
if (
("cannot call" in msg and "once a close message has been sent" in msg) or
self._client._websocket.application_state == WebSocketState.DISCONNECTED
):
logger.warning(f"{self} detected closed websocket, shutting down pipeline")
await self._client.disconnect()
await self.cleanup()
return
# Otherwise, log as before
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")


async def _write_audio_sleep(self):
# Simulate a clock.
Expand Down
6 changes: 6 additions & 0 deletions src/pipecat/utils/asyncio/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None)

"""
name = task.get_name()

# Avoid awaiting on ourselves which would lead to a recursion error.
if task is asyncio.current_task():
task.cancel()
return

task.cancel()
try:
# Make sure to reset watchdog if a task is cancelled.
Expand Down