diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 89d7d3a..2017105 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,7 +1,7 @@ name: "Testing package" on: - push: + pull_request: jobs: py-lint: diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 1eca8e1..2da9429 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -1,6 +1,8 @@ from datetime import timedelta from typing import final, overload +from typing_extensions import Self + from .consumers import ( PullConsumer, PullConsumerConfig, @@ -13,13 +15,63 @@ from .object_store import ObjectStore, ObjectStoreConfig from .stream import Stream, StreamConfig __all__ = [ + "ConsumersIterator", "ConsumersManager", + "ConsumersNamesIterator", "CountersManager", "KVManager", "ObjectStoreManager", "StreamsManager", ] +@final +class ConsumersIterator: + """Async iterator over consumers subscribed to a stream. + + Returned by :meth:`ConsumersManager.list`. + Consumers can be received using ``async for`` or by calling :meth:`next` + directly. + + Consumer type is identified by its config. If it has deliver_subject set, + then PushConsumer is returned. + """ + + def __aiter__(self) -> Self: ... + async def __anext__(self) -> PullConsumer | PushConsumer: ... + async def next( + self, + timeout: float | timedelta | None = None, + ) -> PullConsumer | PushConsumer: + """Receive the next consumer from the stream. + + :param timeout: maximum time to wait for a message in seconds + or as a timedelta, defaults to None (wait indefinitely). + :return: the next consumer. + :raises StopAsyncIteration: when the subscription is drained or + unsubscribed. + """ + +@final +class ConsumersNamesIterator: + """Async iterator over names of consumers subscribed to a stream. + + Returned by :meth:`ConsumersManager.list_names`. + Consumer names can be received using ``async for`` or by calling :meth:`next` + directly. + """ + + def __aiter__(self) -> Self: ... + async def __anext__(self) -> str: ... + async def next(self, timeout: float | timedelta | None = None) -> str: + """Receive the next consumer name from the stream. + + :param timeout: maximum time to wait for a message in seconds + or as a timedelta, defaults to None (wait indefinitely). + :return: the next consumer name. + :raises StopAsyncIteration: when the subscription is drained or + unsubscribed. + """ + @final class StreamsManager: """Manager for JetStream stream CRUD operations.""" @@ -185,6 +237,27 @@ class ConsumersManager: :return: True if the consumer was resumed. """ + async def list(self) -> ConsumersIterator: + """List consumers subscribed to the stream. + + This method iterates over all consumers on a + stream and retunrns correct types, by looking + at their config. + + If you only need names, use :meth:`ConsumersManager.list_names` instead. + + :return: an async iterator over consumers. + """ + + async def list_names(self) -> ConsumersNamesIterator: + """List names of consumers subscribed to the stream. + + This method iterates over all consumer names on a + stream. + + :return: an async iterator over consumer names. + """ + @final class ObjectStoreManager: """Manager for object store bucket operations.""" diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index 36a19a6..7721e40 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -467,6 +467,10 @@ class Stream: accessing messages in the stream, as well as managing consumers. """ + @property + def consumers(self) -> ConsumersManager: + """Manager for consumers bound to this stream.""" + async def direct_get( self, sequence: int, @@ -479,6 +483,45 @@ class Stream: :return: the stream message. """ + async def direct_get_next_for_subject( + self, + subject: str, + sequence: int | None = None, + timeout: float | timedelta | None = None, + ) -> StreamMessage: + """Get the next message for a subject directly from the stream. + + :param subject: subject to get the next message for. + :param sequence: optional sequence number to start searching from. + If not provided, starts from the beginning of the stream. + :param timeout: operation timeout. + :return: the next stream message matching the subject filter. + """ + + async def direct_get_first_for_subject( + self, + subject: str, + timeout: float | timedelta | None = None, + ) -> StreamMessage: + """Get the first message for a subject directly from the stream. + + :param subject: subject to get the first message for. + :param timeout: operation timeout. + :return: the first stream message matching the subject filter. + """ + + async def direct_get_last_for_subject( + self, + subject: str, + timeout: float | timedelta | None = None, + ) -> StreamMessage: + """Get the last message for a subject directly from the stream. + + :param subject: subject to get the last message for. + :param timeout: operation timeout. + :return: the last stream message matching the subject filter. + """ + async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo: """Get information about the stream. @@ -505,6 +548,13 @@ class Stream: :return: number of messages purged. """ - @property - def consumers(self) -> ConsumersManager: - """Manager for consumers bound to this stream.""" + async def delete_message( + self, + sequence: int, + timeout: float | datetime | None = None, + ) -> None: + """Delete a message from the stream by sequence number. + + :param sequence: sequence number of the message to delete. + :param timeout: operation timeout. + """ diff --git a/python/natsrpy/js.py b/python/natsrpy/js.py index 8c09b75..f4e8245 100644 --- a/python/natsrpy/js.py +++ b/python/natsrpy/js.py @@ -20,6 +20,7 @@ KVOperation, KVStatus, ) +from ._natsrpy_rs.js.managers import ConsumersIterator, ConsumersNamesIterator from ._natsrpy_rs.js.object_store import ( ObjectInfo, ObjectInfoIterator, @@ -54,6 +55,8 @@ "ClusterInfo", "Compression", "ConsumerLimits", + "ConsumersIterator", + "ConsumersNamesIterator", "CounterEntry", "Counters", "CountersConfig", diff --git a/python/tests/test_stream_new_methods.py b/python/tests/test_stream_new_methods.py new file mode 100644 index 0000000..fad9602 --- /dev/null +++ b/python/tests/test_stream_new_methods.py @@ -0,0 +1,185 @@ +import uuid + +from natsrpy.js import ( + JetStream, + PullConsumer, + PullConsumerConfig, + PushConsumer, + PushConsumerConfig, + StreamConfig, +) + + +async def test_stream_direct_get_next_for_subject(js: JetStream) -> None: + name = f"test-dgnfs-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"msg-a-1", wait=True) + await js.publish(f"{name}.a", b"msg-a-2", wait=True) + await js.publish(f"{name}.b", b"msg-b-1", wait=True) + msg = await stream.direct_get_next_for_subject(f"{name}.a") + assert msg.payload == b"msg-a-1" + assert msg.subject == f"{name}.a" + finally: + await js.streams.delete(name) + + +async def test_stream_direct_get_next_for_subject_with_sequence( + js: JetStream, +) -> None: + name = f"test-dgnfss-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"msg-a-1", wait=True) + await js.publish(f"{name}.a", b"msg-a-2", wait=True) + await js.publish(f"{name}.b", b"msg-b-1", wait=True) + msg = await stream.direct_get_next_for_subject(f"{name}.a", sequence=2) + assert msg.payload == b"msg-a-2" + finally: + await js.streams.delete(name) + + +async def test_stream_direct_get_first_for_subject(js: JetStream) -> None: + name = f"test-dgffs-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"first-msg", wait=True) + await js.publish(f"{name}.a", b"second-msg", wait=True) + msg = await stream.direct_get_first_for_subject(f"{name}.a") + assert msg.payload == b"first-msg" + assert msg.subject == f"{name}.a" + assert msg.sequence == 1 + finally: + await js.streams.delete(name) + + +async def test_stream_direct_get_last_for_subject(js: JetStream) -> None: + name = f"test-dglfs-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"first-msg", wait=True) + await js.publish(f"{name}.a", b"last-msg", wait=True) + msg = await stream.direct_get_last_for_subject(f"{name}.a") + assert msg.payload == b"last-msg" + assert msg.subject == f"{name}.a" + assert msg.sequence == 2 + finally: + await js.streams.delete(name) + + +async def test_stream_delete_message(js: JetStream) -> None: + name = f"test-delmsg-{uuid.uuid4().hex[:8]}" + subj = f"{name}.data" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"msg-1", wait=True) + await js.publish(subj, b"msg-2", wait=True) + await js.publish(subj, b"msg-3", wait=True) + info = await stream.get_info() + assert info.state.messages == 3 + + await stream.delete_message(sequence=2) + + info = await stream.get_info() + assert info.state.messages == 2 + finally: + await js.streams.delete(name) + + +async def test_consumers_list(js: JetStream) -> None: + name = f"test-clist-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}" + consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=consumer_name1)) + await stream.consumers.create(PullConsumerConfig(name=consumer_name2)) + + consumers_iter = await stream.consumers.list() + found = [] + async for consumer in consumers_iter: + assert isinstance(consumer, (PullConsumer, PushConsumer)) + found.append(consumer) + assert len(found) == 2 + finally: + await js.streams.delete(name) + + +async def test_consumers_list_returns_correct_types(js: JetStream) -> None: + name = f"test-cltype-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + pull_name = f"pull-{uuid.uuid4().hex[:8]}" + push_name = f"push-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=pull_name)) + deliver_subj = uuid.uuid4().hex + await stream.consumers.create( + PushConsumerConfig(deliver_subject=deliver_subj, name=push_name), + ) + + consumers_iter = await stream.consumers.list() + types_found: dict[str, type] = {} + async for consumer in consumers_iter: + if isinstance(consumer, PullConsumer): + types_found["pull"] = type(consumer) + elif isinstance(consumer, PushConsumer): + types_found["push"] = type(consumer) + assert types_found.get("pull") is PullConsumer + assert types_found.get("push") is PushConsumer + finally: + await js.streams.delete(name) + + +async def test_consumers_list_names(js: JetStream) -> None: + name = f"test-clnames-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}" + consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=consumer_name1)) + await stream.consumers.create(PullConsumerConfig(name=consumer_name2)) + + names_iter = await stream.consumers.list_names() + found_names: list[str] = [] + async for cname in names_iter: + assert isinstance(cname, str) + found_names.append(cname) + assert sorted(found_names) == sorted([consumer_name1, consumer_name2]) + finally: + await js.streams.delete(name) + + +async def test_consumers_list_empty(js: JetStream) -> None: + name = f"test-clempty-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + consumers_iter = await stream.consumers.list() + found = [] + async for consumer in consumers_iter: + found.append(consumer) + assert len(found) == 0 + finally: + await js.streams.delete(name) + + +async def test_consumers_list_names_empty(js: JetStream) -> None: + name = f"test-clnempty-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + names_iter = await stream.consumers.list_names() + found_names: list[str] = [] + async for cname in names_iter: + found_names.append(cname) + assert len(found_names) == 0 + finally: + await js.streams.delete(name) diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 12ccac4..99e6689 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -85,10 +85,12 @@ pub enum NatsrpyError { #[error(transparent)] StreamLastRawMessageError(#[from] async_nats::jetstream::stream::LastRawMessageError), #[error(transparent)] - PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError), + StreamDeleteMessageError(#[from] async_nats::jetstream::stream::DeleteMessageError), #[error(transparent)] ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), #[error(transparent)] + PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError), + #[error(transparent)] PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError), #[error(transparent)] PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError), diff --git a/src/js/managers/consumers.rs b/src/js/managers/consumers.rs index 9433085..0500a40 100644 --- a/src/js/managers/consumers.rs +++ b/src/js/managers/consumers.rs @@ -1,14 +1,144 @@ use std::{sync::Arc, time::Duration}; -use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, Python}; -use tokio::sync::RwLock; +use futures_util::StreamExt; +use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, PyRef, Python}; +use tokio::sync::{Mutex, RwLock}; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::consumers::{self, pull::PullConsumer, push::PushConsumer}, - utils::{natsrpy_future, py_types::TimeValue}, + utils::{ + futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue, + streamer::Streamer, + }, }; +#[pyo3::pyclass] +pub struct ConsumersIterator { + streamer: Arc< + Mutex< + Streamer< + Result< + async_nats::jetstream::consumer::Info, + async_nats::jetstream::stream::ConsumersError, + >, + >, + >, + >, + stream: Arc>>, +} + +#[pyo3::pyclass] +pub struct ConsumersNamesIterator { + streamer: Arc>>>, +} + +impl ConsumersNamesIterator { + #[must_use] + pub fn new( + streamer: Streamer>, + ) -> Self { + Self { + streamer: Arc::new(Mutex::new(streamer)), + } + } +} + +#[pyo3::pymethods] +impl ConsumersNamesIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + #[pyo3(signature=(timeout=None))] + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.streamer.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let value = ctx.lock().await.next().await; + match value { + Some(name) => Ok(name?), + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) + } +} + +impl ConsumersIterator { + pub fn new( + stream: Arc< + RwLock>, + >, + streamer: Streamer< + Result< + async_nats::jetstream::consumer::Info, + async_nats::jetstream::stream::ConsumersError, + >, + >, + ) -> Self { + Self { + stream, + streamer: Arc::new(Mutex::new(streamer)), + } + } +} + +#[pyo3::pymethods] +impl ConsumersIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + #[pyo3(signature=(timeout=None))] + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.streamer.clone(); + let stream = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let value = ctx.lock().await.next().await; + match value { + Some(info) => { + let info = info?; + let Some(consumer_name) = info.config.name else { + return Err(NatsrpyError::SessionError(String::from( + "Received consumer without a name.", + ))); + }; + // That means that the consumer is PushBased. + if info.config.deliver_subject.is_some() { + let consumer = consumers::push::consumer::PushConsumer::new( + stream.read().await.get_consumer(&consumer_name).await?, + ); + Ok(Python::attach(|py| consumer.into_py_any(py))?) + } else { + let consumer = consumers::pull::consumer::PullConsumer::new( + stream.read().await.get_consumer(&consumer_name).await?, + ); + Ok(Python::attach(|py| consumer.into_py_any(py))?) + } + } + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) + } +} + #[pyo3::pyclass] pub struct ConsumersManager { stream: Arc>>, @@ -146,4 +276,23 @@ impl ConsumersManager { Ok(ctx.read().await.delete_consumer(&name).await?.success) }) } + + pub fn list<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future(py, async move { + let consumers = ctx.read().await.consumers(); + Ok(ConsumersIterator::new( + ctx.clone(), + Streamer::new(consumers), + )) + }) + } + + pub fn list_names<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future(py, async move { + let consumers = ctx.read().await.consumer_names(); + Ok(ConsumersNamesIterator::new(Streamer::new(consumers))) + }) + } } diff --git a/src/js/managers/mod.rs b/src/js/managers/mod.rs index 671e48e..9f193fc 100644 --- a/src/js/managers/mod.rs +++ b/src/js/managers/mod.rs @@ -7,7 +7,7 @@ pub mod streams; #[pyo3::pymodule(submodule, name = "managers")] pub mod pymod { #[pymodule_export] - use super::consumers::ConsumersManager; + use super::consumers::{ConsumersIterator, ConsumersManager, ConsumersNamesIterator}; #[pymodule_export] use super::counters::CountersManager; #[pymodule_export] diff --git a/src/js/stream.rs b/src/js/stream.rs index 0ce2845..351910e 100644 --- a/src/js/stream.rs +++ b/src/js/stream.rs @@ -979,6 +979,67 @@ impl Stream { }) } + #[pyo3(signature=(subject, sequence=None, timeout=None))] + pub fn direct_get_next_for_subject<'py>( + &self, + py: Python<'py>, + subject: String, + sequence: Option, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let message = ctx + .read() + .await + .direct_get_next_for_subject(subject, sequence) + .await?; + let result = + Python::attach(move |gil| StreamMessage::from_nats_message(gil, &message))?; + Ok(result) + }) + } + + #[pyo3(signature=(subject, timeout=None))] + pub fn direct_get_first_for_subject<'py>( + &self, + py: Python<'py>, + subject: String, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let message = ctx + .read() + .await + .direct_get_first_for_subject(subject) + .await?; + let result = + Python::attach(move |gil| StreamMessage::from_nats_message(gil, &message))?; + Ok(result) + }) + } + + #[pyo3(signature=(subject, timeout=None))] + pub fn direct_get_last_for_subject<'py>( + &self, + py: Python<'py>, + subject: String, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let message = ctx + .read() + .await + .direct_get_last_for_subject(subject) + .await?; + let result = + Python::attach(move |gil| StreamMessage::from_nats_message(gil, &message))?; + Ok(result) + }) + } + #[pyo3(signature=(timeout=None))] pub fn get_info<'py>( &self, @@ -1030,6 +1091,20 @@ impl Stream { Ok(resp.purged) }) } + + #[pyo3(signature=(sequence, timeout=None))] + pub fn delete_message<'py>( + &self, + py: Python<'py>, + sequence: u64, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + ctx.read().await.delete_message(sequence).await?; + Ok(()) + }) + } } #[pyo3::pymodule(submodule, name = "stream")]