From 7db5b801d3b03bd00fed6a86985aaf31acdbc20c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 4 Aug 2025 14:35:09 -0400
Subject: [PATCH 1/8] Add `ActorCancelled` as a runtime-wide signal

As in a layer "above" a KBI/SIGINT but "below" a `ContextCancelled`
and generally signalling an interrupt which requests cancellation of
the actor's `trio.run()`.

Impl deats,
- mk the new exc type inherit from our ctxc (for now) but overriding
  the `.canceller` impl to,
  * pull from the `RemoteActorError._extra_msgdata: dict` when no
    `._ipc_msg` is set (which is always to start, until we incorporate
    a new `CancelActor` msg type).
  * not allow a `None` value since we should key-error if not set per
    prev bullet.
- mk adjustments (related) to parent `RemoteActorError.pformat()` to
  accommodate showing the `.canceller` field in repr output,
  * change `.relay_uid` to not crash when `._ipc_msg` is unset.
  * support `.msg.types.Aid` and use its `.reprol()` from
    `._mk_fields_str()`.
  * always call `._mk_fields_str()`, not just when `tb_str` is
    provided, and for now use any `._message` in-place of a `tb_str`
    when undefined.
---
 tractor/_exceptions.py | 124 ++++++++++++++++++++++++++++++++---------
 1 file changed, 99 insertions(+), 25 deletions(-)

diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index 418accc34..3c4692229 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -46,6 +46,7 @@
 from tractor._state import current_actor
 from tractor.log import get_logger
 from tractor.msg import (
+    Aid,
     Error,
     PayloadMsg,
     MsgType,
@@ -479,9 +480,10 @@ def relay_path(self) -> list[tuple]:
 
     @property
     def relay_uid(self) -> tuple[str, str]|None:
-        return tuple(
-            self._ipc_msg.relay_path[-1]
-        )
+        if msg := self._ipc_msg:
+            return tuple(
+                msg.relay_path[-1]
+            )
 
     @property
     def src_uid(self) -> tuple[str, str]|None:
@@ -521,7 +523,8 @@ def _mk_fields_str(
         for key in fields:
             if (
                 key == 'relay_uid'
-                and not self.is_inception()
+                and
+                not self.is_inception()
             ):
                 continue
 
@@ -534,6 +537,13 @@ def _mk_fields_str(
                     None,
                 )
             )
+            if (
+                key == 'canceller'
+                and
+                isinstance(val, Aid)
+            ):
+                val: str = val.reprol(sin_uuid=False)
+
             # TODO: for `.relay_path` on multiline?
             # if not isinstance(val, str):
             #     val_str = pformat(val)
@@ -623,25 +633,32 @@ def pformat(
         # IFF there is an embedded traceback-str we always
         # draw the ascii-box around it.
         body: str = ''
-        if tb_str := self.tb_str:
-            fields: str = self._mk_fields_str(
-                _body_fields
-                +
-                self.extra_body_fields,
-            )
-            from tractor.devx import (
-                pformat_boxed_tb,
-            )
-            body: str = pformat_boxed_tb(
-                tb_str=tb_str,
-                fields_str=fields,
-                field_prefix=' |_',
-                # ^- is so that it's placed like so,
-                # just after
tuple[str, str]|None:
 class ContextCancelled(RemoteActorError):
     '''
+    IPC context cancellation signal/msg.
+
+    Often reffed with the short-hand: "ctxc".
+
     Inter-actor task context was cancelled by either a call to
     ``Portal.cancel_actor()`` or ``Context.cancel()``.
@@ -737,8 +758,8 @@ def canceller(self) -> tuple[str, str]|None:
 
     - (simulating) an IPC transport network outage
     - a (malicious) pkt sent specifically to cancel an actor's
-      runtime non-gracefully without ensuring ongoing RPC tasks are
-      incrementally cancelled as is done with:
+      runtime non-gracefully without ensuring ongoing RPC tasks
+      are incrementally cancelled as is done with:
       `Actor`
       |_`.cancel()`
       |_`.cancel_soon()`
@@ -759,6 +780,59 @@ def canceller(self) -> tuple[str, str]|None:
     #   src_actor_uid = canceller
 
 
+class ActorCancelled(ContextCancelled):
+    '''
+    Runtime-layer cancellation signal/msg.
+
+    Indicates a "graceful interrupt" of the machinery scheduled by
+    the py-proc's `trio.run()`.
+
+    Often reffed with the short-hand: "actorc".
+
+    Raised from within `an: ActorNursery` (via an `ExceptionGroup`)
+    when an actor has been "process wide" cancel-called using any of,
+
+    - `ActorNursery.cancel()`
+    - `Portal.cancel_actor()`
+
+    **and** that cancel request was part of a "non graceful" cancel
+    condition.
+
+    That is, whenever an exception is to be raised outside an `an`
+    scope-block due to some error raised-in/relayed-to that scope.
+    In such cases this is normally raised per subactor portal, for
+    every subactor which was subsequently cancelled (according to
+    the `an`'s supervision strat).
+
+    '''
+    @property
+    def canceller(self) -> Aid:
+        '''
+        Return the (maybe) `Actor.aid: Aid` for the requesting-author
+        of this actorc.
+
+        Emit a warning msg when `.canceller` has not been set.
+
+        See additional relevant notes in
+        `ContextCancelled.canceller`.
+
+        '''
+        value: tuple[str, str]|None
+        if msg := self._ipc_msg:
+            value = msg.canceller
+        else:
+            value = self._extra_msgdata['canceller']
+
+        if value:
+            return value
+
+        log.warning(
+            'Actor cancelled without a requesting actor?\n'
+            'Maybe the IPC transport ended abruptly?\n\n'
+            f'{self}'
+        )
+
+
 class MsgTypeError(
     RemoteActorError,
 ):

From ddd8861fced679dd6ed0a342dcfc6ae251f4a15e Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 4 Aug 2025 17:19:35 -0400
Subject: [PATCH 2/8] Add an `actorc` test-driven-dev suite

Defining how an actor-nursery should emit an eg based on non-graceful
cancellation in a new `test_actor_nursery` module. Obviously fails atm
until the implementation is completed.
---
 tests/test_actor_nursery.py | 98 +++++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 create mode 100644 tests/test_actor_nursery.py

diff --git a/tests/test_actor_nursery.py b/tests/test_actor_nursery.py
new file mode 100644
index 000000000..eb652b5d4
--- /dev/null
+++ b/tests/test_actor_nursery.py
@@ -0,0 +1,98 @@
+'''
+Basic `ActorNursery` operations and closure semantics,
+- basic remote error collection,
+- basic multi-subactor cancellation.
+
+'''
+# import os
+# import signal
+# import platform
+# import time
+# from itertools import repeat
+
+import pytest
+import trio
+import tractor
+from tractor._exceptions import ActorCancelled
+# from tractor._testing import (
+#     tractor_test,
+# )
+# from .conftest import no_windows
+
+
+@pytest.mark.parametrize(
+    'num_subs',
+    [
+        1,
+        3,
+    ]
+)
+def test_one_cancels_all(
+    start_method: str,
+    loglevel: str,
+    debug_mode: bool,
+    num_subs: int,
+):
+    '''
+    Verify that if a single error bubbles to the an-scope the
+    nursery will be cancelled (just like in `trio`); this is a
+    one-cancels-all style strategy and is the only supervision
+    policy at the moment.
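+
+    The expected outcome: a `BaseExceptionGroup` raised outside the
+    `an`-block which contains the in-scope `RuntimeError` plus one
+    `ActorCancelled` per spawned subactor, as asserted below.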
+
+    '''
+    async def main():
+        try:
+            rte = RuntimeError('Uh oh something bad in parent')
+            async with tractor.open_nursery(
+                start_method=start_method,
+                loglevel=loglevel,
+                debug_mode=debug_mode,
+            ) as an:
+
+                # spawn the same number of daemon actors which should be cancelled
+                dactor_portals = []
+                for i in range(num_subs):
+                    name: str = f'sub_{i}'
+                    ptl: tractor.Portal = await an.start_actor(
+                        name=name,
+                        enable_modules=[__name__],
+                    )
+                    dactor_portals.append(ptl)
+
+                    # wait for booted
+                    async with tractor.wait_for_actor(name):
+                        print(f'{name!r} is up.')
+
+                # simulate uncaught exc
+                raise rte
+
+            # should error here with a ``RemoteActorError`` or ``MultiError``
+
+        except BaseExceptionGroup as _beg:
+            beg = _beg
+
+            # ?TODO? why can't we do `is` on beg?
+            assert (
+                beg.exceptions
+                ==
+                an.maybe_error.exceptions
+            )
+
+            assert len(beg.exceptions) == (
+                num_subs
+                +
+                1  # rte from root
+            )
+
+            # all subactors should have been implicitly
+            # `Portal.cancel_actor()`ed.
+            excs = list(beg.exceptions)
+            excs.remove(rte)
+            for exc in excs:
+                assert isinstance(exc, ActorCancelled)
+
+            assert an._scope_error is rte
+            assert not an._children
+            assert an.cancelled is True
+
+    trio.run(main)

From a59a3290aab5eff399eeca150a12e2e64e232806 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 5 Aug 2025 01:05:46 -0400
Subject: [PATCH 3/8] TOSQUASH 313ad93: yeah dun use `._message` as tb-str..
---
 tractor/_exceptions.py | 37 ++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index 3c4692229..24a9f29d5 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -639,26 +639,29 @@ def pformat(
             self.extra_body_fields,
         )
 
-        # ?TODO, ensure the `.message` doesn't show up 2x in
-        # output ya?
         tb_str: str = (
             self.tb_str
-            or
-            self._message
-        )
-        from tractor.devx import (
-            pformat_boxed_tb,
-        )
-        body: str = pformat_boxed_tb(
-            tb_str=tb_str,
-            fields_str=fields,
-            field_prefix=' |_',
-            # ^- is so that it's placed like so,
-            # just after
Date: Tue, 5 Aug 2025 11:55:45 -0400
Subject: [PATCH 4/8] Use `an` var name in nested subactor debugging ex.
---
 .../multi_nested_subactors_error_up_through_nurseries.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
index b63f1945c..622029a9c 100644
--- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
+++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
@@ -21,12 +21,12 @@ async def breakpoint_forever():
 async def spawn_until(depth=0):
     """"A nested nursery that triggers another ``NameError``.
     """
-    async with tractor.open_nursery() as n:
+    async with tractor.open_nursery() as an:
         if depth < 1:
 
-            await n.run_in_actor(breakpoint_forever)
+            await an.run_in_actor(breakpoint_forever)
 
-            p = await n.run_in_actor(
+            p = await an.run_in_actor(
                 name_error,
                 name='name_error'
             )
@@ -38,7 +38,7 @@ async def spawn_until(depth=0):
         # recusrive call to spawn another process branching layer of
         # the tree
         depth -= 1
-        await n.run_in_actor(
+        await an.run_in_actor(
             spawn_until,
             depth=depth,
             name=f'spawn_until_{depth}',

From e08bf4812cfc1ff8afa74444ecd0cd29429484c9 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 5 Aug 2025 11:59:17 -0400
Subject: [PATCH 5/8] Add todo for `tn` to `gather_contexts()` from `find_actor()`?
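
For reference, a rough usage sketch of the surrounding discovery API
(hedged: the 'ponger' name and single-registry setup are stand-ins,
not part of this change); per the current signature `find_actor()` is
an acm yielding a connected `Portal`, a `list[Portal]`, or `None`
when `raise_on_none=False` and no match is found:

    import trio
    import tractor

    async def main():
        async with tractor.open_nursery() as an:
            # a daemon sub which registers itself by name
            await an.start_actor('ponger')

            # lookup the sub-actor by name via the registry
            async with tractor.find_actor(
                'ponger',
                raise_on_none=False,
            ) as portal:
                assert portal is not None

            # explicitly tear down the daemon sub
            await an.cancel()

    trio.run(main)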
---
 tractor/_discovery.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index bf4d066a4..833213c3c 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -27,7 +27,7 @@
 )
 from contextlib import asynccontextmanager as acm
 
-from tractor.log import get_logger
+from .log import get_logger
 from .trionics import (
     gather_contexts,
     collapse_eg,
@@ -225,7 +225,7 @@ async def find_actor(
     raise_on_none: bool = False,
 
 ) -> AsyncGenerator[
-    Portal | list[Portal] | None,
+    Portal|list[Portal]|None,
     None,
 ]:
     '''
@@ -267,6 +267,7 @@ async def find_actor(
         collapse_eg(),
         gather_contexts(
             mngrs=maybe_portals,
+            # tn=tn, # ?TODO, helps to pass rent tn here?
        ) as portals,
     ):
         # log.runtime(

From 7ee95cf709332f826f7989c3a1590bfa1ee83306 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 5 Aug 2025 12:56:17 -0400
Subject: [PATCH 6/8] WIP, actor-nursery non-graceful-cancel raises EG

Attempting a rework of the post-cancellation "raising semantics" such
that subactors which are `ActorCancelled` as a result of a
non-graceful in-scope error are acked via a re-raised
`ExceptionGroup[ActorCancelled*N, Exception]` *outside the an-block*.

Eventually, the idea is to have `ActorCancelled` be relayed from each
subactor in response to any `Actor.cancel()/Portal.cancel_actor()`
request much like `Context.cancel()/ContextCancelled`.

This is a WIP bc it does break a few tests and requires related
`_spawn`-mod-machinery changes to match, some of which I'm not yet
sure are required; need to dig into the details of the currently
failing suites first.

`._supervise` patch deats,
- add `ActorNursery.maybe_error` which delivers the maybe-EG or
  `._scope_error` depending on whether `.errors` (now `._errors`,
  a mapping from `Aid`-keys) has entries set for subs.
- raise ^ if non-null in a new outer-`finally` in
  `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is
  added to ensure all sub-actor-excs are emitted/captured as part of
  `ActorNursery.cancel()` being called (as prior) as well as the
  `da_nursery` being explicitly cancelled alongside it (to unblock the
  tn-block, but still not sure why this is necessary yet?..).
- (now masked) tried injecting actorcs from `.cancel()` loop, but
  (again per more explanation in section below) seems to be suffering
  a race issue with RAE relay?
- left in buncha notes obvi for all this..

`._spawn` patch deats,
- as above, expect `errors: dict` to map from `Aid`-keys.
- pass `errors: dict` into `soft_kill()` since it seemed like we'd
  want to (for now) inject `ActorCancelled` in some cases (but now i'm
  not sure XD).
- tried out a couple spots (which are now masked) to inject
  `ActorCancelled` after calling `Portal.cancel()` in various
  subactor-supervision routines whenev an RAE is not set..
  - oddly seems to be overwriting actual errors (likely due to racing
    with RAE receive and/or actorc-request timeout?) despite the guard
    logic.. which clearly doesn't resolve the issue..
- buncha `tn`-style renaming.
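
The intended outer-scope semantics, as a rough sketch which mirrors
the new `test_one_cancels_all` suite added earlier in this series
(hedged: WIP, so this exact behavior doesn't fully pass yet):

    import trio
    import tractor
    from tractor._exceptions import ActorCancelled

    async def main():
        try:
            async with tractor.open_nursery() as an:
                await an.start_actor('sub_0')
                # a non-graceful (in-scope) error should cancel
                # the one spawned subactor..
                raise RuntimeError('Uh oh something bad in parent')

        except BaseExceptionGroup as beg:
            # ..and the eg raised outside the an-block should
            # contain the original rte plus one actorc per sub.
            assert len(beg.exceptions) == 2
            assert any(
                isinstance(exc, ActorCancelled)
                for exc in beg.exceptions
            )

    trio.run(main)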
---
 tractor/_spawn.py     | 116 ++++++++---
 tractor/_supervise.py | 461 ++++++++++++++++++++++++++----------------
 2 files changed, 376 insertions(+), 201 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 8d3c2cf68..ec601d8da 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -50,7 +50,11 @@
 from tractor._portal import Portal
 from tractor._runtime import Actor
 from tractor._entry import _mp_main
-from tractor._exceptions import ActorFailure
+from tractor._exceptions import (
+    ActorCancelled,
+    ActorFailure,
+    # NoResult,
+)
 from tractor.msg import (
     types as msgtypes,
     pretty_struct,
@@ -137,7 +141,6 @@ def try_set_start_method(
 
 
 async def exhaust_portal(
-
     portal: Portal,
     actor: Actor
@@ -185,10 +188,12 @@ async def exhaust_portal(
 
 
 async def cancel_on_completion(
-
     portal: Portal,
     actor: Actor,
-    errors: dict[tuple[str, str], Exception],
+    errors: dict[
+        msgtypes.Aid,
+        Exception,
+    ],
 
 ) -> None:
     '''
@@ -209,24 +214,57 @@ async def cancel_on_completion(
         portal,
         actor,
     )
+    aid: msgtypes.Aid = actor.aid
+    repr_aid: str = aid.reprol(sin_uuid=False)
+
     if isinstance(result, Exception):
-        errors[actor.uid]: Exception = result
+        errors[aid]: Exception = result
         log.cancel(
-            'Cancelling subactor runtime due to error:\n\n'
-            f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
-            f'error: {result}\n'
+            f'Cancelling subactor {repr_aid!r} runtime due to error\n'
+            f'\n'
+            f'Portal.cancel_actor() => {portal.channel.uid}\n'
+            f'\n'
+            f'{result!r}\n'
         )
 
     else:
-        log.runtime(
-            'Cancelling subactor gracefully:\n\n'
-            f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
-            f'result: {result}\n'
+        report: str = (
+            f'Cancelling subactor {repr_aid!r} gracefully..\n'
+            f'\n'
+        )
+        canc_info: str = (
+            f'Portal.cancel_actor() => {portal.chan.uid}\n'
+            f'\n'
+            f'final-result => {result!r}\n'
+        )
+        log.cancel(
+            report
+            +
+            canc_info
         )
 
     # cancel the process now that we have a final result
     await portal.cancel_actor()
+    if (
+        not errors.get(aid)
+        # and
+        # result is NoResult
+    ):
+        pass
+        # await debug.pause(shield=True)
+
+        # errors[aid] = ActorCancelled(
+        #     message=(
+        #         f'Cancelled subactor {repr_aid!r}\n'
+        #         f'{canc_info}\n'
+        #     ),
+        #     canceller=current_actor().aid,
+        #     # TODO? should we have an ack-msg?
+        #     # ipc_msg=??
+        #     # boxed_type=trio.Cancelled,
+        # )
 
 
 async def hard_kill(
     proc: trio.Process,
@@ -331,6 +369,10 @@ async def soft_kill(
         Awaitable,
     ],
     portal: Portal,
+    errors: dict[
+        msgtypes.Aid,
+        Exception,
+    ],
 
 ) -> None:
     '''
@@ -374,8 +416,8 @@ async def soft_kill(
         # below. This means we try to do a graceful teardown
         # via sending a cancel message before getting out
         # zombie killing tools.
-        async with trio.open_nursery() as n:
-            n.cancel_scope.shield = True
+        async with trio.open_nursery() as tn:
+            tn.cancel_scope.shield = True
 
             async def cancel_on_proc_deth():
                 '''
@@ -385,24 +427,35 @@ async def cancel_on_proc_deth():
 
                 '''
                 await wait_func(proc)
-                n.cancel_scope.cancel()
+                tn.cancel_scope.cancel()
 
             # start a task to wait on the termination of the
             # process by itself waiting on a (caller provided) wait
             # function which should unblock when the target process
             # has terminated.
-            n.start_soon(cancel_on_proc_deth)
+            tn.start_soon(cancel_on_proc_deth)
 
             # send the actor-runtime a cancel request.
             await portal.cancel_actor()
 
+            # if not errors.get(peer_aid):
+            #     errors[peer_aid] = ActorCancelled(
+            #         message=(
+            #             'Sub-actor cancelled gracefully by parent\n'
+            #         ),
+            #         canceller=current_actor().aid,
+            #         # TODO? should we have an ack-msg?
+            #         # ipc_msg=??
+ # # boxed_type=trio.Cancelled, + # ) + if proc.poll() is None: # type: ignore log.warning( 'Subactor still alive after cancel request?\n\n' f'uid: {peer_aid}\n' f'|_{proc}\n' ) - n.cancel_scope.cancel() + tn.cancel_scope.cancel() raise @@ -410,7 +463,10 @@ async def new_proc( name: str, actor_nursery: ActorNursery, subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], @@ -449,7 +505,10 @@ async def trio_proc( name: str, actor_nursery: ActorNursery, subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], @@ -572,9 +631,9 @@ async def trio_proc( with trio.CancelScope(shield=True): await actor_nursery._join_procs.wait() - async with trio.open_nursery() as nursery: + async with trio.open_nursery() as ptl_reaper_tn: if portal in actor_nursery._cancel_after_result_on_exit: - nursery.start_soon( + ptl_reaper_tn.start_soon( cancel_on_completion, portal, subactor, @@ -587,7 +646,8 @@ async def trio_proc( await soft_kill( proc, trio.Process.wait, # XXX, uses `pidfd_open()` below. - portal + portal, + errors, ) # cancel result waiter that may have been spawned in @@ -596,7 +656,7 @@ async def trio_proc( 'Cancelling portal result reaper task\n' f'c)> {subactor.aid.reprol()!r}\n' ) - nursery.cancel_scope.cancel() + ptl_reaper_tn.cancel_scope.cancel() finally: # XXX NOTE XXX: The "hard" reap since no actor zombies are @@ -669,7 +729,10 @@ async def mp_proc( name: str, actor_nursery: ActorNursery, # type: ignore # noqa subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], parent_addr: UnwrappedAddress, @@ -794,7 +857,7 @@ async def mp_proc( cancel_on_completion, portal, subactor, - errors + errors, ) # This is a "soft" (cancellable) join/reap which @@ -803,7 +866,8 @@ async def mp_proc( await soft_kill( proc, proc_waiter, - portal + portal, + errors, ) # cancel result waiter that may have been spawned in diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 301c44e85..27d26a67c 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -30,6 +30,9 @@ import trio +from .msg import ( + types as msgtypes, +) from .devx import ( debug, pformat as _pformat, @@ -48,6 +51,7 @@ ) from ._exceptions import ( ContextCancelled, + ActorCancelled, ) from ._root import ( open_root_actor, @@ -99,7 +103,10 @@ def __init__( actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: dict[tuple[str, str], BaseException], + errors: dict[ + msgtypes.Aid, + BaseException, + ], ) -> None: # self.supervisor = supervisor # TODO @@ -117,9 +124,11 @@ def __init__( ] ] = {} + # signals when it is ok to start waiting o subactor procs + # for termination. self._join_procs = trio.Event() self._at_least_one_child_in_debug: bool = False - self.errors = errors + self._errors = errors self._scope_error: BaseException|None = None self.exited = trio.Event() @@ -260,7 +269,7 @@ async def start_actor( name, self, subactor, - self.errors, + self._errors, bind_addrs, parent_addr, _rtv, # run time vars @@ -364,7 +373,9 @@ async def cancel( # then `._children`.. 
children: dict = self._children child_count: int = len(children) - msg: str = f'Cancelling actor nursery with {child_count} children\n' + msg: str = ( + f'Cancelling actor-nursery with {child_count} children\n' + ) server: IPCServer = self._actor.ipc_server @@ -391,7 +402,9 @@ async def cancel( else: if portal is None: # actor hasn't fully spawned yet - event: trio.Event = server._peer_connected[subactor.uid] + event: trio.Event = server._peer_connected[ + subactor.uid + ] log.warning( f"{subactor.uid} never 't finished spawning?" ) @@ -416,7 +429,20 @@ async def cancel( # spawn cancel tasks for each sub-actor assert portal if portal.channel.connected(): - tn.start_soon(portal.cancel_actor) + + async def canc_subactor(): + await portal.cancel_actor() + # aid: msgtypes.Aid = subactor.aid + # reprol: str = aid.reprol(sin_uuid=False) + # if not self._errors.get(aid): + # self._errors[aid] = ActorCancelled( + # message=( + # f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n' + # ), + # canceller=self._actor.aid, + # ) + + tn.start_soon(canc_subactor) log.cancel(msg) # if we cancelled the cancel (we hung cancelling remote actors) @@ -442,6 +468,47 @@ async def cancel( # mark ourselves as having (tried to have) cancelled all subactors self._join_procs.set() + @property + def maybe_error(self) -> ( + BaseException| + BaseExceptionGroup| + None + ): + ''' + Deliver any captured scope errors including those relayed + from subactors such as `ActorCancelled` during a non-graceful + cancellation scenario. + + When more then a "graceful cancel" occurrs wrap all collected + sub-exceptions in a raised `ExceptionGroup`. + + ''' + scope_exc: BaseException|None = self._scope_error + + # XXX NOTE, only pack an eg if there i at least one + # non-actorc exception received from a subactor, OR + # return `._scope_error` verbatim. + if (errors := self._errors): + # use `BaseExceptionGroup` as needed + excs: list[BaseException] = list(errors.values()) + if ( + len(excs) > 1 + and + any( + type(exc) not in {ActorCancelled,} + for exc in excs + ) + ): + return ExceptionGroup( + 'ActorNursery multi-errored with', + tuple(excs), + ) + + # raise the lone subactor exc + return list(excs)[0] + + return scope_exc + @acm async def _open_and_supervise_one_cancels_all_nursery( @@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery( inner_err: BaseException|None = None # the collection of errors retreived from spawned sub-actors - errors: dict[tuple[str, str], BaseException] = {} + errors: dict[ + msgtypes.Aid, + BaseException, + ] = {} # This is the outermost level "deamon actor" nursery. It is awaited # **after** the below inner "run in actor nursery". This allows for @@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery( # `ActorNursery.start_actor()`). # errors from this daemon actor nursery bubble up to caller - async with ( - collapse_eg(), - trio.open_nursery() as da_nursery, - ): - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # `ActorNusery.run_in_actor()`) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. 
- async with ( - collapse_eg(), - trio.open_nursery() as ria_nursery, - ): - an = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield an - - # When we didn't error in the caller's scope, - # signal all process-monitor-tasks to conduct - # the "hard join phase". - log.runtime( - 'Waiting on subactors to complete:\n' - f'>}} {len(an._children)}\n' - ) - an._join_procs.set() - - except BaseException as _inner_err: - inner_err = _inner_err - errors[actor.uid] = inner_err - - # If we error in the root but the debugger is - # engaged we don't want to prematurely kill (and - # thus clobber access to) the local tty since it - # will make the pdb repl unusable. - # Instead try to wait for pdb to be released before - # tearing down. - await debug.maybe_wait_for_debugger( - child_in_debug=an._at_least_one_child_in_debug + try: + async with ( + collapse_eg(), + trio.open_nursery() as da_nursery, + ): + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # `ActorNusery.run_in_actor()`) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with ( + collapse_eg(), + trio.open_nursery() as ria_nursery, + ): + an = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors ) + try: + # spawning of actors happens in the caller's scope + # after we yield upwards + yield an - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - an._join_procs.set() + # When we didn't error in the caller's scope, + # signal all process-monitor-tasks to conduct + # the "hard join phase". + log.runtime( + 'Waiting on subactors to complete:\n' + f'>}} {len(an._children)}\n' + ) + an._join_procs.set() + + except BaseException as _inner_err: + inner_err = _inner_err + # errors[actor.aid] = inner_err + + # If we error in the root but the debugger is + # engaged we don't want to prematurely kill (and + # thus clobber access to) the local tty since it + # will make the pdb repl unusable. + # Instead try to wait for pdb to be released before + # tearing down. + await debug.maybe_wait_for_debugger( + child_in_debug=an._at_least_one_child_in_debug + ) + + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + an._join_procs.set() + + # XXX NOTE XXX: hypothetically an error could + # be raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. + with trio.CancelScope(shield=True): + etype: type = type(inner_err) + if etype in ( + trio.Cancelled, + KeyboardInterrupt, + ) or ( + is_multi_cancelled(inner_err) + ): + log.cancel( + f'Actor-nursery cancelled by {etype}\n\n' + + f'{current_actor().uid}\n' + f' |_{an}\n\n' + + # TODO: show tb str? 
+ # f'{tb_str}' + ) + elif etype in { + ContextCancelled, + }: + log.cancel( + 'Actor-nursery caught remote cancellation\n' + '\n' + f'{inner_err.tb_str}' + ) + else: + log.exception( + 'Nursery errored with:\n' + + # TODO: same thing as in + # `._invoke()` to compute how to + # place this div-line in the + # middle of the above msg + # content.. + # -[ ] prolly helper-func it too + # in our `.log` module.. + # '------ - ------' + ) + + # cancel all subactors + await an.cancel() + + # ria_nursery scope end + + # TODO: this is the handler around the ``.run_in_actor()`` + # nursery. Ideally we can drop this entirely in the future as + # the whole ``.run_in_actor()`` API should be built "on top of" + # this lower level spawn-request-cancel "daemon actor" API where + # a local in-actor task nursery is used with one-to-one task + # + `await Portal.run()` calls and the results/errors are + # handled directly (inline) and errors by the local nursery. + except ( + Exception, + BaseExceptionGroup, + trio.Cancelled + ) as _outer_err: + outer_err = _outer_err + + # XXX: yet another guard before allowing the cancel + # sequence in case a (single) child is in debug. + await debug.maybe_wait_for_debugger( + child_in_debug=an._at_least_one_child_in_debug + ) - # XXX NOTE XXX: hypothetically an error could - # be raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + if an._children: + log.cancel( + 'Actor-nursery cancelling due error type:\n' + f'{outer_err}\n' + ) with trio.CancelScope(shield=True): - etype: type = type(inner_err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt, - ) or ( - is_multi_cancelled(inner_err) - ): - log.cancel( - f'Actor-nursery cancelled by {etype}\n\n' - - f'{current_actor().uid}\n' - f' |_{an}\n\n' - - # TODO: show tb str? - # f'{tb_str}' - ) - elif etype in { - ContextCancelled, - }: - log.cancel( - 'Actor-nursery caught remote cancellation\n' - '\n' - f'{inner_err.tb_str}' - ) - else: - log.exception( - 'Nursery errored with:\n' - - # TODO: same thing as in - # `._invoke()` to compute how to - # place this div-line in the - # middle of the above msg - # content.. - # -[ ] prolly helper-func it too - # in our `.log` module.. - # '------ - ------' - ) - - # cancel all subactors await an.cancel() - # ria_nursery scope end - - # TODO: this is the handler around the ``.run_in_actor()`` - # nursery. Ideally we can drop this entirely in the future as - # the whole ``.run_in_actor()`` API should be built "on top of" - # this lower level spawn-request-cancel "daemon actor" API where - # a local in-actor task nursery is used with one-to-one task - # + `await Portal.run()` calls and the results/errors are - # handled directly (inline) and errors by the local nursery. - except ( - Exception, - BaseExceptionGroup, - trio.Cancelled - ) as _outer_err: - outer_err = _outer_err - - an._scope_error = outer_err or inner_err - - # XXX: yet another guard before allowing the cancel - # sequence in case a (single) child is in debug. 
- await debug.maybe_wait_for_debugger( - child_in_debug=an._at_least_one_child_in_debug - ) + raise - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - if an._children: - log.cancel( - 'Actor-nursery cancelling due error type:\n' - f'{outer_err}\n' - ) - with trio.CancelScope(shield=True): - await an.cancel() - raise - - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: - if an._children: - with trio.CancelScope(shield=True): - await an.cancel() + finally: + scope_exc = an._scope_error = outer_err or inner_err + # await debug.pause(shield=True) + # if scope_exc: + # errors[actor.aid] = scope_exc + + # show this frame on any internal error + if ( + not an.cancelled + and + scope_exc + ): + __tracebackhide__: bool = False + + # NOTE, it's possible no errors were raised while + # awaiting ".run_in_actor()" actors but those + # sub-actors may have delivered remote errors as + # results, normally captured via machinery in + # `._spawn.cancel_on_completion()`. + # + # Any such remote errors are collected in `an._errors` + # which is summarized via `ActorNursery.maybe_error` + # which is maybe re-raised in an outer block (below). + # + # So here we first cancel all subactors the summarize + # all errors and then later (in that outer block) + # maybe-raise on a "non-graceful" cancellation + # outcome, normally as a summary EG. + if ( + scope_exc + or + errors + ): + + if an._children: + with trio.CancelScope(shield=True): + await an.cancel() + + # cancel outer tn so we unblock outside this + # finally! + da_nursery.cance_scope.cancel() + # + # ^TODO? still don't get why needed? + # - an.cancel() should cause all spawn-subtasks + # to eventually exit? + # - also, could (instead) we sync to an event here before + # (ever) calling `an.cancel()`?? + + # `da_nursery` scope end, thus a checkpoint. + finally: - # use `BaseExceptionGroup` as needed - if len(errors) > 1: - raise BaseExceptionGroup( - 'tractor.ActorNursery errored with', - tuple(errors.values()), - ) - else: - raise list(errors.values())[0] + # raise any eg compiled from all subs + # ??TODO should we also adopt strict-egs here like + # `trio.Nursery`?? + # + # XXX justification notes, + # docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups + # anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888 + # gh: https://github.com/python-trio/trio/issues/611 + if an_exc := an.maybe_error: + raise an_exc - # show frame on any (likely) internal error - if ( - not an.cancelled - and an._scope_error - ): - __tracebackhide__: bool = False + if scope_exc := an._scope_error: + raise scope_exc - # da_nursery scope end - nursery checkpoint - # final exit + # @acm-fn scope exit _shutdown_msg: str = ( @@ -648,7 +754,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # @api_frame async def open_nursery( *, # named params only! 
- hide_tb: bool = True, + hide_tb: bool = False, **kwargs, # ^TODO, paramspec for `open_root_actor()` @@ -684,16 +790,21 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime: bool = True - async with open_root_actor( - hide_tb=hide_tb, - **kwargs, - ) as actor: + async with ( + # collapse_eg(hide_tb=hide_tb), + open_root_actor( + hide_tb=hide_tb, + **kwargs, + ) as actor, + ): assert actor is current_actor() try: - async with _open_and_supervise_one_cancels_all_nursery( - actor - ) as an: + async with ( + _open_and_supervise_one_cancels_all_nursery( + actor + ) as an + ): # NOTE: mark this nursery as having # implicitly started the root actor so From 4943438780951492dcc1ddbaa9b2496d5268897b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Aug 2025 12:57:40 -0400 Subject: [PATCH 7/8] Adjust nested-subs debug test for tbs output Such that we don't require every single src/relay_uid in the final output but instead at some point in the pre-output of some prompt. Added some comments to match each actor sub-layer. --- tests/devx/test_debugger.py | 61 ++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py index d3f9fa5dc..9b1056c06 100644 --- a/tests/devx/test_debugger.py +++ b/tests/devx/test_debugger.py @@ -709,10 +709,41 @@ def test_multi_nested_subactors_error_through_nurseries( child = spawn('multi_nested_subactors_error_up_through_nurseries') # timed_out_early: bool = False + at_least_one: list[str] = [ + "bdb.BdbQuit", + + # leaf subs, which actually raise in "user code" + "src_uid=('breakpoint_forever'", + "src_uid=('name_error'", + + # 2nd layer subs + "src_uid=('spawn_until_1'", + "src_uid=('spawn_until_2'", + "src_uid=('spawn_until_3'", + "relay_uid=('spawn_until_0'", + + # 1st layer subs + "src_uid=('spawner0'", + "src_uid=('spawner1'", + ] - for send_char in itertools.cycle(['c', 'q']): + for i, send_char in enumerate( + itertools.cycle(['c', 'q']) + ): try: child.expect(PROMPT) + + for patt in at_least_one.copy(): + if in_prompt_msg( + child, + [patt], + ): + print( + f'Found patt in prompt {i}\n' + f'patt: {patt!r}\n' + ) + at_least_one.remove(patt) + child.sendline(send_char) time.sleep(0.01) @@ -721,27 +752,15 @@ def test_multi_nested_subactors_error_through_nurseries( assert_before( child, - [ # boxed source errors - "NameError: name 'doggypants' is not defined", + [ + # boxed source errors should show in final + # post-prompt tb to console. "tractor._exceptions.RemoteActorError:", - "('name_error'", - "bdb.BdbQuit", - - # first level subtrees - # "tractor._exceptions.RemoteActorError: ('spawner0'", - "src_uid=('spawner0'", - - # "tractor._exceptions.RemoteActorError: ('spawner1'", - - # propagation of errors up through nested subtrees - # "tractor._exceptions.RemoteActorError: ('spawn_until_0'", - # "tractor._exceptions.RemoteActorError: ('spawn_until_1'", - # "tractor._exceptions.RemoteActorError: ('spawn_until_2'", - # ^-NOTE-^ old RAE repr, new one is below with a field - # showing the src actor's uid. - "src_uid=('spawn_until_0'", - "relay_uid=('spawn_until_1'", - "src_uid=('spawn_until_2'", + "NameError: name 'doggypants' is not defined", + + # TODO? once we get more pedantic with `relay_uid` should + # prolly include all actor-IDs we expect to see in final + # tb? 
] ) From f92355139155a431ac262180b52fe7172168175e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Aug 2025 15:49:16 -0400 Subject: [PATCH 8/8] Add timeout around inf-streamer suite Since with the new actorc injection seems to be hanging? Not sure what exactly the issue is but likely races again during teardown between the `.run_in_actor()` remote-exc capture and any actorc after the `portal.cancel()`.. Also tossed in a bp to figure out why actorcs aren't actually showing outside the `trio.run()`..? --- tests/test_cancellation.py | 151 ++++++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 61 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 27fd59d7b..9a3bd09e8 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -11,6 +11,9 @@ import pytest import trio import tractor +from tractor._exceptions import ( + ActorCancelled, +) from tractor._testing import ( tractor_test, ) @@ -124,7 +127,10 @@ async def main(): ) as nursery: await nursery.run_in_actor(assert_err, name='errorer1') - portal2 = await nursery.run_in_actor(assert_err, name='errorer2') + portal2 = await nursery.run_in_actor( + assert_err, + name='errorer2', + ) # get result(s) from main task try: @@ -137,7 +143,15 @@ async def main(): # here we should get a ``BaseExceptionGroup`` containing exceptions # from both subactors - with pytest.raises(BaseExceptionGroup): + with pytest.raises( + expected_exception=( + tractor.RemoteActorError, + + # ?TODO, should it be this?? + # like `trio`'s strict egs? + BaseExceptionGroup, + ), + ): trio.run(main) @@ -233,8 +247,9 @@ async def stream_forever(): @tractor_test -async def test_cancel_infinite_streamer(start_method): - +async def test_cancel_infinite_streamer( + start_method: str, +): # stream for at most 1 seconds with ( trio.fail_after(4), @@ -291,6 +306,7 @@ async def test_some_cancels_all( num_actors_and_errs: tuple, start_method: str, loglevel: str, + debug_mode: bool, ): ''' Verify a subset of failed subactors causes all others in @@ -306,68 +322,81 @@ async def test_some_cancels_all( ria_func, da_func, ) = num_actors_and_errs - try: - async with tractor.open_nursery() as an: - - # spawn the same number of deamon actors which should be cancelled - dactor_portals = [] - for i in range(num_actors): - dactor_portals.append(await an.start_actor( - f'deamon_{i}', - enable_modules=[__name__], - )) - - func, kwargs = ria_func - riactor_portals = [] - for i in range(num_actors): - # start actor(s) that will fail immediately - riactor_portals.append( - await an.run_in_actor( - func, - name=f'actor_{i}', - **kwargs + with trio.fail_after( + 3 + if not debug_mode + else 999 + ): + try: + async with tractor.open_nursery() as an: + + # spawn the same number of deamon actors which should be cancelled + dactor_portals = [] + for i in range(num_actors): + dactor_portals.append(await an.start_actor( + f'deamon_{i}', + enable_modules=[__name__], + )) + + func, kwargs = ria_func + riactor_portals = [] + for i in range(num_actors): + # start actor(s) that will fail immediately + riactor_portals.append( + await an.run_in_actor( + func, + name=f'actor_{i}', + **kwargs + ) ) - ) - if da_func: - func, kwargs, expect_error = da_func - for portal in dactor_portals: - # if this function fails then we should error here - # and the nursery should teardown all other actors - try: - await portal.run(func, **kwargs) - - except tractor.RemoteActorError as err: - assert err.boxed_type == err_type - # we only expect this first error 
to propogate - # (all other daemons are cancelled before they - # can be scheduled) - num_actors = 1 - # reraise so nursery teardown is triggered - raise - else: - if expect_error: - pytest.fail( - "Deamon call should fail at checkpoint?") + if da_func: + func, kwargs, expect_error = da_func + for portal in dactor_portals: + # if this function fails then we should error here + # and the nursery should teardown all other actors + try: + await portal.run(func, **kwargs) + + except tractor.RemoteActorError as err: + assert err.boxed_type == err_type + # we only expect this first error to propogate + # (all other daemons are cancelled before they + # can be scheduled) + num_actors = 1 + # reraise so nursery teardown is triggered + raise + else: + if expect_error: + pytest.fail( + "Deamon call should fail at checkpoint?") - # should error here with a ``RemoteActorError`` or ``MultiError`` + # should error here with a ``RemoteActorError`` or ``MultiError`` - except first_err as _err: - err = _err - if isinstance(err, BaseExceptionGroup): - assert len(err.exceptions) == num_actors - for exc in err.exceptions: - if isinstance(exc, tractor.RemoteActorError): - assert exc.boxed_type == err_type - else: - assert isinstance(exc, trio.Cancelled) - elif isinstance(err, tractor.RemoteActorError): - assert err.boxed_type == err_type + except first_err as _err: + err = _err - assert an.cancelled is True - assert not an._children - else: - pytest.fail("Should have gotten a remote assertion error?") + if isinstance(err, BaseExceptionGroup): + + assert len(err.exceptions) == num_actors + for exc in err.exceptions: + + # TODO, figure out why these aren't being set? + if isinstance(exc, ActorCancelled): + breakpoint() + + if isinstance(exc, tractor.RemoteActorError): + assert exc.boxed_type == err_type + else: + assert isinstance(exc, trio.Cancelled) + + elif isinstance(err, tractor.RemoteActorError): + assert err.boxed_type == err_type + + assert an.cancelled is True + assert not an._children + else: + pytest.fail("Should have gotten a remote assertion error?") async def spawn_and_error(breadth, depth) -> None: