Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""
``tractor`` testing!!
"""
import sys
import subprocess
import os
import random
import signal
import platform
import time

import pytest
import tractor
Expand All @@ -16,6 +20,19 @@
_arb_addr = '127.0.0.1', random.randint(1000, 9999)


# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT
_INT_SIGNAL = signal.CTRL_C_EVENT
_INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4


no_windows = pytest.mark.skipif(
platform.system() == "Windows",
reason="Test is unsupported on windows",
Expand Down Expand Up @@ -89,3 +106,43 @@ def pytest_generate_tests(metafunc):
methods = ['trio']

metafunc.parametrize("start_method", methods, scope='module')


def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
assert ret


@pytest.fixture
def daemon(loglevel, testdir, arb_addr):
"""Run a daemon actor as a "remote arbiter".
"""
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
]
kwargs = dict()
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)
148 changes: 148 additions & 0 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
"""
Actor "discovery" testing
"""
import os
import signal
import platform
from functools import partial
import itertools

import pytest
import tractor
import trio
Expand Down Expand Up @@ -80,3 +86,145 @@ async def test_trynamic_trio(func, start_method):
print(await gretchen.result())
print(await donny.result())
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")


async def stream_forever():
for i in itertools.count():
yield i
await trio.sleep(0.01)


async def cancel(use_signal, delay=0):
# hold on there sally
await trio.sleep(delay)

# trigger cancel
if use_signal:
if platform.system() == 'Windows':
pytest.skip("SIGINT not supported on windows")
os.kill(os.getpid(), signal.SIGINT)
else:
raise KeyboardInterrupt


async def stream_from(portal):
async for value in await portal.result():
print(value)


async def spawn_and_check_registry(
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
) -> None:
actor = tractor.current_actor()

if remote_arbiter:
assert not actor.is_arbiter

async with tractor.get_arbiter(*arb_addr) as portal:
if actor.is_arbiter:
async def get_reg():
return actor._registry
extra = 1 # arbiter is local root actor
else:
get_reg = partial(portal.run, 'self', 'get_registry')
extra = 2 # local root actor + remote arbiter

# ensure current actor is registered
registry = await get_reg()
assert actor.uid in registry

if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever

async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {}
for i in range(3):
name = f'a{i}'
portals[name] = await n.run_in_actor(name, to_run)

# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
assert uid in registry

assert len(portals) + extra == len(registry)

if with_streaming:
await trio.sleep(0.1)

pts = list(portals.values())
for p in pts[:-1]:
trion.start_soon(stream_from, p)

# stream for 1 sec
trion.start_soon(cancel, use_signal, 1)

last_p = pts[-1]
async for value in await last_p.result():
print(value)
else:
await cancel(use_signal)

finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)

# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry


@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
False,
with_streaming,
arbiter_addr=arb_addr
)


@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local process
tree) arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
True,
with_streaming,
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
)
61 changes: 7 additions & 54 deletions tests/test_multi_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,24 @@
Multiple python programs invoking ``tractor.run()``
"""
import platform
import sys
import time
import signal
import subprocess

import pytest
import tractor
from conftest import tractor_test

# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT
_INT_SIGNAL = signal.CTRL_C_EVENT
_INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4


def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
assert ret


@pytest.fixture
def daemon(loglevel, testdir, arb_addr):
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
]
kwargs = dict()
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)
from conftest import (
tractor_test,
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,
)


def test_abort_on_sigint(daemon):
assert daemon.returncode is None
time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL)
assert daemon.returncode == _INT_RETURN_CODE

# XXX: oddly, couldn't get capfd.readouterr() to work here?
if platform.system() != 'Windows':
# don't check stderr on windows as its empty when sending CTRL_C_EVENT
Expand Down
16 changes: 15 additions & 1 deletion tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async def _process_messages(
f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
if channel is chan:
self._cancel_task(cid, channel)
await self._cancel_task(cid, channel)
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
Expand Down Expand Up @@ -678,7 +678,10 @@ async def _async_main(

finally:
if registered_with_arbiter:
# with trio.move_on_after(3) as cs:
# cs.shield = True
await self._do_unreg(self._arb_addr)

# terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared
if not self._no_more_peers.is_set():
Expand Down Expand Up @@ -863,6 +866,17 @@ def find_actor(self, name: str) -> Optional[Tuple[str, int]]:

return None

async def get_registry(
self
) -> Dict[str, Tuple[str, str]]:
"""Return current name registry.
"""
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return self._registry

async def wait_for_actor(
self, name: str
) -> List[Tuple[str, int]]:
Expand Down
4 changes: 3 additions & 1 deletion tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def get_arbiter(
arbiter.
"""
actor = current_actor()

if not actor:
raise RuntimeError("No actor instance has been defined yet?")

Expand All @@ -38,7 +39,8 @@ async def get_arbiter(

@asynccontextmanager
async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.

Expand Down
6 changes: 4 additions & 2 deletions tractor/_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ def _trio_main(
actor._async_main,
parent_addr=parent_addr
)

try:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")
log.warning(f"Actor {actor.uid} received KBI")

log.info(f"Actor {actor.uid} terminated")
Loading