Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: "Testing package"

on:
push:
pull_request:

jobs:
py-lint:
Expand Down
73 changes: 73 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/managers.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import timedelta
from typing import final, overload

from typing_extensions import Self

from .consumers import (
PullConsumer,
PullConsumerConfig,
Expand All @@ -13,13 +15,63 @@ from .object_store import ObjectStore, ObjectStoreConfig
from .stream import Stream, StreamConfig

__all__ = [
"ConsumersIterator",
"ConsumersManager",
"ConsumersNamesIterator",
"CountersManager",
"KVManager",
"ObjectStoreManager",
"StreamsManager",
]

@final
class ConsumersIterator:
"""Async iterator over consumers subscribed to a stream.

Returned by :meth:`ConsumersManager.list`.
Consumers can be received using ``async for`` or by calling :meth:`next`
directly.

Consumer type is identified by its config. If it has deliver_subject set,
then PushConsumer is returned.
"""

def __aiter__(self) -> Self: ...
async def __anext__(self) -> PullConsumer | PushConsumer: ...
async def next(
self,
timeout: float | timedelta | None = None,
) -> PullConsumer | PushConsumer:
"""Receive the next consumer from the stream.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next consumer.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

@final
class ConsumersNamesIterator:
"""Async iterator over names of consumers subscribed to a stream.

Returned by :meth:`ConsumersManager.list_names`.
Consumer names can be received using ``async for`` or by calling :meth:`next`
directly.
"""

def __aiter__(self) -> Self: ...
async def __anext__(self) -> str: ...
async def next(self, timeout: float | timedelta | None = None) -> str:
"""Receive the next consumer name from the stream.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next consumer name.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

@final
class StreamsManager:
"""Manager for JetStream stream CRUD operations."""
Expand Down Expand Up @@ -185,6 +237,27 @@ class ConsumersManager:
:return: True if the consumer was resumed.
"""

async def list(self) -> ConsumersIterator:
"""List consumers subscribed to the stream.

This method iterates over all consumers on a
stream and retunrns correct types, by looking
at their config.

If you only need names, use :meth:`ConsumersManager.list_names` instead.

:return: an async iterator over consumers.
"""

async def list_names(self) -> ConsumersNamesIterator:
"""List names of consumers subscribed to the stream.

This method iterates over all consumer names on a
stream.

:return: an async iterator over consumer names.
"""

@final
class ObjectStoreManager:
"""Manager for object store bucket operations."""
Expand Down
56 changes: 53 additions & 3 deletions python/natsrpy/_natsrpy_rs/js/stream.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,10 @@ class Stream:
accessing messages in the stream, as well as managing consumers.
"""

@property
def consumers(self) -> ConsumersManager:
"""Manager for consumers bound to this stream."""

async def direct_get(
self,
sequence: int,
Expand All @@ -479,6 +483,45 @@ class Stream:
:return: the stream message.
"""

async def direct_get_next_for_subject(
self,
subject: str,
sequence: int | None = None,
timeout: float | timedelta | None = None,
) -> StreamMessage:
"""Get the next message for a subject directly from the stream.

:param subject: subject to get the next message for.
:param sequence: optional sequence number to start searching from.
If not provided, starts from the beginning of the stream.
:param timeout: operation timeout.
:return: the next stream message matching the subject filter.
"""

async def direct_get_first_for_subject(
self,
subject: str,
timeout: float | timedelta | None = None,
) -> StreamMessage:
"""Get the first message for a subject directly from the stream.

:param subject: subject to get the first message for.
:param timeout: operation timeout.
:return: the first stream message matching the subject filter.
"""

async def direct_get_last_for_subject(
self,
subject: str,
timeout: float | timedelta | None = None,
) -> StreamMessage:
"""Get the last message for a subject directly from the stream.

:param subject: subject to get the last message for.
:param timeout: operation timeout.
:return: the last stream message matching the subject filter.
"""

async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo:
"""Get information about the stream.

Expand All @@ -505,6 +548,13 @@ class Stream:
:return: number of messages purged.
"""

@property
def consumers(self) -> ConsumersManager:
"""Manager for consumers bound to this stream."""
async def delete_message(
self,
sequence: int,
timeout: float | datetime | None = None,
) -> None:
"""Delete a message from the stream by sequence number.

:param sequence: sequence number of the message to delete.
:param timeout: operation timeout.
"""
3 changes: 3 additions & 0 deletions python/natsrpy/js.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
KVOperation,
KVStatus,
)
from ._natsrpy_rs.js.managers import ConsumersIterator, ConsumersNamesIterator
from ._natsrpy_rs.js.object_store import (
ObjectInfo,
ObjectInfoIterator,
Expand Down Expand Up @@ -54,6 +55,8 @@
"ClusterInfo",
"Compression",
"ConsumerLimits",
"ConsumersIterator",
"ConsumersNamesIterator",
"CounterEntry",
"Counters",
"CountersConfig",
Expand Down
185 changes: 185 additions & 0 deletions python/tests/test_stream_new_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import uuid

from natsrpy.js import (
JetStream,
PullConsumer,
PullConsumerConfig,
PushConsumer,
PushConsumerConfig,
StreamConfig,
)


async def test_stream_direct_get_next_for_subject(js: JetStream) -> None:
name = f"test-dgnfs-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
stream = await js.streams.create(config)
try:
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
msg = await stream.direct_get_next_for_subject(f"{name}.a")
assert msg.payload == b"msg-a-1"
assert msg.subject == f"{name}.a"
finally:
await js.streams.delete(name)


async def test_stream_direct_get_next_for_subject_with_sequence(
js: JetStream,
) -> None:
name = f"test-dgnfss-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
stream = await js.streams.create(config)
try:
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
msg = await stream.direct_get_next_for_subject(f"{name}.a", sequence=2)
assert msg.payload == b"msg-a-2"
finally:
await js.streams.delete(name)


async def test_stream_direct_get_first_for_subject(js: JetStream) -> None:
name = f"test-dgffs-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
stream = await js.streams.create(config)
try:
await js.publish(f"{name}.a", b"first-msg", wait=True)
await js.publish(f"{name}.a", b"second-msg", wait=True)
msg = await stream.direct_get_first_for_subject(f"{name}.a")
assert msg.payload == b"first-msg"
assert msg.subject == f"{name}.a"
assert msg.sequence == 1
finally:
await js.streams.delete(name)


async def test_stream_direct_get_last_for_subject(js: JetStream) -> None:
name = f"test-dglfs-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
stream = await js.streams.create(config)
try:
await js.publish(f"{name}.a", b"first-msg", wait=True)
await js.publish(f"{name}.a", b"last-msg", wait=True)
msg = await stream.direct_get_last_for_subject(f"{name}.a")
assert msg.payload == b"last-msg"
assert msg.subject == f"{name}.a"
assert msg.sequence == 2
finally:
await js.streams.delete(name)


async def test_stream_delete_message(js: JetStream) -> None:
name = f"test-delmsg-{uuid.uuid4().hex[:8]}"
subj = f"{name}.data"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"msg-1", wait=True)
await js.publish(subj, b"msg-2", wait=True)
await js.publish(subj, b"msg-3", wait=True)
info = await stream.get_info()
assert info.state.messages == 3

await stream.delete_message(sequence=2)

info = await stream.get_info()
assert info.state.messages == 2
finally:
await js.streams.delete(name)


async def test_consumers_list(js: JetStream) -> None:
name = f"test-clist-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))

consumers_iter = await stream.consumers.list()
found = []
async for consumer in consumers_iter:
assert isinstance(consumer, (PullConsumer, PushConsumer))
found.append(consumer)
assert len(found) == 2
finally:
await js.streams.delete(name)


async def test_consumers_list_returns_correct_types(js: JetStream) -> None:
name = f"test-cltype-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
pull_name = f"pull-{uuid.uuid4().hex[:8]}"
push_name = f"push-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=pull_name))
deliver_subj = uuid.uuid4().hex
await stream.consumers.create(
PushConsumerConfig(deliver_subject=deliver_subj, name=push_name),
)

consumers_iter = await stream.consumers.list()
types_found: dict[str, type] = {}
async for consumer in consumers_iter:
if isinstance(consumer, PullConsumer):
types_found["pull"] = type(consumer)
elif isinstance(consumer, PushConsumer):
types_found["push"] = type(consumer)
assert types_found.get("pull") is PullConsumer
assert types_found.get("push") is PushConsumer
finally:
await js.streams.delete(name)


async def test_consumers_list_names(js: JetStream) -> None:
name = f"test-clnames-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))

names_iter = await stream.consumers.list_names()
found_names: list[str] = []
async for cname in names_iter:
assert isinstance(cname, str)
found_names.append(cname)
assert sorted(found_names) == sorted([consumer_name1, consumer_name2])
finally:
await js.streams.delete(name)


async def test_consumers_list_empty(js: JetStream) -> None:
name = f"test-clempty-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
consumers_iter = await stream.consumers.list()
found = []
async for consumer in consumers_iter:
found.append(consumer)
assert len(found) == 0
finally:
await js.streams.delete(name)


async def test_consumers_list_names_empty(js: JetStream) -> None:
name = f"test-clnempty-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
names_iter = await stream.consumers.list_names()
found_names: list[str] = []
async for cname in names_iter:
found_names.append(cname)
assert len(found_names) == 0
finally:
await js.streams.delete(name)
Loading
Loading