From 14d9ca00c609fa7fa9150fc4333646b00d32d586 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Mon, 31 Mar 2025 09:54:51 -0600 Subject: [PATCH 1/9] multiqueue --- demos/multi_guess_server.py | 63 +++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 8fc56876..a35de008 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -1,12 +1,51 @@ import asyncio import random from pathlib import Path +from typing import Any from quest import (step, queue, state, identity_queue, create_filesystem_manager, these) from scratch.websocket_scratch.server import serve +class MultiQueue: + def __init__(self, queues: dict[str, Any]): + self.queues = queues + self.gets = {} # asyncio tasks -> identity + self._active = set(queues.keys()) # Track identities that didn't get removed + + async def __aenter__(self): + # Listen on all queues -> create a task for each queue.get() + self.gets = { + asyncio.create_task(q.get()): ident for ident, q in self.queues.items() + } + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # Cancel all pending tasks - context exits + for task in self.gets: + task.cancel() + + def remove(self, ident: str): + # Done waiting for this identity -> remove + self._active.discard(ident) + + async def __anext__(self): + # All identities removed -> done + if not self._active: + raise StopAsyncIteration + + # Wait until any of the current task is done + done, _ = await asyncio.wait(self.gets.keys(), return_when=asyncio.FIRST_COMPLETED) + + for task in done: + ident = self.gets.pop(task) + if ident not in self._active: + continue # Skip - already removed + + result = await task + return ident, result + # TODO - write a websocket server that wraps # an existing workflow manager @@ -38,7 +77,6 @@ async def get_secret(): @step async def get_guesses(players: dict[str, str], message) -> dict[str, int]: guesses = {} - status_message = [] # TODO - the following code sequence is a little verbose @@ -51,28 +89,20 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: # it easy and clear async with ( - # Create a guess queue for each player - these({ - ident: queue('guess', ident) - for ident in players - }) as guess_queues + these({ident: queue('guess', ident) for ident in players}) as guess_queues, + MultiQueue(guess_queues) as mq ): - # Wait for guesses to come in. - # As they do, remove their queue so they can't guess again. - guess_gets = {q.get(): ident for ident, q in guess_queues.items()} - for guess_get in asyncio.as_completed(guess_gets): - guess = await guess_get - ident = guess_gets[guess] + # Iterate guesses one at a time + async for ident, guess in mq: guesses[ident] = guess - # Update the status + # Status message name = players[ident] status_message.append(f'{name} guessed {guess}') message.set('\n'.join(status_message)) - # Remove the queue - # The user will no longer see it - guess_queues.remove(ident) + # Stop listening from this identity + mq.remove(ident) return guesses @@ -100,6 +130,7 @@ async def multi_guess(): players = await get_players() await play_game(players) + # TODO: Rewrite this function to import and use serve from server.py async def main(): async with ( From 7c691e31f7abd3dc409a7c109cadc63744bb3c8e Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 2 Apr 2025 09:00:57 -0600 Subject: [PATCH 2/9] Modified MultiQueue, added Single ResponseMultiqueue --- demos/multi_guess_server.py | 75 ++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index a35de008..77dce004 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -6,19 +6,21 @@ from quest import (step, queue, state, identity_queue, create_filesystem_manager, these) from scratch.websocket_scratch.server import serve +from quest.external import Queue class MultiQueue: - def __init__(self, queues: dict[str, Any]): + def __init__(self, queues: dict[str, Queue]): self.queues = queues - self.gets = {} # asyncio tasks -> identity - self._active = set(queues.keys()) # Track identities that didn't get removed + self.gets: dict[asyncio.Task, str] = {} + self.reverse: dict[str, asyncio.Task] = {} async def __aenter__(self): # Listen on all queues -> create a task for each queue.get() - self.gets = { - asyncio.create_task(q.get()): ident for ident, q in self.queues.items() - } + for ident, q in self.queues.items(): + task = asyncio.create_task(q.get()) + self.gets[task] = ident + self.reverse[ident] = task return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -27,24 +29,47 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): task.cancel() def remove(self, ident: str): - # Done waiting for this identity -> remove - self._active.discard(ident) + # Stop listening to this identity queue + if ident not in self.reverse: + raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.") - async def __anext__(self): - # All identities removed -> done - if not self._active: - raise StopAsyncIteration + task = self.reverse.pop(ident) + self.gets.pop(task) + task.cancel() - # Wait until any of the current task is done - done, _ = await asyncio.wait(self.gets.keys(), return_when=asyncio.FIRST_COMPLETED) + async def __aiter__(self): + while self.gets: + # Wait until any of the current task is done + done, _ = await asyncio.wait(self.gets.keys(), return_when=asyncio.FIRST_COMPLETED) - for task in done: - ident = self.gets.pop(task) - if ident not in self._active: - continue # Skip - already removed + for task in done: + ident = self.gets.pop(task) + # Stop listening to this identity + self.reverse.pop(ident, None) + + try: + result = await task + yield ident, result + + except asyncio.CancelledError: + continue + + +class SingleResponseMultiQueue: + def __init__(self, queues: dict[str, Queue]): + self._mq = MultiQueue(queues) + + async def __aenter__(self): + return await self._mq.__aenter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + return await self._mq.__aexit__(exc_type, exc_val, exc_tb) + + async def __aiter__(self): + async for ident, item in self._mq: + self._mq.remove(ident) # Remove after one response from the identity + yield ident, item - result = await task - return ident, result # TODO - write a websocket server that wraps # an existing workflow manager @@ -79,6 +104,8 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: guesses = {} status_message = [] + queues: dict[str, Queue] = {ident: Queue() for ident in players} + # TODO - the following code sequence is a little verbose # We need to: # - create a queue for each player @@ -88,10 +115,7 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: # This pattern should be common enough we should make # it easy and clear - async with ( - these({ident: queue('guess', ident) for ident in players}) as guess_queues, - MultiQueue(guess_queues) as mq - ): + async with SingleResponseMultiQueue(queues) as mq: # Iterate guesses one at a time async for ident, guess in mq: guesses[ident] = guess @@ -101,9 +125,6 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: status_message.append(f'{name} guessed {guess}') message.set('\n'.join(status_message)) - # Stop listening from this identity - mq.remove(ident) - return guesses From 7cec7c473b35c6c514e48e67b596fa16c05e802e Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 06:11:47 -0600 Subject: [PATCH 3/9] multiqueue --- demos/multi_guess_server.py | 63 +++++++++---- demos/test_multiqueue.py | 182 ++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 20 deletions(-) create mode 100644 demos/test_multiqueue.py diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 77dce004..5d45eb3d 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -5,71 +5,94 @@ from quest import (step, queue, state, identity_queue, create_filesystem_manager, these) -from scratch.websocket_scratch.server import serve from quest.external import Queue +from quest.historian import find_historian +# single response for now class MultiQueue: - def __init__(self, queues: dict[str, Queue]): + def __init__(self, queues: dict[str, Queue], single_response: bool = False): self.queues = queues - self.gets: dict[asyncio.Task, str] = {} - self.reverse: dict[str, asyncio.Task] = {} + self.single_response = single_response + self.task_to_ident: dict[asyncio.Task, str] = {} + self.ident_to_task: dict[str, asyncio.Task] = {} + + def _add_task(self, ident: str, q: Queue): + historian = find_historian() + task = historian.start_task( + q.get, + name=f"mq-get-{ident}" + ) + + self.task_to_ident[task] = ident + self.ident_to_task[ident] = task async def __aenter__(self): # Listen on all queues -> create a task for each queue.get() for ident, q in self.queues.items(): - task = asyncio.create_task(q.get()) - self.gets[task] = ident - self.reverse[ident] = task + self._add_task(ident, q) return self async def __aexit__(self, exc_type, exc_val, exc_tb): # Cancel all pending tasks - context exits - for task in self.gets: + for task in self.task_to_ident: task.cancel() def remove(self, ident: str): # Stop listening to this identity queue - if ident not in self.reverse: + if ident not in self.ident_to_task: raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.") - task = self.reverse.pop(ident) - self.gets.pop(task) + task = self.ident_to_task.pop(ident) + self.task_to_ident.pop(task) task.cancel() async def __aiter__(self): - while self.gets: + while self.task_to_ident: # Wait until any of the current task is done - done, _ = await asyncio.wait(self.gets.keys(), return_when=asyncio.FIRST_COMPLETED) + done, _ = await asyncio.wait(self.task_to_ident.keys(), return_when=asyncio.FIRST_COMPLETED) for task in done: - ident = self.gets.pop(task) + ident = self.task_to_ident.pop(task) # Stop listening to this identity - self.reverse.pop(ident, None) + del self.ident_to_task[ident] try: result = await task yield ident, result + # Start listening again + if not self.single_response: + self._add_task(ident, self.queues[ident]) + except asyncio.CancelledError: continue class SingleResponseMultiQueue: def __init__(self, queues: dict[str, Queue]): - self._mq = MultiQueue(queues) + self._mq = MultiQueue(queues, single_response=True) + self._received: set[str] = set() + self._total = len(queues) async def __aenter__(self): - return await self._mq.__aenter__() + await self._mq.__aenter__() + return self async def __aexit__(self, exc_type, exc_val, exc_tb): return await self._mq.__aexit__(exc_type, exc_val, exc_tb) async def __aiter__(self): async for ident, item in self._mq: - self._mq.remove(ident) # Remove after one response from the identity + # Skip identities that already gave one response + if ident in self._received: + continue + self._received.add(ident) yield ident, item + if len(self._received) == self._total: + return + # TODO - write a websocket server that wraps # an existing workflow manager @@ -104,7 +127,7 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: guesses = {} status_message = [] - queues: dict[str, Queue] = {ident: Queue() for ident in players} + queues: dict[str, Queue] = {ident: queue('guess', ident) for ident in players} # TODO - the following code sequence is a little verbose # We need to: @@ -115,8 +138,8 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: # This pattern should be common enough we should make # it easy and clear + # Iterate guesses one at a time async with SingleResponseMultiQueue(queues) as mq: - # Iterate guesses one at a time async for ident, guess in mq: guesses[ident] = guess diff --git a/demos/test_multiqueue.py b/demos/test_multiqueue.py new file mode 100644 index 00000000..c833f585 --- /dev/null +++ b/demos/test_multiqueue.py @@ -0,0 +1,182 @@ +import pytest +import asyncio +from multi_guess_server import MultiQueue, SingleResponseMultiQueue +from quest.external import Queue +from quest_test.utils import create_in_memory_workflow_manager + + +@pytest.mark.asyncio +async def test_multiqueue_multiple_responses(): + players = ['a', 'b'] + responses_per_player = 2 + + queues = {ident: Queue() for ident in players} + for i in range(responses_per_player): + for ident in players: + await queues[ident].put(f'{ident}-{i}') + + results = [] + expected_responses = len(players) * responses_per_player + + async def dummy_workflow(): + async with MultiQueue(queues) as mq: + async for ident, val in mq: + results.append((ident, val)) + if len(results) == expected_responses: # 2 responses per 2 players + break + + workflows = {"dummy_workflow": dummy_workflow} + manager = create_in_memory_workflow_manager(workflows=workflows) + + async with manager: + manager.start_workflow("dummy_workflow", "dummy-id") + await manager.get_workflow_result("dummy-id") + + assert len(results) == expected_responses + + +@pytest.mark.asyncio +async def test_singleresponsemultiqueue_one_response_only(): + players = ['x', 'y', 'z'] + queues = {ident: Queue() for ident in players} + + for ident in players: + await queues[ident].put(f'{ident}-guess') + await queues[ident].put(f'{ident}-No') # should not be received to results + + results = [] + + async def dummy_workflow(): + async with SingleResponseMultiQueue(queues) as mq: + async for ident, val in mq: + results.append((ident, val)) + + workflows = {"dummy_workflow": dummy_workflow} + manager = create_in_memory_workflow_manager(workflows=workflows) + + async with manager: + manager.start_workflow("dummy_workflow", "dummy-id") + await manager.get_workflow_result("dummy-id") + + # One response per player + assert len(results) == len(players) + + # Ensure unnecessary guesses were not received + assert all("-No" not in val for _, val in results) + + +@pytest.mark.asyncio +async def test_multiqueue_remove(): + players = ['x', 'y'] + responses_per_player = 2 + queues = {ident: Queue() for ident in players} + + for i in range(responses_per_player): + for ident in players: + await queues[ident].put(f'{ident}-{i}') + + results = [] + + async def dummy_workflow(): + async with MultiQueue(queues) as mq: + async for ident, val in mq: + results.append((ident, val)) + + if ident == 'x' and 'x' in mq.ident_to_task: + mq.remove('x') + + if len(results) == 3: + break + + workflows = {"dummy_workflow": dummy_workflow} + manager = create_in_memory_workflow_manager(workflows=workflows) + + async with manager: + manager.start_workflow("dummy_workflow", "dummy-id") + await manager.get_workflow_result("dummy-id") + + assert ('x', 'x-0') in results + assert ('x', 'x-1') not in results + y_vals = [val for ident, val in results if ident == 'y'] + assert len(y_vals) == 2 +# +# @pytest.mark.asyncio +# async def test_multiqueue_multiple_responses(): +# players = ['a', 'b'] +# responses_per_player = 2 +# queues = {ident: Queue() for ident in players} +# +# # Put multiple items into each queue +# for i in range(responses_per_player): +# for ident in players: +# await queues[ident].put(f'{ident}-{i}') +# +# results = [] +# +# expected_responses = len(players) * responses_per_player +# +# async with MultiQueue(queues) as mq: +# async for ident, val in mq: +# results.append((ident, val)) +# if len(results) == expected_responses: # 2 responses per 2 players +# break +# +# assert len(results) == 4 +# +# +# @pytest.mark.asyncio +# async def test_singleresponsemultiqueue_one_response_only(): +# players = ['x', 'y', 'z'] +# queues = {ident: Queue() for ident in players} +# +# for ident in players: +# await queues[ident].put(f'{ident}-guess') +# await queues[ident].put(f'{ident}-No') # should not be received to results +# +# results = [] +# +# async with SingleResponseMultiQueue(queues) as mq: +# async for ident, val in mq: +# results.append((ident, val)) +# +# # One response per player +# assert len(results) == len(players) +# +# # Ensure unnecessary guesses were not received +# assert all("-No" not in val for _, val in results) +# +# +# @pytest.mark.asyncio +# async def test_multiqueue_remove(): +# players = ['x', 'y'] +# responses_per_player = 2 +# queues = {ident: Queue() for ident in players} +# +# for i in range(responses_per_player): +# for ident in players: +# await queues[ident].put(f'{ident}-{i}') +# +# results = [] +# remove_next_time = False # flag for delaying removal +# +# async with MultiQueue(queues) as mq: +# async for ident, val in mq: +# results.append((ident, val)) +# +# # Delay remove -> 'x' has been re-added +# if remove_next_time: +# mq.remove('x') +# remove_next_time = False +# +# # After first x message, mark x for removal +# if ident == 'x': +# remove_next_time = True +# +# if len(results) == 3: +# break +# +# assert ('x', 'x-0') in results +# assert ('x', 'x-1') not in results +# y_vals = [val for ident, val in results if ident == 'y'] +# assert len(y_vals) == 2 +# From 0eba30847a41d2d15b7132ec6e241244f1a2bd77 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 10:00:05 -0600 Subject: [PATCH 4/9] test --- demos/multi_guess_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 5d45eb3d..79d6c49a 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -1,10 +1,10 @@ import asyncio import random from pathlib import Path -from typing import Any from quest import (step, queue, state, identity_queue, create_filesystem_manager, these) +from scratch.websocket_scratch.server import serve from quest.external import Queue from quest.historian import find_historian From 437038658b8e1f11a013a40d20c30a07525a3ab0 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 10:34:43 -0600 Subject: [PATCH 5/9] multiqueue --- demos/multi_guess_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 79d6c49a..281dbd6c 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -9,7 +9,7 @@ from quest.historian import find_historian -# single response for now +# Multiple response per user class MultiQueue: def __init__(self, queues: dict[str, Queue], single_response: bool = False): self.queues = queues From e92e4f15c00e928fc295b3c707b7583bd06e65e3 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 18:42:55 -0600 Subject: [PATCH 6/9] multiqueue implemented without test --- demos/multi_guess_server.py | 93 +----------------- demos/test_multiqueue.py | 182 ------------------------------------ src/quest/external.py | 60 ++++++++++++ 3 files changed, 62 insertions(+), 273 deletions(-) delete mode 100644 demos/test_multiqueue.py diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 281dbd6c..2d684ee7 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -5,94 +5,7 @@ from quest import (step, queue, state, identity_queue, create_filesystem_manager, these) from scratch.websocket_scratch.server import serve -from quest.external import Queue -from quest.historian import find_historian - - -# Multiple response per user -class MultiQueue: - def __init__(self, queues: dict[str, Queue], single_response: bool = False): - self.queues = queues - self.single_response = single_response - self.task_to_ident: dict[asyncio.Task, str] = {} - self.ident_to_task: dict[str, asyncio.Task] = {} - - def _add_task(self, ident: str, q: Queue): - historian = find_historian() - task = historian.start_task( - q.get, - name=f"mq-get-{ident}" - ) - - self.task_to_ident[task] = ident - self.ident_to_task[ident] = task - - async def __aenter__(self): - # Listen on all queues -> create a task for each queue.get() - for ident, q in self.queues.items(): - self._add_task(ident, q) - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - # Cancel all pending tasks - context exits - for task in self.task_to_ident: - task.cancel() - - def remove(self, ident: str): - # Stop listening to this identity queue - if ident not in self.ident_to_task: - raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.") - - task = self.ident_to_task.pop(ident) - self.task_to_ident.pop(task) - task.cancel() - - async def __aiter__(self): - while self.task_to_ident: - # Wait until any of the current task is done - done, _ = await asyncio.wait(self.task_to_ident.keys(), return_when=asyncio.FIRST_COMPLETED) - - for task in done: - ident = self.task_to_ident.pop(task) - # Stop listening to this identity - del self.ident_to_task[ident] - - try: - result = await task - yield ident, result - - # Start listening again - if not self.single_response: - self._add_task(ident, self.queues[ident]) - - except asyncio.CancelledError: - continue - - -class SingleResponseMultiQueue: - def __init__(self, queues: dict[str, Queue]): - self._mq = MultiQueue(queues, single_response=True) - self._received: set[str] = set() - self._total = len(queues) - - async def __aenter__(self): - await self._mq.__aenter__() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - return await self._mq.__aexit__(exc_type, exc_val, exc_tb) - - async def __aiter__(self): - async for ident, item in self._mq: - # Skip identities that already gave one response - if ident in self._received: - continue - self._received.add(ident) - yield ident, item - - if len(self._received) == self._total: - return - +from quest.external import MultiQueue # TODO - write a websocket server that wraps # an existing workflow manager @@ -127,8 +40,6 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: guesses = {} status_message = [] - queues: dict[str, Queue] = {ident: queue('guess', ident) for ident in players} - # TODO - the following code sequence is a little verbose # We need to: # - create a queue for each player @@ -139,7 +50,7 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: # it easy and clear # Iterate guesses one at a time - async with SingleResponseMultiQueue(queues) as mq: + async with MultiQueue('guess', players, single_response=True) as mq: async for ident, guess in mq: guesses[ident] = guess diff --git a/demos/test_multiqueue.py b/demos/test_multiqueue.py deleted file mode 100644 index c833f585..00000000 --- a/demos/test_multiqueue.py +++ /dev/null @@ -1,182 +0,0 @@ -import pytest -import asyncio -from multi_guess_server import MultiQueue, SingleResponseMultiQueue -from quest.external import Queue -from quest_test.utils import create_in_memory_workflow_manager - - -@pytest.mark.asyncio -async def test_multiqueue_multiple_responses(): - players = ['a', 'b'] - responses_per_player = 2 - - queues = {ident: Queue() for ident in players} - for i in range(responses_per_player): - for ident in players: - await queues[ident].put(f'{ident}-{i}') - - results = [] - expected_responses = len(players) * responses_per_player - - async def dummy_workflow(): - async with MultiQueue(queues) as mq: - async for ident, val in mq: - results.append((ident, val)) - if len(results) == expected_responses: # 2 responses per 2 players - break - - workflows = {"dummy_workflow": dummy_workflow} - manager = create_in_memory_workflow_manager(workflows=workflows) - - async with manager: - manager.start_workflow("dummy_workflow", "dummy-id") - await manager.get_workflow_result("dummy-id") - - assert len(results) == expected_responses - - -@pytest.mark.asyncio -async def test_singleresponsemultiqueue_one_response_only(): - players = ['x', 'y', 'z'] - queues = {ident: Queue() for ident in players} - - for ident in players: - await queues[ident].put(f'{ident}-guess') - await queues[ident].put(f'{ident}-No') # should not be received to results - - results = [] - - async def dummy_workflow(): - async with SingleResponseMultiQueue(queues) as mq: - async for ident, val in mq: - results.append((ident, val)) - - workflows = {"dummy_workflow": dummy_workflow} - manager = create_in_memory_workflow_manager(workflows=workflows) - - async with manager: - manager.start_workflow("dummy_workflow", "dummy-id") - await manager.get_workflow_result("dummy-id") - - # One response per player - assert len(results) == len(players) - - # Ensure unnecessary guesses were not received - assert all("-No" not in val for _, val in results) - - -@pytest.mark.asyncio -async def test_multiqueue_remove(): - players = ['x', 'y'] - responses_per_player = 2 - queues = {ident: Queue() for ident in players} - - for i in range(responses_per_player): - for ident in players: - await queues[ident].put(f'{ident}-{i}') - - results = [] - - async def dummy_workflow(): - async with MultiQueue(queues) as mq: - async for ident, val in mq: - results.append((ident, val)) - - if ident == 'x' and 'x' in mq.ident_to_task: - mq.remove('x') - - if len(results) == 3: - break - - workflows = {"dummy_workflow": dummy_workflow} - manager = create_in_memory_workflow_manager(workflows=workflows) - - async with manager: - manager.start_workflow("dummy_workflow", "dummy-id") - await manager.get_workflow_result("dummy-id") - - assert ('x', 'x-0') in results - assert ('x', 'x-1') not in results - y_vals = [val for ident, val in results if ident == 'y'] - assert len(y_vals) == 2 -# -# @pytest.mark.asyncio -# async def test_multiqueue_multiple_responses(): -# players = ['a', 'b'] -# responses_per_player = 2 -# queues = {ident: Queue() for ident in players} -# -# # Put multiple items into each queue -# for i in range(responses_per_player): -# for ident in players: -# await queues[ident].put(f'{ident}-{i}') -# -# results = [] -# -# expected_responses = len(players) * responses_per_player -# -# async with MultiQueue(queues) as mq: -# async for ident, val in mq: -# results.append((ident, val)) -# if len(results) == expected_responses: # 2 responses per 2 players -# break -# -# assert len(results) == 4 -# -# -# @pytest.mark.asyncio -# async def test_singleresponsemultiqueue_one_response_only(): -# players = ['x', 'y', 'z'] -# queues = {ident: Queue() for ident in players} -# -# for ident in players: -# await queues[ident].put(f'{ident}-guess') -# await queues[ident].put(f'{ident}-No') # should not be received to results -# -# results = [] -# -# async with SingleResponseMultiQueue(queues) as mq: -# async for ident, val in mq: -# results.append((ident, val)) -# -# # One response per player -# assert len(results) == len(players) -# -# # Ensure unnecessary guesses were not received -# assert all("-No" not in val for _, val in results) -# -# -# @pytest.mark.asyncio -# async def test_multiqueue_remove(): -# players = ['x', 'y'] -# responses_per_player = 2 -# queues = {ident: Queue() for ident in players} -# -# for i in range(responses_per_player): -# for ident in players: -# await queues[ident].put(f'{ident}-{i}') -# -# results = [] -# remove_next_time = False # flag for delaying removal -# -# async with MultiQueue(queues) as mq: -# async for ident, val in mq: -# results.append((ident, val)) -# -# # Delay remove -> 'x' has been re-added -# if remove_next_time: -# mq.remove('x') -# remove_next_time = False -# -# # After first x message, mark x for removal -# if ident == 'x': -# remove_next_time = True -# -# if len(results) == 3: -# break -# -# assert ('x', 'x-0') in results -# assert ('x', 'x-1') not in results -# y_vals = [val for ident, val in results if ident == 'y'] -# assert len(y_vals) == 2 -# diff --git a/src/quest/external.py b/src/quest/external.py index 80702c3a..6e45caa1 100644 --- a/src/quest/external.py +++ b/src/quest/external.py @@ -63,6 +63,7 @@ def value(self): class IdentityQueue: """Put and Get return and identity + the value""" + def __init__(self, *args, **kwargs): self._queue = asyncio.Queue(*args, **kwargs) @@ -95,6 +96,65 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._historian.delete_resource(self._name, self._identity, suspending=suspending) +class MultiQueue: + def __init__(self, name: str, players: dict[str, str], single_response: bool = False): + self.queues: dict[str, Queue] = {ident: queue(name, ident) for ident in players} + self.single_response = single_response + self.task_to_ident: dict[asyncio.Task, str] = {} + self.ident_to_task: dict[str, asyncio.Task] = {} + + def _add_task(self, ident: str, q: Queue): + historian = find_historian() + task = historian.start_task( + q.get, + name=f"mq-get-{ident}" + ) + + self.task_to_ident[task] = ident + self.ident_to_task[ident] = task + + async def __aenter__(self): + # Listen on all queues -> create a task for each queue.get() + for ident, q in self.queues.items(): + self._add_task(ident, q) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # Cancel all pending tasks - context exits + for task in self.task_to_ident: + task.cancel() + + def remove(self, ident: str): + # Stop listening to this identity queue + if ident not in self.ident_to_task: + raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.") + + task = self.ident_to_task.pop(ident) + self.task_to_ident.pop(task) + task.cancel() + + async def __aiter__(self): + while self.task_to_ident: + # Wait until any of the current task is done + done, _ = await asyncio.wait(self.task_to_ident.keys(), return_when=asyncio.FIRST_COMPLETED) + + for task in done: + ident = self.task_to_ident.pop(task) + # Stop listening to this identity + del self.ident_to_task[ident] + + try: + result = await task + yield ident, result + + # Start listening again + if not self.single_response: + self._add_task(ident, self.queues[ident]) + + except asyncio.CancelledError: + continue + + def queue(name, identity): return InternalResource(name, identity, Queue()) From 58f6cbdaded61b8691f079e0c68fe8203c486cbc Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 18:44:08 -0600 Subject: [PATCH 7/9] multiqueue implemented without test --- demos/multi_guess_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 2d684ee7..680e5de5 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -54,7 +54,7 @@ async def get_guesses(players: dict[str, str], message) -> dict[str, int]: async for ident, guess in mq: guesses[ident] = guess - # Status message + # Update the status name = players[ident] status_message.append(f'{name} guessed {guess}') message.set('\n'.join(status_message)) From d84824125b0e456bce753310c5345724d322a3c5 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 9 Apr 2025 18:54:02 -0600 Subject: [PATCH 8/9] merge conflict resolved --- demos/multi_guess_server.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/demos/multi_guess_server.py b/demos/multi_guess_server.py index 51e450c4..29dac27a 100644 --- a/demos/multi_guess_server.py +++ b/demos/multi_guess_server.py @@ -7,8 +7,6 @@ from quest.server import Server from quest.external import MultiQueue -# TODO - write a websocket server that wraps -# an existing workflow manager @step async def get_players(): From 83a295f5d37964e04480561850a681a6e91243f6 Mon Sep 17 00:00:00 2001 From: YoungWoo Song Date: Wed, 16 Apr 2025 09:13:46 -0600 Subject: [PATCH 9/9] multiqueue test --- quest_test/test_external_actions.py | 64 ++++++++++++++++++++++++- src/quest/external.py | 74 +++++++++++++++++++---------- 2 files changed, 111 insertions(+), 27 deletions(-) diff --git a/quest_test/test_external_actions.py b/quest_test/test_external_actions.py index 59247cd0..89fb2bb0 100644 --- a/quest_test/test_external_actions.py +++ b/quest_test/test_external_actions.py @@ -2,7 +2,7 @@ import pytest -from quest.external import state, queue, event, wrap_as_state, wrap_as_queue +from quest.external import state, queue, event, wrap_as_state, wrap_as_queue, MultiQueue from quest.historian import Historian from quest.wrappers import task, step from quest.serializer import NoopSerializer @@ -305,6 +305,68 @@ async def test_step_specific_external(): assert (await workflow) == 3 +@pytest.mark.asyncio +@timeout(3) +async def test_multiqueue_default(): + received = [] + + async def player_workflow(): + players = {'p1': 'user1', 'p2': 'user2'} + + async with MultiQueue('chat', players) as mq: + async for ident, msg in mq: + received.append((ident, msg)) + # If player sends 'bye', remove their queue after their message is recorded + if msg == 'bye': + await mq.remove(ident) + # Exit the Multiqueue when 3 messages are recorded + if len(received) == 3: + break + return received + + historian = Historian('test', player_workflow, [], serializer=NoopSerializer()) + workflow = historian.run() + + await asyncio.sleep(0.1) + + await historian.record_external_event('chat', 'p1', 'put', 'hello') + await historian.record_external_event('chat', 'p2', 'put', 'hi') + await historian.record_external_event('chat', 'p1', 'put', 'bye') + + result = await workflow + assert result == [('p1', 'hello'), ('p2', 'hi'), ('p1', 'bye')] + + # After removing p1 -> when p1 tries to send message, it should raise KeyError + # with pytest.raises(KeyError): + # await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + + +@pytest.mark.asyncio +@timeout(3) +async def test_multiqueue_single_response(): + received = {} + + async def player_workflow(): + players = {'p1': 'user1', 'p2': 'user2'} + async with MultiQueue('chat', players, single_response=True) as mq: + async for ident, msg in mq: + received[ident] = msg + return received + + historian = Historian('test', player_workflow, [], serializer=NoopSerializer()) + workflow = historian.run() + + await asyncio.sleep(0.1) + + await historian.record_external_event('chat', 'p1', 'put', 'hello') + await historian.record_external_event('chat', 'p2', 'put', 'hi') + # Second message from p1 - should be ignored due to single_response = True + await historian.record_external_event('chat', 'p1', 'put', 'should not be received') + + result = await workflow + assert result == {'p1': 'hello', 'p2': 'hi'} + + """ gate = asyncio.Event() diff --git a/src/quest/external.py b/src/quest/external.py index 69827287..8a7e9633 100644 --- a/src/quest/external.py +++ b/src/quest/external.py @@ -96,13 +96,32 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._historian.delete_resource(self._name, self._identity, suspending=suspending) +def queue(name, identity): + return InternalResource(name, identity, Queue()) + + +def event(name, identity): + return InternalResource(name, identity, Event()) + + +def state(name, identity, value): + return InternalResource(name, identity, State(value)) + + +def identity_queue(name): + return InternalResource(name, None, IdentityQueue()) + + class MultiQueue: def __init__(self, name: str, players: dict[str, str], single_response: bool = False): - self.queues: dict[str, Queue] = {ident: queue(name, ident) for ident in players} + self.queues: dict[str, InternalResource[Queue]] = {ident: queue(name, ident) for ident in players} self.single_response = single_response self.task_to_ident: dict[asyncio.Task, str] = {} self.ident_to_task: dict[str, asyncio.Task] = {} + # Hold unwrapped Queue objects after __aenter__ + self.active_queues: dict[str, Queue] = {} + def _add_task(self, ident: str, q: Queue): historian = find_historian() task = historian.start_task( @@ -115,23 +134,35 @@ def _add_task(self, ident: str, q: Queue): async def __aenter__(self): # Listen on all queues -> create a task for each queue.get() - for ident, q in self.queues.items(): - self._add_task(ident, q) + for ident, wrapper in self.queues.items(): + # Unwrap queue object + queue_obj = await wrapper.__aenter__() + self.active_queues[ident] = queue_obj + self._add_task(ident, queue_obj) return self async def __aexit__(self, exc_type, exc_val, exc_tb): # Cancel all pending tasks - context exits for task in self.task_to_ident: task.cancel() + # Exit all queues properly + for ident, wrapper in self.queues.items(): + await wrapper.__aexit__(exc_type, exc_val, exc_tb) - def remove(self, ident: str): + async def remove(self, ident: str): # Stop listening to this identity queue - if ident not in self.ident_to_task: - raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.") + task = self.ident_to_task.pop(ident, None) + + if task is not None: + self.task_to_ident.pop(task) + task.cancel() + + # Call __aexit__ on the corresponding queue wrapper + wrapper = self.queues.pop(ident, None) + if wrapper: + await wrapper.__aexit__(None, None, None) - task = self.ident_to_task.pop(ident) - self.task_to_ident.pop(task) - task.cancel() + self.active_queues.pop(ident, None) async def __aiter__(self): while self.task_to_ident: @@ -149,27 +180,14 @@ async def __aiter__(self): # Start listening again if not self.single_response: - self._add_task(ident, self.queues[ident]) + q = self.active_queues.get(ident) + if q: + self._add_task(ident, q) except asyncio.CancelledError: continue -def queue(name, identity): - return InternalResource(name, identity, Queue()) - - -def event(name, identity): - return InternalResource(name, identity, Event()) - - -def state(name, identity, value): - return InternalResource(name, identity, State(value)) - - -def identity_queue(name): - return InternalResource(name, None, IdentityQueue()) - class _ResourceWrapper: def __init__(self, name: str, identity: str | None, historian: 'Historian', resource_class): self._name = name @@ -189,14 +207,18 @@ async def wrapper(*args, _name=self._name, _identity=self._identity, **kwargs): return wrapper + def wrap_as_queue(name: str, identity: str | None, historian: Historian) -> Queue: return _ResourceWrapper(name, identity, historian, Queue) + def wrap_as_event(name: str, identity: str | None, historian: Historian) -> Event: return _ResourceWrapper(name, identity, historian, Event) + def wrap_as_state(name: str, identity: str | None, historian: Historian) -> State: return _ResourceWrapper(name, identity, historian, State) + def wrap_as_identity_queue(name: str, identity: str | None, historian: Historian) -> IdentityQueue: - return _ResourceWrapper(name, identity, historian, IdentityQueue) \ No newline at end of file + return _ResourceWrapper(name, identity, historian, IdentityQueue)