Skip to content

Commit 3048f8b

Browse files
authored
Merge pull request #638 from runloopai/alb/retry-execution-log-stream
Update Execution Streaming APIs and add retries on timeouts
2 parents 742357d + ddcf283 commit 3048f8b

File tree

5 files changed

+725
-68
lines changed

5 files changed

+725
-68
lines changed

api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ Methods:
266266
- <code title="post /v1/devboxes/{id}/execute_async">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">execute_async</a>(id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_execute_async_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
267267
- <code title="post /v1/devboxes/{id}/execute_sync">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">execute_sync</a>(id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_execute_sync_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_execution_detail_view.py">DevboxExecutionDetailView</a></code>
268268
- <code title="post /v1/devboxes/{devbox_id}/executions/{execution_id}/kill">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">kill</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_kill_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
269-
- <code title="get /v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">stream_updates</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_stream_updates_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
269+
- <code title="get /v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates">client.devboxes.executions.<a href="./src/runloop_api_client/resources/devboxes/executions.py">stream_stdout_updates</a>(execution_id, \*, devbox_id, \*\*<a href="src/runloop_api_client/types/devboxes/execution_stream_updates_params.py">params</a>) -> <a href="./src/runloop_api_client/types/devbox_async_execution_detail_view.py">DevboxAsyncExecutionDetailView</a></code>
270270

271271
# Scenarios
272272

src/runloop_api_client/_streaming.py

Lines changed: 189 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,31 @@
44
import json
55
import inspect
66
from types import TracebackType
7-
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, AsyncIterator, cast
8-
from typing_extensions import Self, Protocol, TypeGuard, override, get_origin, runtime_checkable
7+
from typing import (
8+
TYPE_CHECKING,
9+
Any,
10+
Generic,
11+
TypeVar,
12+
Callable,
13+
Iterator,
14+
Optional,
15+
Awaitable,
16+
AsyncIterator,
17+
cast,
18+
)
19+
from typing_extensions import (
20+
Self,
21+
Protocol,
22+
TypeGuard,
23+
override,
24+
get_origin,
25+
runtime_checkable,
26+
)
927

1028
import httpx
1129

1230
from ._utils import extract_type_var_from_base
31+
from ._exceptions import APIStatusError, APITimeoutError
1332

1433
if TYPE_CHECKING:
1534
from ._client import Runloop, AsyncRunloop
@@ -55,6 +74,17 @@ def __stream__(self) -> Iterator[_T]:
5574
iterator = self._iter_events()
5675

5776
for sse in iterator:
77+
# Surface server-sent error events as API errors to allow callers to handle/retry
78+
if sse.event == "error":
79+
try:
80+
error_obj = json.loads(sse.data)
81+
status_code = int(error_obj.get("code", 500))
82+
# Build a synthetic response to mirror normal error handling
83+
fake_resp = httpx.Response(status_code, request=response.request, content=sse.data)
84+
except Exception:
85+
fake_resp = httpx.Response(500, request=response.request, content=sse.data)
86+
raise self._client._make_status_error_from_response(fake_resp)
87+
5888
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
5989

6090
# Ensure the entire stream is consumed
@@ -119,6 +149,17 @@ async def __stream__(self) -> AsyncIterator[_T]:
119149
iterator = self._iter_events()
120150

121151
async for sse in iterator:
152+
# Surface server-sent error events as API errors to allow callers to handle/retry
153+
if sse.event == "error":
154+
try:
155+
error_obj = json.loads(sse.data)
156+
status_code = int(error_obj.get("code", 500))
157+
# Build a synthetic response to mirror normal error handling
158+
fake_resp = httpx.Response(status_code, request=response.request, content=sse.data)
159+
except Exception:
160+
fake_resp = httpx.Response(500, request=response.request, content=sse.data)
161+
raise self._client._make_status_error_from_response(fake_resp)
162+
122163
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
123164

