diff --git a/api.md b/api.md index 0de2d614f..0c827fe6b 100644 --- a/api.md +++ b/api.md @@ -266,7 +266,7 @@ Methods: - client.devboxes.executions.execute_async(id, \*\*params) -> DevboxAsyncExecutionDetailView - client.devboxes.executions.execute_sync(id, \*\*params) -> DevboxExecutionDetailView - client.devboxes.executions.kill(execution_id, \*, devbox_id, \*\*params) -> DevboxAsyncExecutionDetailView -- client.devboxes.executions.stream_updates(execution_id, \*, devbox_id, \*\*params) -> DevboxAsyncExecutionDetailView +- client.devboxes.executions.stream_stdout_updates(execution_id, \*, devbox_id, \*\*params) -> DevboxAsyncExecutionDetailView # Scenarios diff --git a/src/runloop_api_client/_streaming.py b/src/runloop_api_client/_streaming.py index ded05ee4b..b894ae90e 100644 --- a/src/runloop_api_client/_streaming.py +++ b/src/runloop_api_client/_streaming.py @@ -4,12 +4,31 @@ import json import inspect from types import TracebackType -from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, AsyncIterator, cast -from typing_extensions import Self, Protocol, TypeGuard, override, get_origin, runtime_checkable +from typing import ( + TYPE_CHECKING, + Any, + Generic, + TypeVar, + Callable, + Iterator, + Optional, + Awaitable, + AsyncIterator, + cast, +) +from typing_extensions import ( + Self, + Protocol, + TypeGuard, + override, + get_origin, + runtime_checkable, +) import httpx from ._utils import extract_type_var_from_base +from ._exceptions import APIStatusError, APITimeoutError if TYPE_CHECKING: from ._client import Runloop, AsyncRunloop @@ -55,6 +74,17 @@ def __stream__(self) -> Iterator[_T]: iterator = self._iter_events() for sse in iterator: + # Surface server-sent error events as API errors to allow callers to handle/retry + if sse.event == "error": + try: + error_obj = json.loads(sse.data) + status_code = int(error_obj.get("code", 500)) + # Build a synthetic response to mirror normal error handling + fake_resp = httpx.Response(status_code, request=response.request, content=sse.data) + except Exception: + fake_resp = httpx.Response(500, request=response.request, content=sse.data) + raise self._client._make_status_error_from_response(fake_resp) + yield process_data(data=sse.json(), cast_to=cast_to, response=response) # Ensure the entire stream is consumed @@ -119,6 +149,17 @@ async def __stream__(self) -> AsyncIterator[_T]: iterator = self._iter_events() async for sse in iterator: + # Surface server-sent error events as API errors to allow callers to handle/retry + if sse.event == "error": + try: + error_obj = json.loads(sse.data) + status_code = int(error_obj.get("code", 500)) + # Build a synthetic response to mirror normal error handling + fake_resp = httpx.Response(status_code, request=response.request, content=sse.data) + except Exception: + fake_resp = httpx.Response(500, request=response.request, content=sse.data) + raise self._client._make_status_error_from_response(fake_resp) + yield process_data(data=sse.json(), cast_to=cast_to, response=response) # Ensure the entire stream is consumed @@ -331,3 +372,149 @@ class MyStream(Stream[bytes]): generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)), failure_message=failure_message, ) + + +class ReconnectingStream(Generic[_T]): + """Wraps a Stream with automatic reconnection on timeout (HTTP 408) or read timeouts. + + The reconnection uses the last observed offset from each item, as provided by + the given `get_offset` callback. The `stream_creator` will be called with the + last known offset to resume the stream. + """ + + def __init__( + self, + *, + current_stream: Stream[_T], + stream_creator: Callable[[Optional[str]], Stream[_T]], + get_offset: Callable[[_T], Optional[str]], + ) -> None: + self._current_stream = current_stream + self._stream_creator = stream_creator + self._get_offset = get_offset + self._last_offset: Optional[str] = None + self._iterator = self.__stream__() + + @property + def response(self) -> httpx.Response: + return self._current_stream.response + + def __next__(self) -> _T: + return self._iterator.__next__() + + def __iter__(self) -> Iterator[_T]: + for item in self._iterator: + yield item + + def __enter__(self) -> "ReconnectingStream[_T]": + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.close() + + def close(self) -> None: + self._current_stream.close() + + def __stream__(self) -> Iterator[_T]: + while True: + try: + for item in self._current_stream: + offset = self._get_offset(item) + if offset is not None: + self._last_offset = offset + yield item + return + except Exception as e: + # Reconnect on timeouts + should_reconnect = False + if isinstance(e, APITimeoutError): + should_reconnect = True + elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408: + should_reconnect = True + elif isinstance(e, httpx.TimeoutException): + should_reconnect = True + + if should_reconnect: + # Close existing response before reconnecting + try: + self._current_stream.close() + except Exception: + pass + self._current_stream = self._stream_creator(self._last_offset) + continue + raise + + +class AsyncReconnectingStream(Generic[_T]): + """Async variant of ReconnectingStream supporting auto-reconnect on timeouts.""" + + def __init__( + self, + *, + current_stream: AsyncStream[_T], + stream_creator: Callable[[Optional[str]], Awaitable[AsyncStream[_T]]], + get_offset: Callable[[_T], Optional[str]], + ) -> None: + self._current_stream = current_stream + self._stream_creator = stream_creator + self._get_offset = get_offset + self._last_offset: Optional[str] = None + self._iterator = self.__stream__() + + @property + def response(self) -> httpx.Response: + return self._current_stream.response + + async def __anext__(self) -> _T: + return await self._iterator.__anext__() + + async def __aiter__(self) -> AsyncIterator[_T]: + async for item in self._iterator: + yield item + + async def __aenter__(self) -> "AsyncReconnectingStream[_T]": + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + await self.close() + + async def close(self) -> None: + await self._current_stream.close() + + async def __stream__(self) -> AsyncIterator[_T]: + while True: + try: + async for item in self._current_stream: + offset = self._get_offset(item) + if offset is not None: + self._last_offset = offset + yield item + return + except Exception as e: + # Reconnect on timeouts + should_reconnect = False + if isinstance(e, APITimeoutError): + should_reconnect = True + elif isinstance(e, APIStatusError) and getattr(e, "status_code", None) == 408: + should_reconnect = True + elif isinstance(e, httpx.TimeoutException): + should_reconnect = True + + if should_reconnect: + try: + await self._current_stream.close() + except Exception: + pass + self._current_stream = await self._stream_creator(self._last_offset) + continue + raise diff --git a/src/runloop_api_client/resources/blueprints.py b/src/runloop_api_client/resources/blueprints.py index 443e71df8..dda7f7731 100644 --- a/src/runloop_api_client/resources/blueprints.py +++ b/src/runloop_api_client/resources/blueprints.py @@ -249,7 +249,7 @@ def create_and_await_build_complete( launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN, polling_config: PollingConfig | None = None, services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN, - system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN, + system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, @@ -766,7 +766,7 @@ async def create_and_await_build_complete( launch_parameters: Optional[LaunchParameters] | NotGiven = NOT_GIVEN, polling_config: PollingConfig | None = None, services: Optional[Iterable[blueprint_create_params.Service]] | NotGiven = NOT_GIVEN, - system_setup_commands: Optional[List[str]] | NotGiven = NOT_GIVEN, + system_setup_commands: Optional[SequenceNotStr[str]] | NotGiven = NOT_GIVEN, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, diff --git a/src/runloop_api_client/resources/devboxes/executions.py b/src/runloop_api_client/resources/devboxes/executions.py index 2968aa13d..0002113ff 100755 --- a/src/runloop_api_client/resources/devboxes/executions.py +++ b/src/runloop_api_client/resources/devboxes/executions.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Optional +from typing import Optional, cast import httpx @@ -16,10 +16,10 @@ async_to_raw_response_wrapper, async_to_streamed_response_wrapper, ) -from ..._constants import DEFAULT_TIMEOUT +from ..._constants import DEFAULT_TIMEOUT, RAW_RESPONSE_HEADER +from ..._streaming import Stream, AsyncStream, ReconnectingStream, AsyncReconnectingStream from ..._exceptions import APIStatusError, APITimeoutError from ...lib.polling import PollingConfig, poll_until -from ..._streaming import Stream, AsyncStream from ..._base_client import make_request_options from ...types.devboxes import ( execution_kill_params, @@ -326,7 +326,7 @@ def kill( cast_to=DevboxAsyncExecutionDetailView, ) - def stream_updates( + def stream_stdout_updates( self, execution_id: str, *, @@ -340,7 +340,7 @@ def stream_updates( timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, ) -> Stream[ExecutionUpdateChunk]: """ - Tails the logs for the given execution with SSE streaming + Tails the stdout logs for the given execution with SSE streaming Args: offset: The byte offset to start the stream from @@ -357,18 +357,131 @@ def stream_updates( raise ValueError(f"Expected a non-empty value for `devbox_id` but received {devbox_id!r}") if not execution_id: raise ValueError(f"Expected a non-empty value for `execution_id` but received {execution_id!r}") - return self._get( - f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates", - options=make_request_options( - extra_headers=extra_headers, - extra_query=extra_query, - extra_body=extra_body, - timeout=timeout, - query=maybe_transform({"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams), - ), - cast_to=DevboxAsyncExecutionDetailView, - stream=True, - stream_cls=Stream[ExecutionUpdateChunk], + if extra_headers and extra_headers.get(RAW_RESPONSE_HEADER): + return self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=maybe_transform( + {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=Stream[ExecutionUpdateChunk], + ) + + def create_stream(last_offset: str | None) -> Stream[ExecutionUpdateChunk]: + new_offset = last_offset if last_offset is not None else (None if isinstance(offset, NotGiven) else offset) + return self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=maybe_transform( + {"offset": new_offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=Stream[ExecutionUpdateChunk], + ) + + initial_stream = create_stream(None) + + def get_offset(item: ExecutionUpdateChunk) -> str | None: + value = getattr(item, "offset", None) + if value is None: + return None + return str(value) + + return cast( + Stream[ExecutionUpdateChunk], + ReconnectingStream(current_stream=initial_stream, stream_creator=create_stream, get_offset=get_offset), + ) + + def stream_stderr_updates( + self, + execution_id: str, + *, + devbox_id: str, + offset: str | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Stream[ExecutionUpdateChunk]: + """ + Tails the stderr logs for the given execution with SSE streaming + + Args: + offset: The byte offset to start the stream from + + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not devbox_id: + raise ValueError(f"Expected a non-empty value for `devbox_id` but received {devbox_id!r}") + if not execution_id: + raise ValueError(f"Expected a non-empty value for `execution_id` but received {execution_id!r}") + if extra_headers and extra_headers.get(RAW_RESPONSE_HEADER): + return self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stderr_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=maybe_transform( + {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=Stream[ExecutionUpdateChunk], + ) + + def create_stream(last_offset: str | None) -> Stream[ExecutionUpdateChunk]: + new_offset = last_offset if last_offset is not None else (None if isinstance(offset, NotGiven) else offset) + return self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stderr_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=maybe_transform( + {"offset": new_offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=Stream[ExecutionUpdateChunk], + ) + + initial_stream = create_stream(None) + + def get_offset(item: ExecutionUpdateChunk) -> str | None: + value = getattr(item, "offset", None) + if value is None: + return None + return str(value) + + return cast( + Stream[ExecutionUpdateChunk], + ReconnectingStream(current_stream=initial_stream, stream_creator=create_stream, get_offset=get_offset), ) @@ -683,20 +796,211 @@ async def stream_updates( raise ValueError(f"Expected a non-empty value for `devbox_id` but received {devbox_id!r}") if not execution_id: raise ValueError(f"Expected a non-empty value for `execution_id` but received {execution_id!r}") - return await self._get( - f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates", - options=make_request_options( - extra_headers=extra_headers, - extra_query=extra_query, - extra_body=extra_body, - timeout=timeout, - query=await async_maybe_transform( - {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + # If caller requested a raw or streaming response wrapper, return the underlying stream as-is + if extra_headers and extra_headers.get(RAW_RESPONSE_HEADER): + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), ), - ), - cast_to=DevboxAsyncExecutionDetailView, - stream=True, - stream_cls=AsyncStream[ExecutionUpdateChunk], + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + async def create_stream(last_offset: str | None) -> AsyncStream[ExecutionUpdateChunk]: + new_offset = last_offset if last_offset is not None else (None if isinstance(offset, NotGiven) else offset) + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": new_offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + initial_stream = await create_stream(None) + + def get_offset(item: ExecutionUpdateChunk) -> str | None: + value = getattr(item, "offset", None) + if value is None: + return None + return str(value) + + return cast( + AsyncStream[ExecutionUpdateChunk], + AsyncReconnectingStream(current_stream=initial_stream, stream_creator=create_stream, get_offset=get_offset), + ) + + async def stream_stdout_updates( + self, + execution_id: str, + *, + devbox_id: str, + offset: str | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AsyncStream[ExecutionUpdateChunk]: + """ + Tails the stdout logs for the given execution with SSE streaming + + Args: + offset: The byte offset to start the stream from + + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not devbox_id: + raise ValueError(f"Expected a non-empty value for `devbox_id` but received {devbox_id!r}") + if not execution_id: + raise ValueError(f"Expected a non-empty value for `execution_id` but received {execution_id!r}") + if extra_headers and extra_headers.get(RAW_RESPONSE_HEADER): + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + async def create_stream(last_offset: str | None) -> AsyncStream[ExecutionUpdateChunk]: + new_offset = last_offset if last_offset is not None else (None if isinstance(offset, NotGiven) else offset) + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stdout_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": new_offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + initial_stream = await create_stream(None) + + def get_offset(item: ExecutionUpdateChunk) -> str | None: + value = getattr(item, "offset", None) + if value is None: + return None + return str(value) + + return cast( + AsyncStream[ExecutionUpdateChunk], + AsyncReconnectingStream(current_stream=initial_stream, stream_creator=create_stream, get_offset=get_offset), + ) + + async def stream_stderr_updates( + self, + execution_id: str, + *, + devbox_id: str, + offset: str | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AsyncStream[ExecutionUpdateChunk]: + """ + Tails the stderr logs for the given execution with SSE streaming + + Args: + offset: The byte offset to start the stream from + + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not devbox_id: + raise ValueError(f"Expected a non-empty value for `devbox_id` but received {devbox_id!r}") + if not execution_id: + raise ValueError(f"Expected a non-empty value for `execution_id` but received {execution_id!r}") + if extra_headers and extra_headers.get(RAW_RESPONSE_HEADER): + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stderr_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + async def create_stream(last_offset: str | None) -> AsyncStream[ExecutionUpdateChunk]: + new_offset = last_offset if last_offset is not None else (None if isinstance(offset, NotGiven) else offset) + return await self._get( + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/stream_stderr_updates", + options=make_request_options( + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + query=await async_maybe_transform( + {"offset": new_offset}, execution_stream_updates_params.ExecutionStreamUpdatesParams + ), + ), + cast_to=DevboxAsyncExecutionDetailView, + stream=True, + stream_cls=AsyncStream[ExecutionUpdateChunk], + ) + + initial_stream = await create_stream(None) + + def get_offset(item: ExecutionUpdateChunk) -> str | None: + value = getattr(item, "offset", None) + if value is None: + return None + return str(value) + + return cast( + AsyncStream[ExecutionUpdateChunk], + AsyncReconnectingStream(current_stream=initial_stream, stream_creator=create_stream, get_offset=get_offset), ) @@ -716,8 +1020,11 @@ def __init__(self, executions: ExecutionsResource) -> None: self.kill = to_raw_response_wrapper( executions.kill, ) - self.stream_updates = to_raw_response_wrapper( - executions.stream_updates, + self.stream_stdout_updates = to_raw_response_wrapper( + executions.stream_stdout_updates, + ) + self.stream_stderr_updates = to_raw_response_wrapper( + executions.stream_stderr_updates, ) @@ -737,8 +1044,11 @@ def __init__(self, executions: AsyncExecutionsResource) -> None: self.kill = async_to_raw_response_wrapper( executions.kill, ) - self.stream_updates = async_to_raw_response_wrapper( - executions.stream_updates, + self.stream_stdout_updates = async_to_raw_response_wrapper( + executions.stream_stdout_updates, + ) + self.stream_stderr_updates = async_to_raw_response_wrapper( + executions.stream_stderr_updates, ) @@ -758,8 +1068,11 @@ def __init__(self, executions: ExecutionsResource) -> None: self.kill = to_streamed_response_wrapper( executions.kill, ) - self.stream_updates = to_streamed_response_wrapper( - executions.stream_updates, + self.stream_stdout_updates = to_streamed_response_wrapper( + executions.stream_stdout_updates, + ) + self.stream_stderr_updates = to_streamed_response_wrapper( + executions.stream_stderr_updates, ) @@ -779,6 +1092,9 @@ def __init__(self, executions: AsyncExecutionsResource) -> None: self.kill = async_to_streamed_response_wrapper( executions.kill, ) - self.stream_updates = async_to_streamed_response_wrapper( - executions.stream_updates, + self.stream_stdout_updates = async_to_streamed_response_wrapper( + executions.stream_stdout_updates, + ) + self.stream_stderr_updates = async_to_streamed_response_wrapper( + executions.stream_stderr_updates, ) diff --git a/tests/api_resources/devboxes/test_executions.py b/tests/api_resources/devboxes/test_executions.py index e4597808d..9b932b160 100755 --- a/tests/api_resources/devboxes/test_executions.py +++ b/tests/api_resources/devboxes/test_executions.py @@ -6,7 +6,9 @@ from typing import Any, cast from unittest.mock import Mock, patch +import httpx import pytest +from respx import MockRouter from tests.utils import assert_matches_type from runloop_api_client import Runloop, AsyncRunloop @@ -237,16 +239,22 @@ def test_path_params_kill(self, client: Runloop) -> None: ) @parametrize - def test_method_stream_updates(self, client: Runloop) -> None: - execution_stream = client.devboxes.executions.stream_updates( + def test_method_stream_stdout_updates(self, client: Runloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + execution_stream = client.devboxes.executions.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) execution_stream.response.close() @parametrize - def test_method_stream_updates_with_all_params(self, client: Runloop) -> None: - execution_stream = client.devboxes.executions.stream_updates( + def test_method_stream_stdout_updates_with_all_params(self, client: Runloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + execution_stream = client.devboxes.executions.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", offset="offset", @@ -254,8 +262,11 @@ def test_method_stream_updates_with_all_params(self, client: Runloop) -> None: execution_stream.response.close() @parametrize - def test_raw_response_stream_updates(self, client: Runloop) -> None: - response = client.devboxes.executions.with_raw_response.stream_updates( + def test_raw_response_stream_stdout_updates(self, client: Runloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + response = client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) @@ -265,8 +276,11 @@ def test_raw_response_stream_updates(self, client: Runloop) -> None: stream.close() @parametrize - def test_streaming_response_stream_updates(self, client: Runloop) -> None: - with client.devboxes.executions.with_streaming_response.stream_updates( + def test_streaming_response_stream_stdout_updates(self, client: Runloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + with client.devboxes.executions.with_streaming_response.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) as response: @@ -279,15 +293,77 @@ def test_streaming_response_stream_updates(self, client: Runloop) -> None: assert cast(Any, response.is_closed) is True @parametrize - def test_path_params_stream_updates(self, client: Runloop) -> None: + def test_path_params_stream_stdout_updates(self, client: Runloop) -> None: with pytest.raises(ValueError, match=r"Expected a non-empty value for `devbox_id` but received ''"): - client.devboxes.executions.with_raw_response.stream_updates( + client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="execution_id", devbox_id="", ) + @parametrize + def test_stream_stdout_updates_auto_reconnect_on_timeout(self, client: Runloop) -> None: + """Verify stream reconnects on timeout using last seen offset (sync).""" + + # Minimal stream stub compatible with ReconnectingStream expectations + class IteratorStream: + def __init__(self, items: list[object], exc: Exception | None = None) -> None: + self._items = list(items) + self._exc = exc + self._raised = False + self.response = httpx.Response(200, request=httpx.Request("GET", "https://example.com")) + + def __iter__(self): + for item in self._items: + yield item + if self._exc is not None and not self._raised: + self._raised = True + raise self._exc + + def close(self) -> None: # called by reconnect wrapper + pass + + # Items with offsets + item1 = type("X", (), {"offset": "5"})() + item2 = type("X", (), {"offset": "9"})() + item3 = type("X", (), {"offset": "10"})() + + timeout_err = APITimeoutError(request=httpx.Request("GET", "https://example.com")) + + calls: list[str | None] = [] + + def fake_get(_path: str, *, options: Any, **_kwargs: Any): + from typing import Dict + + options_dict: Dict[str, object] = cast(Dict[str, object], options) + params = cast("dict[str, object]", options_dict.get("params", {})) + from typing import Optional + + calls.append(cast(Optional[str], params.get("offset"))) + # first call -> yields two items then timeout; second call -> yields one more and completes + if len(calls) == 1: + return IteratorStream([item1, item2], timeout_err) + elif len(calls) == 2: + return IteratorStream([item3], None) + raise AssertionError("Unexpected extra call to _get during auto-reconnect test") + + with patch.object(client.devboxes.executions, "_get", side_effect=fake_get): + stream = client.devboxes.executions.stream_stdout_updates( + execution_id="execution_id", + devbox_id="devbox_id", + ) + + seen_offsets: list[str] = [] + for chunk in stream: + # items are simple objects with an offset attribute + seen_offsets.append(getattr(chunk, "offset", "")) + stream.close() + + # Should have retried once using the last known offset + assert calls[1] is not None + assert seen_offsets == ["5", "9", "10"] + with pytest.raises(ValueError, match=r"Expected a non-empty value for `execution_id` but received ''"): - client.devboxes.executions.with_raw_response.stream_updates( + client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="", devbox_id="devbox_id", ) @@ -458,10 +534,9 @@ def test_method_await_completed_various_statuses(self, client: Runloop) -> None: assert result.status == "completed" assert mock_post.call_count == 2 + class TestAsyncExecutions: - parametrize = pytest.mark.parametrize( - "async_client", [False, True, {"http_client": "aiohttp"}], indirect=True, ids=["loose", "strict", "aiohttp"] - ) + parametrize = pytest.mark.parametrize("async_client", [False, True], indirect=True, ids=["loose", "strict"]) @parametrize async def test_method_retrieve(self, async_client: AsyncRunloop) -> None: @@ -680,16 +755,24 @@ async def test_path_params_kill(self, async_client: AsyncRunloop) -> None: ) @parametrize - async def test_method_stream_updates(self, async_client: AsyncRunloop) -> None: - execution_stream = await async_client.devboxes.executions.stream_updates( + async def test_method_stream_stdout_updates(self, async_client: AsyncRunloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + execution_stream = await async_client.devboxes.executions.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) await execution_stream.response.aclose() @parametrize - async def test_method_stream_updates_with_all_params(self, async_client: AsyncRunloop) -> None: - execution_stream = await async_client.devboxes.executions.stream_updates( + async def test_method_stream_stdout_updates_with_all_params( + self, async_client: AsyncRunloop, respx_mock: MockRouter + ) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + execution_stream = await async_client.devboxes.executions.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", offset="offset", @@ -697,8 +780,11 @@ async def test_method_stream_updates_with_all_params(self, async_client: AsyncRu await execution_stream.response.aclose() @parametrize - async def test_raw_response_stream_updates(self, async_client: AsyncRunloop) -> None: - response = await async_client.devboxes.executions.with_raw_response.stream_updates( + async def test_raw_response_stream_stdout_updates(self, async_client: AsyncRunloop, respx_mock: MockRouter) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + response = await async_client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) @@ -708,8 +794,13 @@ async def test_raw_response_stream_updates(self, async_client: AsyncRunloop) -> await stream.close() @parametrize - async def test_streaming_response_stream_updates(self, async_client: AsyncRunloop) -> None: - async with async_client.devboxes.executions.with_streaming_response.stream_updates( + async def test_streaming_response_stream_stdout_updates( + self, async_client: AsyncRunloop, respx_mock: MockRouter + ) -> None: + respx_mock.get("/v1/devboxes/devbox_id/executions/execution_id/stream_stdout_updates").mock( + return_value=httpx.Response(200) + ) + async with async_client.devboxes.executions.with_streaming_response.stream_stdout_updates( execution_id="execution_id", devbox_id="devbox_id", ) as response: @@ -722,15 +813,78 @@ async def test_streaming_response_stream_updates(self, async_client: AsyncRunloo assert cast(Any, response.is_closed) is True @parametrize - async def test_path_params_stream_updates(self, async_client: AsyncRunloop) -> None: + async def test_path_params_stream_stdout_updates(self, async_client: AsyncRunloop) -> None: with pytest.raises(ValueError, match=r"Expected a non-empty value for `devbox_id` but received ''"): - await async_client.devboxes.executions.with_raw_response.stream_updates( + await async_client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="execution_id", devbox_id="", ) + @parametrize + async def test_stream_stdout_updates_auto_reconnect_on_timeout(self, async_client: AsyncRunloop) -> None: + """Verify stream reconnects on timeout using last seen offset (async).""" + + class AsyncIteratorStream: + def __init__(self, items: list[object], exc: Exception | None = None) -> None: + self._items = list(items) + self._exc = exc + self._raised = False + self.response = httpx.Response(200, request=httpx.Request("GET", "https://example.com")) + + def __aiter__(self): + self._iter = iter(self._items) + return self + + async def __anext__(self): + try: + return next(self._iter) + except StopIteration: + if self._exc is not None and not self._raised: + self._raised = True + raise self._exc from None + raise StopAsyncIteration from None + + async def close(self) -> None: + pass + + item1 = type("X", (), {"offset": "5"})() + item2 = type("X", (), {"offset": "9"})() + item3 = type("X", (), {"offset": "10"})() + + timeout_err = APITimeoutError(request=httpx.Request("GET", "https://example.com")) + + calls: list[str | None] = [] + + async def fake_get(_path: str, *, options: Any, **_kwargs: Any): + from typing import Dict + + options_dict: Dict[str, object] = cast(Dict[str, object], options) + params = cast("dict[str, object]", options_dict.get("params", {})) + from typing import Optional + + calls.append(cast(Optional[str], params.get("offset"))) + if len(calls) == 1: + return AsyncIteratorStream([item1, item2], timeout_err) + elif len(calls) == 2: + return AsyncIteratorStream([item3], None) + raise AssertionError("Unexpected extra call to _get during auto-reconnect test") + + with patch.object(async_client.devboxes.executions, "_get", side_effect=fake_get): + stream = await async_client.devboxes.executions.stream_stdout_updates( + execution_id="execution_id", + devbox_id="devbox_id", + ) + + seen_offsets: list[str] = [] + async for chunk in stream: + seen_offsets.append(getattr(chunk, "offset", "")) + await stream.close() + + assert calls[1] is not None + assert seen_offsets == ["5", "9", "10"] + with pytest.raises(ValueError, match=r"Expected a non-empty value for `execution_id` but received ''"): - await async_client.devboxes.executions.with_raw_response.stream_updates( + await async_client.devboxes.executions.with_raw_response.stream_stdout_updates( execution_id="", devbox_id="devbox_id", )