From c8c64a297adf2f51cc3f51a55acb236f3219ebbe Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Mon, 21 Apr 2025 03:34:07 -0600 Subject: [PATCH 1/3] multiqueue_modification --- quest_test/test_external_actions.py | 26 +++++++++++++++++--------- src/quest/external.py | 24 +++++++++++------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/quest_test/test_external_actions.py b/quest_test/test_external_actions.py index 89fb2bb..cdbdd02 100644 --- a/quest_test/test_external_actions.py +++ b/quest_test/test_external_actions.py @@ -3,7 +3,7 @@ import pytest from quest.external import state, queue, event, wrap_as_state, wrap_as_queue, MultiQueue -from quest.historian import Historian +from quest.historian import Historian, find_historian from quest.wrappers import task, step from quest.serializer import NoopSerializer from .utils import timeout @@ -314,14 +314,23 @@ async def player_workflow(): players = {'p1': 'user1', 'p2': 'user2'} async with MultiQueue('chat', players) as mq: + # Both p1 and p2 have a chat + assert ('chat', 'p1') in await find_historian().get_resources('p1') + assert ('chat', 'p2') in await find_historian().get_resources('p2') + async for ident, msg in mq: received.append((ident, msg)) - # If player sends 'bye', remove their queue after their message is recorded + if msg == 'bye': await mq.remove(ident) - # Exit the Multiqueue when 3 messages are recorded + # After p1 says bye, p1 is removed from chat + assert ('chat', ident) not in await find_historian().get_resources(ident) + if len(received) == 3: + # Check if p2 is still in the chat before chat is getting closed + assert ('chat', 'p2') in await find_historian().get_resources('p2') break + return received historian = Historian('test', player_workflow, [], serializer=NoopSerializer()) @@ -334,11 +343,7 @@ async def player_workflow(): await historian.record_external_event('chat', 'p1', 'put', 'bye') result = await workflow - assert result == [('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')] - - # After removing p1 -> when p1 tries to send message, it should raise KeyError - # with pytest.raises(KeyError): - # await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + assert set(result) == {('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')} @pytest.mark.asyncio @@ -360,8 +365,11 @@ async def player_workflow(): await historian.record_external_event('chat', 'p1', 'put', 'hello') await historian.record_external_event('chat', 'p2', 'put', 'hi') + + await asyncio.sleep(0.1) # Second message from p1 - should be ignored due to single_response = True - await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + with pytest.raises(KeyError): + await historian.record_external_event('chat', 'p1', 'put', 'should not be received') result = await workflow assert result == {'p1': 'hello', 'p2': 'hi'} diff --git a/src/quest/external.py b/src/quest/external.py index 8a7e963..e7d8640 100644 --- a/src/quest/external.py +++ b/src/quest/external.py @@ -145,24 +145,22 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): # Cancel all pending tasks - context exits for task in self.task_to_ident: task.cancel() - # Exit all queues properly - for ident, wrapper in self.queues.items(): - await wrapper.__aexit__(exc_type, exc_val, exc_tb) + # Exit all queues properly - suspended + for ident, wrapper in list(self.queues.items()): + await wrapper.__aexit__(asyncio.CancelledError, asyncio.CancelledError(SUSPENDED), None) async def remove(self, ident: str): - # Stop listening to this identity queue - task = self.ident_to_task.pop(ident, None) - - if task is not None: - self.task_to_ident.pop(task) + # If identity is still active, cancel and clean up + if ident in self.ident_to_task: + task = self.ident_to_task[ident] + del self.ident_to_task[ident] + del self.task_to_ident[task] task.cancel() - # Call __aexit__ on the corresponding queue wrapper - wrapper = self.queues.pop(ident, None) - if wrapper: - await wrapper.__aexit__(None, None, None) + wrapper = self.queues.pop(ident) + await wrapper.__aexit__(asyncio.CancelledError, asyncio.CancelledError(SUSPENDED), None) - self.active_queues.pop(ident, None) + self.active_queues.pop(ident) async def __aiter__(self): while self.task_to_ident: From eddc41ccc632785b6c188ff0561bddd388e7f87d Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Tue, 22 Apr 2025 21:20:16 -0600 Subject: [PATCH 2/3] multiqueue fixed --- quest_test/test_external_actions.py | 25 +++++++++++++++++-------- src/quest/external.py | 4 ++-- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/quest_test/test_external_actions.py b/quest_test/test_external_actions.py index cdbdd02..a1616fb 100644 --- a/quest_test/test_external_actions.py +++ b/quest_test/test_external_actions.py @@ -310,25 +310,24 @@ async def test_step_specific_external(): async def test_multiqueue_default(): received = [] + p1_p2_ready = asyncio.Event() + before_exit = asyncio.Event() + async def player_workflow(): players = {'p1': 'user1', 'p2': 'user2'} async with MultiQueue('chat', players) as mq: - # Both p1 and p2 have a chat - assert ('chat', 'p1') in await find_historian().get_resources('p1') - assert ('chat', 'p2') in await find_historian().get_resources('p2') + p1_p2_ready.set() async for ident, msg in mq: received.append((ident, msg)) if msg == 'bye': await mq.remove(ident) - # After p1 says bye, p1 is removed from chat - assert ('chat', ident) not in await find_historian().get_resources(ident) if len(received) == 3: - # Check if p2 is still in the chat before chat is getting closed - assert ('chat', 'p2') in await find_historian().get_resources('p2') + before_exit.set() + await asyncio.sleep(0.1) break return received @@ -336,12 +335,22 @@ async def player_workflow(): historian = Historian('test', player_workflow, [], serializer=NoopSerializer()) workflow = historian.run() - await asyncio.sleep(0.1) + await p1_p2_ready.wait() + + # Check both p1 and p2 are in the chat + assert ('chat', 'p1') in await historian.get_resources('p1') + assert ('chat', 'p2') in await historian.get_resources('p2') await historian.record_external_event('chat', 'p1', 'put', 'hello') await historian.record_external_event('chat', 'p2', 'put', 'hi') await historian.record_external_event('chat', 'p1', 'put', 'bye') + await before_exit.wait() + # p1 should be removed after bye + assert ('chat', 'p1') not in await historian.get_resources('p1') + # p2 should still be in the chat + assert ('chat', 'p2') in await historian.get_resources('p2') + result = await workflow assert set(result) == {('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')} diff --git a/src/quest/external.py b/src/quest/external.py index e7d8640..263865f 100644 --- a/src/quest/external.py +++ b/src/quest/external.py @@ -147,7 +147,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): task.cancel() # Exit all queues properly - suspended for ident, wrapper in list(self.queues.items()): - await wrapper.__aexit__(asyncio.CancelledError, asyncio.CancelledError(SUSPENDED), None) + await wrapper.__aexit__(exc_type, exc_val, exc_tb) async def remove(self, ident: str): # If identity is still active, cancel and clean up @@ -158,7 +158,7 @@ async def remove(self, ident: str): task.cancel() wrapper = self.queues.pop(ident) - await wrapper.__aexit__(asyncio.CancelledError, asyncio.CancelledError(SUSPENDED), None) + await wrapper.__aexit__(None, None, None) self.active_queues.pop(ident) From 31b6eed214828b661f980ae345f2717e5e6e8a76 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Tue, 22 Apr 2025 21:22:24 -0600 Subject: [PATCH 3/3] multiqueue fixed --- quest_test/test_external_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quest_test/test_external_actions.py b/quest_test/test_external_actions.py index a1616fb..de7ee69 100644 --- a/quest_test/test_external_actions.py +++ b/quest_test/test_external_actions.py @@ -3,7 +3,7 @@ import pytest from quest.external import state, queue, event, wrap_as_state, wrap_as_queue, MultiQueue -from quest.historian import Historian, find_historian +from quest.historian import Historian from quest.wrappers import task, step from quest.serializer import NoopSerializer from .utils import timeout