Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# tractor: a trionic actor model built on `multiprocessing` and `trio`
#
# Copyright (C) 2018 Tyler Goodlet
# Copyright (C) 2018-2020 Tyler Goodlet

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,13 @@ def sig_prog(proc, sig):
def daemon(loglevel, testdir, arb_addr):
"""Run a daemon actor as a "remote arbiter".
"""
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'

cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
"import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
Expand Down
129 changes: 119 additions & 10 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,13 @@ def test_subactors_unregister_on_cancel(
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
False,
with_streaming,
partial(
spawn_and_check_registry,
arb_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
),
arbiter_addr=arb_addr
)

Expand All @@ -220,11 +222,118 @@ def test_subactors_unregister_on_cancel_remote_daemon(
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
True,
with_streaming,
partial(
spawn_and_check_registry,
arb_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
)


async def streamer(agen):
async for item in agen:
print(item)


async def close_chans_before_nursery(
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:

# logic for how many actors should still be
# in the registry at teardown.
if remote_arbiter:
entries_at_end = 2
else:
entries_at_end = 1

async with tractor.get_arbiter(*arb_addr) as aportal:
try:
get_reg = partial(aportal.run, 'self', 'get_registry')

async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor('consumer1', stream_forever)
agen1 = await portal1.result()

portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
agen2 = await portal2.run(__name__, 'stream_forever')

async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(.5)

# all subactors should have de-registered
registry = await get_reg()
assert portal1.channel.uid not in registry
assert portal2.channel.uid not in registry
assert len(registry) == entries_at_end


@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit(
start_method,
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
partial(
close_chans_before_nursery,
arb_addr,
use_signal,
remote_arbiter=False,
),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
)


@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
partial(
close_chans_before_nursery,
arb_addr,
use_signal,
remote_arbiter=True,
),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
)
2 changes: 1 addition & 1 deletion tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_no_main():


@tractor_test
async def test_self_is_registered():
async def test_self_is_registered(arb_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
Expand Down
6 changes: 3 additions & 3 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,18 @@ async def main():

await trio.sleep(0.5)
await even_portal.cancel_actor()
await trio.sleep(0.5)
await trio.sleep(1)

if pub_actor == 'arbiter':
assert 'even' not in get_topics()

await odd_portal.cancel_actor()
await trio.sleep(1)
await trio.sleep(2)

if pub_actor == 'arbiter':
while get_topics():
await trio.sleep(0.1)
if time.time() - start > 1:
if time.time() - start > 2:
pytest.fail("odds subscription never dropped?")
else:
await master_portal.cancel_actor()
Expand Down
6 changes: 3 additions & 3 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
import importlib
from functools import partial
from typing import Tuple, Any, Optional
from typing import Tuple, Any, Optional, List
import typing

import trio # type: ignore
Expand Down Expand Up @@ -115,7 +115,7 @@ def run(


def run_daemon(
rpc_module_paths: Tuple[str],
rpc_module_paths: List[str],
**kwargs
) -> None:
"""Spawn daemon actor which will respond to RPC.
Expand All @@ -124,7 +124,7 @@ def run_daemon(
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests.
"""
kwargs['rpc_module_paths'] = rpc_module_paths
kwargs['rpc_module_paths'] = list(rpc_module_paths)

for path in rpc_module_paths:
importlib.import_module(path)
Expand Down
Loading