Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Methods:
- <code title="post /v1/devboxes/{id}/execute_async">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">execute_async</a>(id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_execute_async_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
- <code title="post /v1/devboxes/{id}/execute_sync">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">execute_sync</a>(id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_execute_sync_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_execution_detail_view.py">DevboxExecutionDetailView</a></code>
- <code title="post /v1/devboxes/{devbox_id}/executions/{execution_id}/kill">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">kill</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_kill_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
- <code title="get /v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">stream_updates</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_stream_updates_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
- <code title="get /v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">stream_stdout_updates</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_stream_updates_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>

# Scenarios

Expand Down
191 changes: 189 additions & 2 deletions src/runloop_api_client/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,31 @@
import json
import inspect
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, AsyncIterator, cast
from typing_extensions import Self, Protocol, TypeGuard, override, get_origin, runtime_checkable
from typing import (
TYPE_CHECKING,
Any,
Generic,
TypeVar,
Callable,
Iterator,
Optional,
Awaitable,
AsyncIterator,
cast,
)
from typing_extensions import (
Self,
Protocol,
TypeGuard,
override,
get_origin,
runtime_checkable,
)

import httpx

from ._utils import extract_type_var_from_base
from ._exceptions import APIStatusError, APITimeoutError

if TYPE_CHECKING:
from ._client import Runloop, AsyncRunloop
Expand Down Expand Up @@ -55,6 +74,17 @@ def __stream__(self) -> Iterator[_T]:
iterator = self._iter_events()

for sse in iterator:
# Surface server-sent error events as API errors to allow callers to handle/retry
if sse.event == "error":
try:
error_obj = json.loads(sse.data)
status_code = int(error_obj.get("code", 500))
# Build a synthetic response to mirror normal error handling
fake_resp = httpx.Response(status_code, request=response.request, content=sse.data)
except Exception:
fake_resp = httpx.Response(500, request=response.request, content=sse.data)
raise self._client._make_status_error_from_response(fake_resp)

yield process_data(data=sse.json(), cast_to=cast_to, response=response)

# Ensure the entire stream is consumed
Expand Down Expand Up @@ -119,6 +149,17 @@ async def __stream__(self) -> AsyncIterator[_T]:
iterator = self._iter_events()

async for sse in iterator:
# Surface server-sent error events as API errors to allow callers to handle/retry
if sse.event == "error":
try:
error_obj = json.loads(sse.data)
status_code = int(error_obj.get("code", 500))
# Build a synthetic response to mirror normal error handling
fake_resp = httpx.Response(status_code, request=response.request, content=sse.data)
except Exception:
fake_resp = httpx.Response(500, request=response.request, content=sse.data)
raise self._client._make_status_error_from_response(fake_resp)

yield process_data(data=sse.json(), cast_to=cast_to, response=response)

# Ensure the entire stream is consumed
Expand Down Expand Up @@ -331,3 +372,149 @@ class MyStream(Stream[bytes]):
generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)),
failure_message=failure_message,
)


class ReconnectingStream(Generic[_T]):
"""Wraps a Stream with automatic reconnection on timeout (HTTP 408) or read timeouts.

The reconnection uses the last observed offset from each item, as provided by
the given `get_offset` callback. The `stream_creator` will be called with the
last known offset to resume the stream.
"""

def __init__(
self,
*,
current_stream: Stream[_T],
stream_creator: Callable[[Optional[str]], Stream[_T]],
get_offset: Callable[[_T], Optional[str]],
) -> None:
self._current_stream = current_stream
self._stream_creator = stream_creator
self._get_offset = get_offset
self._last_offset: Optional[str] = None
self._iterator = self.__stream__()

@property
def response(self) -> httpx.Response:
return self._current_stream.response

def __next__(self) -> _T:
return self._iterator.__next__()

def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item

def __enter__(self) -> "ReconnectingStream[_T]":
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()

def close(self) -> None:
self._current_stream.close()

def __stream__(self) -> Iterator[_T]:
while True:
try:
for item in self._current_stream:
offset = self._get_offset(item)
if offset is not None:
self._last_offset = offset
yield item
return
except Exception as e:
# Reconnect on timeouts
should_reconnect = False
if isinstance(e, APITimeoutError):
should_reconnect = True
elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408:
should_reconnect = True
elif isinstance(e, httpx.TimeoutException):
should_reconnect = True

if should_reconnect:
# Close existing response before reconnecting
try:
self._current_stream.close()
except Exception:
pass
self._current_stream = self._stream_creator(self._last_offset)
continue
raise


class AsyncReconnectingStream(Generic[_T]):
"""Async variant of ReconnectingStream supporting auto-reconnect on timeouts."""

def __init__(
self,
*,
current_stream: AsyncStream[_T],
stream_creator: Callable[[Optional[str]], Awaitable[AsyncStream[_T]]],
get_offset: Callable[[_T], Optional[str]],
) -> None:
self._current_stream = current_stream
self._stream_creator = stream_creator
self._get_offset = get_offset
self._last_offset: Optional[str] = None
self._iterator = self.__stream__()

@property
def response(self) -> httpx.Response:
return self._current_stream.response

async def __anext__(self) -> _T:
return await self._iterator.__anext__()

async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item

async def __aenter__(self) -> "AsyncReconnectingStream[_T]":
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.close()

async def close(self) -> None:
await self._current_stream.close()

async def __stream__(self) -> AsyncIterator[_T]:
while True:
try:
async for item in self._current_stream:
offset = self._get_offset(item)
if offset is not None:
self._last_offset = offset
yield item
return
except Exception as e:
# Reconnect on timeouts
should_reconnect = False
if isinstance(e, APITimeoutError):
should_reconnect = True
elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408:
should_reconnect = True
elif isinstance(e, httpx.TimeoutException):
should_reconnect = True

if should_reconnect:
try:
await self._current_stream.close()
except Exception:
pass
self._current_stream = await self._stream_creator(self._last_offset)
continue
raise
4 changes: 2 additions & 2 deletions src/runloop_api_client/resources/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def create_and_await_build_complete(
launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN,
polling_config: PollingConfig | None = None,
services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN,
system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN,
system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
extra_headers: Headers | None = None,
Expand Down Expand Up @@ -766,7 +766,7 @@ async def create_and_await_build_complete(
launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN,
polling_config: PollingConfig | None = None,
services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN,
system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN,
system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
extra_headers: Headers | None = None,
Expand Down
Loading