Skip to content

multiqueue#83

Merged
byubean merged 10 commits intomainfrom
multiqueue
Apr 16, 2025
Merged

multiqueue#83
byubean merged 10 commits intomainfrom
multiqueue

Conversation

@easyasme
Copy link
Copy Markdown
Contributor

No description provided.



class MultiQueue:
def __init__(self, queues: dict[str, Any]):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Find the real type (not Any).

class MultiQueue:
def __init__(self, queues: dict[str, Any]):
self.queues = queues
self.gets = {} # asyncio tasks -> identity
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.gets: dict[asyncio.Task, str]

continue # Skip - already removed

result = await task
return ident, result
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yield ident, result

# Done waiting for this identity -> remove
self._active.discard(ident)

async def __anext__(self):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async def __aiter__

for ident in players
}) as guess_queues
these({ident: queue('guess', ident) for ident in players}) as guess_queues,
MultiQueue(guess_queues) as mq
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async with MultiQueue('guess', ['id1', 'id2', 'id3']) as guess_queues:

Comment on lines +15 to +17
self.gets: dict[asyncio.Task, str] = {}
self.reverse: dict[str, asyncio.Task] = {}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would name these something like

task_to_ident
ident_to_task

async def __aenter__(self):
# Listen on all queues -> create a task for each queue.get()
for ident, q in self.queues.items():
task = asyncio.create_task(q.get())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to use the quest version of create_task so the task get's recorded.

for task in done:
ident = self.gets.pop(task)
# Stop listening to this identity
self.reverse.pop(ident, None)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try del self.reverse[ident] instead

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might fix the await warning you are getting.

self.gets.pop(task)
task.cancel()

async def __aiter__(self):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic already only supports one response per identity.

We need some tests demonstrating how the MultiQueue should work.

for guess_get in asyncio.as_completed(guess_gets):
guess = await guess_get
ident = guess_gets[guess]
async with SingleResponseMultiQueue(queues) as mq:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async with queues('guess', players) as qs:

continue


class SingleResponseMultiQueue:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need anymore

results = []

async def dummy_workflow():
async with SingleResponseMultiQueue(queues) as mq:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultiQueue('message', players, single_response=True)

Comment on lines +50 to +52
# Iterate guesses one at a time
async with MultiQueue('guess', players, single_response=True) as mq:
async for ident, guess in mq:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love how much more concise this is from what it used to be. :)

Comment on lines +116 to +120
async def __aenter__(self):
# Listen on all queues -> create a task for each queue.get()
for ident, q in self.queues.items():
self._add_task(ident, q)
return self
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the queues need to be __aenter__ed here or they won't appear in workflow history.

Comment on lines +127 to +134
def remove(self, ident: str):
# Stop listening to this identity queue
if ident not in self.ident_to_task:
raise KeyError(f"Identity '{ident}' does not exist in MultiQueue.")

task = self.ident_to_task.pop(ident)
self.task_to_ident.pop(task)
task.cancel()
Copy link
Copy Markdown
Collaborator

@byubean byubean Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to __aexit__ each queue as it is removed.

Comment on lines +122 to +125
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Cancel all pending tasks - context exits
for task in self.task_to_ident:
task.cancel()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to __aexit__ each queue as we exit.

Comment on lines +136 to +155
async def __aiter__(self):
while self.task_to_ident:
# Wait until any of the current task is done
done, _ = await asyncio.wait(self.task_to_ident.keys(), return_when=asyncio.FIRST_COMPLETED)

for task in done:
ident = self.task_to_ident.pop(task)
# Stop listening to this identity
del self.ident_to_task[ident]

try:
result = await task
yield ident, result

# Start listening again
if not self.single_response:
self._add_task(ident, self.queues[ident])

except asyncio.CancelledError:
continue
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking cleaner. Nice.

workflow = historian.run()

await asyncio.sleep(0.1)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get resources for both p1 and p2, and show they both have "chats"

Then p1 says "bye", and show that p1 no longer has chats.

show that p2 has "chats" until the end of the workflow.

await historian.record_external_event('chat', 'p1', 'put', 'hello')
await historian.record_external_event('chat', 'p2', 'put', 'hi')
# Second message from p1 - should be ignored due to single_response = True
await historian.record_external_event('chat', 'p1', 'put', 'should not be received')
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should throw an exception.

Comment on lines +161 to +164
wrapper = self.queues.pop(ident, None)
if wrapper:
await wrapper.__aexit__(None, None, None)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrapper = self.queues.pop(ident)
await wrapper.__aexit__(None, None, None)

if wrapper:
await wrapper.__aexit__(None, None, None)

self.active_queues.pop(ident, None)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No None

@byubean byubean merged commit d6e53db into main Apr 16, 2025
1 check passed
@byubean byubean deleted the multiqueue branch April 16, 2025 16:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants