Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions quest_test/test_external_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,35 +310,49 @@ async def test_step_specific_external():
async def test_multiqueue_default():
received = []

p1_p2_ready = asyncio.Event()
before_exit = asyncio.Event()

async def player_workflow():
players = {'p1': 'user1', 'p2': 'user2'}

async with MultiQueue('chat', players) as mq:
p1_p2_ready.set()

async for ident, msg in mq:
received.append((ident, msg))
# If player sends 'bye', remove their queue after their message is recorded

if msg == 'bye':
await mq.remove(ident)
# Exit the Multiqueue when 3 messages are recorded

if len(received) == 3:
before_exit.set()
await asyncio.sleep(0.1)
break

return received

historian = Historian('test', player_workflow, [], serializer=NoopSerializer())
workflow = historian.run()

await asyncio.sleep(0.1)
await p1_p2_ready.wait()

# Check both p1 and p2 are in the chat
assert ('chat', 'p1') in await historian.get_resources('p1')
assert ('chat', 'p2') in await historian.get_resources('p2')

await historian.record_external_event('chat', 'p1', 'put', 'hello')
await historian.record_external_event('chat', 'p2', 'put', 'hi')
await historian.record_external_event('chat', 'p1', 'put', 'bye')

result = await workflow
assert result == [('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')]
await before_exit.wait()
# p1 should be removed after bye
assert ('chat', 'p1') not in await historian.get_resources('p1')
# p2 should still be in the chat
assert ('chat', 'p2') in await historian.get_resources('p2')

# After removing p1 -> when p1 tries to send message, it should raise KeyError
# with pytest.raises(KeyError):
# await historian.record_external_event('chat', 'p1', 'put', 'should not be received')
result = await workflow
assert set(result) == {('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')}


@pytest.mark.asyncio
Expand All @@ -360,8 +374,11 @@ async def player_workflow():

await historian.record_external_event('chat', 'p1', 'put', 'hello')
await historian.record_external_event('chat', 'p2', 'put', 'hi')

await asyncio.sleep(0.1)
# Second message from p1 - should be ignored due to single_response = True
await historian.record_external_event('chat', 'p1', 'put', 'should not be received')
with pytest.raises(KeyError):
await historian.record_external_event('chat', 'p1', 'put', 'should not be received')

result = await workflow
assert result == {'p1': 'hello', 'p2': 'hi'}
Expand Down
22 changes: 10 additions & 12 deletions src/quest/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,24 +145,22 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
# Cancel all pending tasks - context exits
for task in self.task_to_ident:
task.cancel()
# Exit all queues properly
for ident, wrapper in self.queues.items():
# Exit all queues properly - suspended
for ident, wrapper in list(self.queues.items()):
await wrapper.__aexit__(exc_type, exc_val, exc_tb)

async def remove(self, ident: str):
# Stop listening to this identity queue
task = self.ident_to_task.pop(ident, None)

if task is not None:
self.task_to_ident.pop(task)
# If identity is still active, cancel and clean up
if ident in self.ident_to_task:
task = self.ident_to_task[ident]
del self.ident_to_task[ident]
del self.task_to_ident[task]
task.cancel()

# Call __aexit__ on the corresponding queue wrapper
wrapper = self.queues.pop(ident, None)
if wrapper:
await wrapper.__aexit__(None, None, None)
wrapper = self.queues.pop(ident)
await wrapper.__aexit__(None, None, None)

self.active_queues.pop(ident, None)
self.active_queues.pop(ident)

async def __aiter__(self):
while self.task_to_ident:
Expand Down
Loading