feat: add Pulse STT support for smallest.ai pulse (streaming + pre-recorded) #4858

mahimairaja wants to merge 20 commits into livekit:main
Tested pre-recorded:

```python
import asyncio
from pathlib import Path

import aiohttp
from dotenv import load_dotenv

from livekit.agents import utils
from livekit.plugins import smallestai

load_dotenv()


async def main():
    wav = Path(__file__).resolve().parent / "sample.wav"
    async with aiohttp.ClientSession() as session:
        stt = smallestai.STT(language="en", http_session=session)
        frames = [
            f
            async for f in utils.audio.audio_frames_from_file(
                str(wav), sample_rate=16000, num_channels=1
            )
        ]
        event = await stt.recognize(frames)
        print(event.alternatives[0].text if event.alternatives else "")


if __name__ == "__main__":
    asyncio.run(main())
```
Tested streaming:

```python
from dotenv import load_dotenv

from livekit import agents
from livekit.agents import Agent, AgentServer, AgentSession, room_io
from livekit.plugins import silero
from livekit.plugins.openai.llm import LLM
from livekit.plugins.smallestai.stt import STT
from livekit.plugins.smallestai.tts import TTS
from livekit.plugins.turn_detector.english import EnglishModel

load_dotenv()


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="""You are a helpful voice AI assistant.""",
        )


server = AgentServer()


@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: agents.JobContext):
    session = AgentSession(
        stt=STT(),
        llm=LLM(model="gpt-4.1-mini"),
        tts=TTS(),
        vad=silero.VAD.load(),
        turn_detection=EnglishModel(),
    )
    await session.start(
        room=ctx.room,
        agent=Assistant(),
        room_options=room_io.RoomOptions(),
    )
    await session.generate_reply(instructions="Greet the user and offer your assistance.")


if __name__ == "__main__":
    agents.cli.run_app(server)
```
After conversations with @harshitajain165 from smallest.ai, I learned that a few more steps are needed on the smallest server side for streaming support. For now, I am moving this PR to draft.
```python
if self._is_last_event.is_set():
    closing_ws = True
    return
```
🔴 recv_task early return on is_last leaves keepalive_task blocking tasks_group, causing stream to hang
After both send_task and recv_task complete normally, keepalive_task continues running indefinitely, preventing asyncio.gather from completing. This causes _run to never return, _event_ch to never close, and any consumer iterating the speech stream to hang after receiving all transcripts.
Root Cause and Detailed Walkthrough
The shutdown sequence proceeds as follows:
1. `send_task` exhausts `self._input_ch`, sends the END message (stt.py:366-368), and returns normally.
2. The server processes the end signal and sends a final transcript with `is_last=True`.
3. `recv_task` processes this event, `_process_stream_event` sets `self._is_last_event` (stt.py:524-525), and `recv_task` returns early at stt.py:400-402:
   ```python
   if self._is_last_event.is_set():
       closing_ws = True
       return
   ```
4. `keepalive_task` (stt.py:327-333) is still running: it pings every 30 seconds and only exits when `ws.ping()` raises an exception.
5. `tasks_group = asyncio.gather(*tasks)` at stt.py:416 requires ALL three tasks to complete. Since `keepalive_task` is still alive, the gather never resolves.
6. `asyncio.wait` at stt.py:419-422 blocks forever (or until `keepalive_task`'s next ping detects a closed connection, which may take up to 30 seconds, or never, if the server doesn't close the WebSocket).
7. `_run` never returns, so `_main_task` never returns, `_event_ch` never closes, and the consumer's `async for event in stream` hangs after the last real event.
Compare with the Deepgram plugin (livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py:531-559): Deepgram's recv_task does NOT exit early; it continues to call ws.receive() until the WebSocket is actually closed by the server, which naturally causes keepalive_task to fail on its next send and exit.
Impact: In standalone stream usage (e.g. async for event in stt.stream(): ...), the iteration hangs indefinitely after all transcripts are received. In AgentSession usage, the hang is masked by explicit aclose() calls, but cleanup is still delayed.
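The hang described above can be reproduced in isolation. The sketch below is a minimal stand-in, not the plugin's code: the task names mirror the plugin's three tasks, their bodies are placeholders, and the 0.2 s timeout exists only so the demo terminates. It shows why `asyncio.gather` never resolves while a keepalive loop is still alive:

```python
import asyncio


async def send_task():
    return "sent"  # stand-in: input channel drained, END message sent


async def recv_task():
    return "received"  # stand-in: returns early once is_last is seen


async def keepalive_task():
    while True:  # only exits if the ping raises; here, never
        await asyncio.sleep(30)


async def main():
    gather = asyncio.gather(send_task(), recv_task(), keepalive_task())
    try:
        # stand-in for the wait on tasks_group: gather resolves only
        # when ALL three tasks finish, and keepalive_task never does
        await asyncio.wait_for(gather, timeout=0.2)
        return "completed"
    except asyncio.TimeoutError:
        return "hung"


result = asyncio.run(main())
print(result)  # prints "hung"
```

Without the demo timeout, the `await` would simply never return, which is exactly the consumer-side symptom described in the Impact note.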
Prompt for agents
In livekit-plugins/livekit-plugins-smallestai/livekit/plugins/smallestai/stt.py, the recv_task function (lines 371-402) returns early when is_last_event is set (lines 400-402), but this leaves keepalive_task running and blocking asyncio.gather from completing.
The fix: instead of returning early from recv_task, continue the while loop to let the WebSocket close naturally. Since closing_ws is already set to True at line 401, the existing close-frame handler at lines 376-382 will cleanly return when the server closes the connection. This matches the Deepgram plugin's pattern.
Replace lines 400-402:

```python
if self._is_last_event.is_set():
    closing_ws = True
    return
```

With:

```python
if self._is_last_event.is_set():
    closing_ws = True
    # Don't return early; continue loop so ws.receive() sees
    # the server-side close frame, which also lets keepalive_task
    # detect the closed connection and exit.
```
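For illustration, here is a self-contained sketch of that receive-until-close pattern with a stubbed WebSocket. The `FakeWS` class, the message-type constants, and the inline `is_last` parsing are all invented for the demo (the real plugin uses aiohttp's WebSocket API); the point is only the control flow: after `is_last` the loop keeps receiving, so the server's close frame is what ends the task.

```python
import asyncio
import json

TEXT, CLOSE = "text", "close"  # stand-ins for real WebSocket message types


class FakeWS:
    """Stub WebSocket: server sends a final transcript, then a close frame."""

    def __init__(self):
        self._msgs = [(TEXT, '{"is_last": true}'), (CLOSE, None)]

    async def receive(self):
        return self._msgs.pop(0)


async def recv_task(ws, is_last_event: asyncio.Event):
    closing_ws = False
    while True:
        msg_type, data = await ws.receive()
        if msg_type == CLOSE:
            if closing_ws:
                return "clean close"  # expected: server closed after is_last
            raise RuntimeError("connection closed unexpectedly")
        if json.loads(data).get("is_last"):  # stand-in for event processing
            is_last_event.set()
        if is_last_event.is_set():
            closing_ws = True
            # no early return: keep looping so receive() sees the close frame


outcome = asyncio.run(recv_task(FakeWS(), asyncio.Event()))
print(outcome)  # prints "clean close"
```

Because the loop only exits on the close frame, a sibling keepalive task also gets a chance to fail on the dead connection and exit, letting the gather resolve.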
What does this PR do?

Adds Speech-to-Text (STT) support to the `livekit-plugins-smallestai` plugin using Smallest AI's Pulse STT API. The existing plugin only supported TTS; this PR brings it to parity with plugins like Deepgram, ElevenLabs, and Soniox that offer both TTS and STT.

Closes #4856
Summary of Changes
New: `STT` class (stt.py)

- Pre-recorded recognition via REST (`/api/v1/pulse/get_text`)
- Streaming via WebSocket (`wss://waves-api.smallest.ai/api/v1/pulse/get_text`)
- Language auto-detection (`language="multi"`)
- `streaming=True`, `interim_results=True` options

New: `SpeechStream` class (stt.py)

- Audio buffering via `AudioByteStream` (~4096 byte chunks per Smallest AI docs)
- Event flow: `START_OF_SPEECH` → `INTERIM_TRANSCRIPT`/`FINAL_TRANSCRIPT` → `END_OF_SPEECH`
- End-of-stream via `{"type": "end"}` signaling
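As a rough illustration of the buffering step, fixed-size chunking of raw audio can be sketched as below. `chunk_audio` is a hypothetical helper written for this example, not the plugin's actual `AudioByteStream` API; only the ~4096-byte chunk size comes from the Smallest AI docs.

```python
def chunk_audio(pcm: bytes, chunk_size: int = 4096) -> list[bytes]:
    """Split raw PCM bytes into fixed-size chunks for the WebSocket.

    Hypothetical stand-in for the buffering AudioByteStream performs;
    the 4096-byte size follows the Smallest AI docs.
    """
    return [pcm[i : i + chunk_size] for i in range(0, len(pcm), chunk_size)]


# 10000 bytes of silence -> two full chunks plus a 1808-byte remainder
sizes = [len(c) for c in chunk_audio(bytes(10000))]
print(sizes)  # [4096, 4096, 1808]
```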
Usage

Configuration via the `SMALLEST_API_KEY` environment variable (same key used for TTS).

Testing
Tested with `language="en"` and `language="multi"` (auto-detection).

API Reference