124165
# Ensure the entire stream is consumed
@@ -331,3 +372,149 @@ class MyStream(Stream[bytes]):
331372
generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)),
332373
failure_message=failure_message,
333374
)
375+
376+
377+
class ReconnectingStream(Generic[_T]):
378+
"""Wraps a Stream with automatic reconnection on timeout (HTTP 408) or read timeouts.
379+
380+
The reconnection uses the last observed offset from each item, as provided by
381+
the given `get_offset` callback. The `stream_creator` will be called with the
382+
last known offset to resume the stream.
383+
"""
384+
385+
def __init__(
386+
self,
387+
*,
388+
current_stream: Stream[_T],
389+
stream_creator: Callable[[Optional[str]], Stream[_T]],
390+
get_offset: Callable[[_T], Optional[str]],
391+
) -> None:
392+
self._current_stream = current_stream
393+
self._stream_creator = stream_creator
394+
self._get_offset = get_offset
395+
self._last_offset: Optional[str] = None
396+
self._iterator = self.__stream__()
397+
398+
@property
399+
def response(self) -> httpx.Response:
400+
return self._current_stream.response
401+
402+
def __next__(self) -> _T:
403+
return self._iterator.__next__()
404+
405+
def __iter__(self) -> Iterator[_T]:
406+
for item in self._iterator:
407+
yield item
408+
409+
def __enter__(self) -> "ReconnectingStream[_T]":
410+
return self
411+
412+
def __exit__(
413+
self,
414+
exc_type: type[BaseException] | None,
415+
exc: BaseException | None,
416+
exc_tb: TracebackType | None,
417+
) -> None:
418+
self.close()
419+
420+
def close(self) -> None:
421+
self._current_stream.close()
422+
423+
def __stream__(self) -> Iterator[_T]:
424+
while True:
425+
try:
426+
for item in self._current_stream:
427+
offset = self._get_offset(item)
428+
if offset is not None:
429+
self._last_offset = offset
430+
yield item
431+
return
432+
except Exception as e:
433+
# Reconnect on timeouts
434+
should_reconnect = False
435+
if isinstance(e, APITimeoutError):
436+
should_reconnect = True
437+
elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408:
438+
should_reconnect = True
439+
elif isinstance(e, httpx.TimeoutException):
440+
should_reconnect = True
441+
442+
if should_reconnect:
443+
# Close existing response before reconnecting
444+
try:
445+
self._current_stream.close()
446+
except Exception:
447+
pass
448+
self._current_stream = self._stream_creator(self._last_offset)
449+
continue
450+
raise
451+
452+
453+
class AsyncReconnectingStream(Generic[_T]):
454+
"""Async variant of ReconnectingStream supporting auto-reconnect on timeouts."""
455+
456+
def __init__(
457+
self,
458+
*,
459+
current_stream: AsyncStream[_T],
460+
stream_creator: Callable[[Optional[str]], Awaitable[AsyncStream[_T]]],
461+
get_offset: Callable[[_T], Optional[str]],
462+
) -> None:
463+
self._current_stream = current_stream
464+
self._stream_creator = stream_creator
465+
self._get_offset = get_offset
466+
self._last_offset: Optional[str] = None
467+
self._iterator = self.__stream__()
468+
469+
@property
470+
def response(self) -> httpx.Response:
471+
return self._current_stream.response
472+
473+
async def __anext__(self) -> _T:
474+
return await self._iterator.__anext__()
475+
476+
async def __aiter__(self) -> AsyncIterator[_T]:
477+
async for item in self._iterator:
478+
yield item
479+
480+
async def __aenter__(self) -> "AsyncReconnectingStream[_T]":
481+
return self
482+
483+
async def __aexit__(
484+
self,
485+
exc_type: type[BaseException] | None,
486+
exc: BaseException | None,
487+
exc_tb: TracebackType | None,
488+
) -> None:
489+
await self.close()
490+
491+
async def close(self) -> None:
492+
await self._current_stream.close()
493+
494+
async def __stream__(self) -> AsyncIterator[_T]:
495+
while True:
496+
try:
497+
async for item in self._current_stream:
498+
offset = self._get_offset(item)
499+
if offset is not None:
500+
self._last_offset = offset
501+
yield item
502+
return
503+
except Exception as e:
504+
# Reconnect on timeouts
505+
should_reconnect = False
506+
if isinstance(e, APITimeoutError):
507+
should_reconnect = True
508+
elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408:
509+
should_reconnect = True
510+
elif isinstance(e, httpx.TimeoutException):
511+
should_reconnect = True
512+
513+
if should_reconnect:
514+
try:
515+
await self._current_stream.close()
516+
except Exception:
517+
pass
518+
self._current_stream = await self._stream_creator(self._last_offset)
519+
continue
520+
raise

src/runloop_api_client/resources/blueprints.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def create_and_await_build_complete(
249249
launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN,
250250
polling_config: PollingConfig | None = None,
251251
services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN,
252-
system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN,
252+
system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN,
253253
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
254254
# The extra values given here take precedence over values defined on the client or passed to this method.
255255
extra_headers: Headers | None = None,
@@ -766,7 +766,7 @@ async def create_and_await_build_complete(
766766
launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN,
767767
polling_config: PollingConfig | None = None,
768768
services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN,
769-
system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN,
769+
system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN,
770770
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
771771
# The extra values given here take precedence over values defined on the client or passed to this method.
772772
extra_headers: Headers | None = None,

0 commit comments

Comments
 (0)