diff --git a/distributed/client.py b/distributed/client.py index b2e227b9604..c4abed3e29b 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -755,7 +755,11 @@ async def done_callback(future, callback): """ while future.status == "pending": await future._state.wait() - callback(future) + try: + callback(future) + except RuntimeError as e: + if "shutdown" not in str(e) and "interpreter" not in str(e): + raise class AllExit(Exception): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index fbb08076613..3a1f08cfead 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -8435,3 +8435,24 @@ def reducer(futs, *, offset=0, **kwargs): result = await future.result() assert result == 30 if not offset else 31 + + +@gen_cluster(client=True) +async def test_done_callback_shutdown_runtime_error(c, s, a, b): + from distributed.client import done_callback + + future = c.submit(inc, 1) + await future + + # Shutdown-related RuntimeError is swallowed + def cb_shutdown(fut): + raise RuntimeError("cannot schedule new futures after interpreter shutdown") + + await done_callback(future, cb_shutdown) + + # Unrelated RuntimeError is re-raised + def cb_other(fut): + raise RuntimeError("something else") + + with pytest.raises(RuntimeError, match="something else"): + await done_callback(future, cb_other)