diff --git a/quest_test/test_external_actions.py b/quest_test/test_external_actions.py index 89fb2bb..de7ee69 100644 --- a/quest_test/test_external_actions.py +++ b/quest_test/test_external_actions.py @@ -310,35 +310,49 @@ async def test_step_specific_external(): async def test_multiqueue_default(): received = [] + p1_p2_ready = asyncio.Event() + before_exit = asyncio.Event() + async def player_workflow(): players = {'p1': 'user1', 'p2': 'user2'} async with MultiQueue('chat', players) as mq: + p1_p2_ready.set() + async for ident, msg in mq: received.append((ident, msg)) - # If player sends 'bye', remove their queue after their message is recorded + if msg == 'bye': await mq.remove(ident) - # Exit the Multiqueue when 3 messages are recorded + if len(received) == 3: + before_exit.set() + await asyncio.sleep(0.1) break + return received historian = Historian('test', player_workflow, [], serializer=NoopSerializer()) workflow = historian.run() - await asyncio.sleep(0.1) + await p1_p2_ready.wait() + + # Check both p1 and p2 are in the chat + assert ('chat', 'p1') in await historian.get_resources('p1') + assert ('chat', 'p2') in await historian.get_resources('p2') await historian.record_external_event('chat', 'p1', 'put', 'hello') await historian.record_external_event('chat', 'p2', 'put', 'hi') await historian.record_external_event('chat', 'p1', 'put', 'bye') - result = await workflow - assert result == [('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')] + await before_exit.wait() + # p1 should be removed after bye + assert ('chat', 'p1') not in await historian.get_resources('p1') + # p2 should still be in the chat + assert ('chat', 'p2') in await historian.get_resources('p2') - # After removing p1 -> when p1 tries to send message, it should raise KeyError - # with pytest.raises(KeyError): - # await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + result = await workflow + assert set(result) == {('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')} @pytest.mark.asyncio @@ -360,8 +374,11 @@ async def player_workflow(): await historian.record_external_event('chat', 'p1', 'put', 'hello') await historian.record_external_event('chat', 'p2', 'put', 'hi') + + await asyncio.sleep(0.1) # Second message from p1 - should be ignored due to single_response = True - await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + with pytest.raises(KeyError): + await historian.record_external_event('chat', 'p1', 'put', 'should not be received') result = await workflow assert result == {'p1': 'hello', 'p2': 'hi'} diff --git a/src/quest/external.py b/src/quest/external.py index 8a7e963..263865f 100644 --- a/src/quest/external.py +++ b/src/quest/external.py @@ -145,24 +145,22 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): # Cancel all pending tasks - context exits for task in self.task_to_ident: task.cancel() - # Exit all queues properly - for ident, wrapper in self.queues.items(): + # Exit all queues properly - suspended + for ident, wrapper in list(self.queues.items()): await wrapper.__aexit__(exc_type, exc_val, exc_tb) async def remove(self, ident: str): - # Stop listening to this identity queue - task = self.ident_to_task.pop(ident, None) - - if task is not None: - self.task_to_ident.pop(task) + # If identity is still active, cancel and clean up + if ident in self.ident_to_task: + task = self.ident_to_task[ident] + del self.ident_to_task[ident] + del self.task_to_ident[task] task.cancel() - # Call __aexit__ on the corresponding queue wrapper - wrapper = self.queues.pop(ident, None) - if wrapper: - await wrapper.__aexit__(None, None, None) + wrapper = self.queues.pop(ident) + await wrapper.__aexit__(None, None, None) - self.active_queues.pop(ident, None) + self.active_queues.pop(ident) async def __aiter__(self): while self.task_to_ident: