diff --git a/Cargo.toml b/Cargo.toml index ad0d018..5f597e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,11 @@ serde_json = "1.0.149" thiserror = "2.0.18" time = "0.3.47" tokio = { version = "1.50.0", features = ["full"] } + +[profile.release] +lto = "fat" +codegen-units = 1 +opt-level = 3 +strip = true +debug = false +panic = "abort" diff --git a/examples/consumers.py b/examples/consumers.py new file mode 100644 index 0000000..f15a114 --- /dev/null +++ b/examples/consumers.py @@ -0,0 +1,66 @@ +import asyncio + +from natsrpy import Nats +from natsrpy.js import PullConsumerConfig, PushConsumerConfig, StreamConfig + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + js = await nats.jetstream() + + stream = await js.streams.create_or_update( + StreamConfig( + name="stream-example", + subjects=["stream.example.>"], + description="Stream example", + ), + ) + + # Push and pull consumers have different configurations. + # If you supply PushConsumerConfig, you will get a push consumer, + # and otherwise you will get a PullConsumer. + # + # They have different APIs. + pull_consumer = await stream.consumers.create( + PullConsumerConfig( + name="example-pull", + durable_name="example-pull", + ), + ) + push_consumer = await stream.consumers.create( + PushConsumerConfig( + name="example-push", + deliver_subject="example-push", + durable_name="example-push", + ), + ) + + # We publish a single message + await js.publish("stream.example.test", "message for stream") + + # We use messages() to get async iterator which we + # use to get messages for push_consumer. + async for push_message in await push_consumer.messages(): + print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201 + await push_message.ack() + break + + # Pull consumers have to request batches of messages. + for pull_message in await pull_consumer.fetch(max_messages=10): + print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201 + await pull_message.ack() + + # Cleanup + await stream.consumers.delete(push_consumer.name) + await stream.consumers.delete(pull_consumer.name) + await js.streams.delete(stream.name) + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/counters.py b/examples/counters.py new file mode 100644 index 0000000..bb4754f --- /dev/null +++ b/examples/counters.py @@ -0,0 +1,45 @@ +import asyncio + +from natsrpy import Nats +from natsrpy.js import CountersConfig + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + js = await nats.jetstream() + # Counters store is basically a stream, + # but each subject is considered as a counter. + # You can read more about how it works here: + # https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-49.md + counters = await js.counters.create_or_update( + CountersConfig( + name="counters", + subjects=["counters.>"], + ), + ) + + # We have a nice interface for counters. + # Please note, that ADD accepts + # positive and NEGATIVE values. + # I'd rename this method, but this API is + # defined in ADR-49, so we won't change this. + # + # Each add\incr\decr returns current value of the counter. + await counters.add("counters.one", +8) + await counters.add("counters.one", -2) + # Increase by 1 + await counters.incr("counters.one") + # Decrease by 1 + await counters.decr("counters.one") + + print(await counters.get("counters.one")) # noqa: T201 + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/kv.py b/examples/kv.py new file mode 100644 index 0000000..860c6fd --- /dev/null +++ b/examples/kv.py @@ -0,0 +1,46 @@ +import asyncio + +from natsrpy import Nats +from natsrpy.js import KVConfig + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + js = await nats.jetstream() + + kv = await js.kv.create_or_update(KVConfig(bucket="kv-example")) + + watcher = await kv.watch("test-key") + + await kv.put("test-key", "one") + await kv.put("test-key", b"two") + + # To obtain bytes value. + value = await kv.get("test-key") + if value: + print("[VALUE]", value.decode()) # noqa: T201 + # To get kv-entry with all + # the metadata. + entry = await kv.entry("test-key") + if entry: + print("[ENTRY]", entry) # noqa: T201 + + await kv.delete("test-key") + + # Alternatively you can + # use await watcher.next() + async for event in watcher: + print("[EVENT]", event) # noqa: T201 + break + + await js.kv.delete(kv.name) + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/object_store.py b/examples/object_store.py new file mode 100644 index 0000000..a9779dc --- /dev/null +++ b/examples/object_store.py @@ -0,0 +1,45 @@ +import asyncio +import io +from datetime import timedelta +from pathlib import Path + +from natsrpy import Nats +from natsrpy.js import ObjectStoreConfig + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + js = await nats.jetstream() + + store = await js.object_store.create( + ObjectStoreConfig( + bucket="example-bucket", + max_age=timedelta(minutes=1), + ), + ) + await store.put("test2.py", Path(__file__).read_bytes()) + await store.put("test.py", str(Path(__file__))) + + async for obj in await store.list(): + print(obj) # noqa: T201 + # We use BytesIO, since + # get takes writer as it's argument. + # + # That happens because files can be very large, + # and this approach allows us to stream directly + # to files. using `open('file', 'wb') as output:` + with io.BytesIO() as output: + await store.get(obj.name, output) + assert obj.size == len(output.getvalue()) # noqa: S101 + + await store.delete(obj.name) + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/request_reply.py b/examples/request_reply.py new file mode 100644 index 0000000..f90b6a4 --- /dev/null +++ b/examples/request_reply.py @@ -0,0 +1,42 @@ +import asyncio + +from natsrpy import Message, Nats + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + subj = "request-reply" + + # Here we create responder, that will be + # answering to our requests. + async def responder(message: Message) -> None: + print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201 + if message.reply: + await nats.publish( + message.reply, + f"reply to {message.payload}", + headers=message.headers, + ) + + # Start responder using callback-based subsciption. + sub = await nats.subscribe(subj, callback=responder) + # Send 3 concurrent requests. + responses = await asyncio.gather( + nats.request(subj, "request1"), + nats.request(subj, "request2", headers={"header": "value"}), + nats.request(subj, "request3", inbox="test-inbox"), + ) + # Disconnect resonder. + await sub.drain() + + # Iterate over replies. + for resp in responses: + print(f"[RESPONSE]: {resp}") # noqa: T201 + + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/simple_publish.py b/examples/simple_publish.py new file mode 100644 index 0000000..3ae5005 --- /dev/null +++ b/examples/simple_publish.py @@ -0,0 +1,35 @@ +import asyncio + +from natsrpy import Nats + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + # Here we initiate subscription. + # We do it before sending messages, + # in order to catch them once we will start reading. + subscription = await nats.subscribe("hello") + + # Publish accepts str | bytes | bytearray | memoryview + await nats.publish("hello", "str world") + await nats.publish("hello", b"bytes world") + await nats.publish("hello", bytearray(b"bytearray world")) + await nats.publish("hello", "headers", headers={"one": "two"}) + await nats.publish("hello", "headers", headers={"one": ["two", "three"]}) + + # Calling this method will unsubscribe us, + # after `n` delivered messages. + # or immediately if `n` is not provided. + subscription.unsubscribe(limit=5) + async for message in subscription: + print(message) # noqa: T201 + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/subscriptions.py b/examples/subscriptions.py new file mode 100644 index 0000000..f01dbb3 --- /dev/null +++ b/examples/subscriptions.py @@ -0,0 +1,56 @@ +import asyncio + +from natsrpy import Message, Nats + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + cb_lock = asyncio.Event() + + async def callback(message: Message) -> None: + print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201 + cb_lock.set() + + # When subscribing you can set callback. + # In that case CallbackSubscription is returned. + # This type of subscription cannot be iterated. + cb_sub = await nats.subscribe("cb-subj", callback=callback) + + # When callback is not set, you get a subscription + # that should be used along with `async for` + # loop, or alternatively you can call + # `await iter_sub.next()` to get a single message. + iter_sub = await nats.subscribe("iter-subj") + + # Subscriptions with queue argument create + # subscription with a queue group to distribute + # messages along all subscribers. + queue_sub = await nats.subscribe("queue-subj", queue="example-queue") + + await nats.publish("cb-subj", "message for callback") + await nats.publish("iter-subj", "message for iterator") + await nats.publish("queue-subj", "message for queue sub") + + # We can unsubscribe after a particular amount of messages. + await iter_sub.unsubscribe(limit=1) + await cb_sub.unsubscribe(limit=1) + await queue_sub.unsubscribe(limit=1) + + async for message in iter_sub: + print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201 + + async for message in queue_sub: + print(f"[FROM_QUEUED] {message.payload}") # noqa: T201 + + # Making sure that the message in callback is received. + await cb_lock.wait() + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index e181442..1d01c29 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -206,12 +206,14 @@ class Nats: self, subject: str, callback: Callable[[Message], Awaitable[None]], + queue: str | None = None, ) -> Future[CallbackSubscription]: ... @overload def subscribe( self, subject: str, callback: None = None, + queue: str | None = None, ) -> Future[IteratorSubscription]: ... def jetstream( self, diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 34ecf51..ee2aa78 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -300,6 +300,14 @@ class PushConsumer: Messages are delivered by the server to a specified subject. """ + @property + def name(self) -> str: + """Get consumer name.""" + + @property + def stream_name(self) -> str: + """Get stream name that this consumer attached to.""" + def messages(self) -> Future[MessagesIterator]: """Get an async iterator for consuming messages. @@ -313,6 +321,14 @@ class PullConsumer: Messages are fetched on demand in batches by the client. """ + @property + def name(self) -> str: + """Get consumer name.""" + + @property + def stream_name(self) -> str: + """Get stream name that this consumer attached to.""" + def fetch( self, max_messages: int | None = None, diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index 74bec7c..ca9c236 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -468,6 +468,10 @@ class Stream: accessing messages in the stream, as well as managing consumers. """ + @property + def name(self) -> str: + """Stream name.""" + @property def consumers(self) -> ConsumersManager: """Manager for consumers bound to this stream.""" diff --git a/src/js/consumers/pull/consumer.rs b/src/js/consumers/pull/consumer.rs index 8570e35..7956883 100644 --- a/src/js/consumers/pull/consumer.rs +++ b/src/js/consumers/pull/consumer.rs @@ -15,13 +15,20 @@ type NatsPullConsumer = #[pyo3::pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PullConsumer { + #[pyo3(get)] + name: String, + #[pyo3(get)] + stream_name: String, consumer: Arc>, } impl PullConsumer { #[must_use] pub fn new(consumer: NatsPullConsumer) -> Self { + let info = consumer.cached_info(); Self { + name: info.name.clone(), + stream_name: info.stream_name.clone(), consumer: Arc::new(RwLock::new(consumer)), } } @@ -94,4 +101,13 @@ impl PullConsumer { Ok(ret_messages) }) } + + #[must_use] + pub fn __repr__(&self) -> String { + format!( + "PullConsumer", + name = self.name, + stream_name = self.stream_name + ) + } } diff --git a/src/js/consumers/push/consumer.rs b/src/js/consumers/push/consumer.rs index 7426ec5..e79a276 100644 --- a/src/js/consumers/push/consumer.rs +++ b/src/js/consumers/push/consumer.rs @@ -16,13 +16,20 @@ type NatsPushConsumer = #[pyo3::pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PushConsumer { + #[pyo3(get)] + name: String, + #[pyo3(get)] + stream_name: String, consumer: Arc>, } impl PushConsumer { #[must_use] pub fn new(consumer: NatsPushConsumer) -> Self { + let info = consumer.cached_info(); Self { + name: info.name.clone(), + stream_name: info.stream_name.clone(), consumer: Arc::new(RwLock::new(consumer)), } } @@ -51,6 +58,15 @@ impl PushConsumer { )) }) } + + #[must_use] + pub fn __repr__(&self) -> String { + format!( + "PushConsumer", + name = self.name, + stream_name = self.stream_name + ) + } } #[pyo3::pymethods] diff --git a/src/js/kv.rs b/src/js/kv.rs index 59bdd92..e295d70 100644 --- a/src/js/kv.rs +++ b/src/js/kv.rs @@ -166,6 +166,21 @@ pub struct KVEntry { pub seen_current: bool, } +#[pyo3::pymethods] +impl KVEntry { + #[must_use] + pub fn __repr__(&self) -> String { + format!( + "KVEntry", + bucket = self.bucket, + key = self.key, + value = self.value, + revision = self.revision, + created = self.created, + ) + } +} + impl TryFrom for KVEntry { type Error = NatsrpyError; diff --git a/src/js/stream.rs b/src/js/stream.rs index 351910e..4892fcc 100644 --- a/src/js/stream.rs +++ b/src/js/stream.rs @@ -942,6 +942,8 @@ impl From for PurgeResponse { #[pyo3::pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct Stream { + #[pyo3(get)] + name: String, stream: Arc>>, } impl Stream { @@ -949,7 +951,9 @@ impl Stream { pub fn new( stream: async_nats::jetstream::stream::Stream, ) -> Self { + let info = stream.cached_info(); Self { + name: info.config.name.clone(), stream: Arc::new(RwLock::new(stream)), } } @@ -1105,6 +1109,11 @@ impl Stream { Ok(()) }) } + + #[must_use] + pub fn __repr__(&self) -> String { + format!("Stream", name = self.name) + } } #[pyo3::pymodule(submodule, name = "stream")] diff --git a/src/nats_cls.rs b/src/nats_cls.rs index 27fbe49..00fd8f1 100644 --- a/src/nats_cls.rs +++ b/src/nats_cls.rs @@ -1,8 +1,5 @@ use async_nats::{Subject, client::traits::Publisher, message::OutboundMessage}; -use pyo3::{ - Bound, IntoPyObjectExt, Py, PyAny, Python, - types::{PyBytes, PyBytesMethods, PyDict}, -}; +use pyo3::{Bound, IntoPyObjectExt, Py, PyAny, Python, types::PyDict}; use std::sync::Arc; use tokio::sync::RwLock; @@ -126,11 +123,15 @@ impl NatsCls { err_on_disconnect: bool, ) -> NatsrpyResult> { let session = self.nats_session.clone(); - log::info!("Payload: {payload:?}"); - let data = payload.into(); + let data = bytes::Bytes::from(payload); let headermap = headers .map(async_nats::HeaderMap::from_pydict) .transpose()?; + log::debug!( + "Nats.publish is called with a message to subject '{}' with payload of size {} bytes", + subject, + data.len() + ); natsrpy_future(py, async move { if let Some(session) = session.read().await.as_ref() { if err_on_disconnect @@ -158,16 +159,21 @@ impl NatsCls { &self, py: Python<'py>, subject: String, - payload: Option>, + payload: Option, headers: Option>, inbox: Option, timeout: Option, ) -> NatsrpyResult> { let session = self.nats_session.clone(); - let data = payload.map(|inner| bytes::Bytes::from(inner.as_bytes().to_vec())); + let data = payload.map(bytes::Bytes::from); let headermap = headers .map(async_nats::HeaderMap::from_pydict) .transpose()?; + log::debug!( + "Nats.request is called with a message to subject '{}' with payload of size {} bytes", + subject, + data.as_ref().map_or(0, bytes::Bytes::len) + ); natsrpy_future(py, async move { if let Some(session) = session.read().await.as_ref() { let request = async_nats::Request { @@ -196,22 +202,28 @@ impl NatsCls { }) } - #[pyo3(signature=(subject, callback=None))] + #[pyo3(signature=(subject, callback=None, queue=None))] pub fn subscribe<'py>( &self, py: Python<'py>, subject: String, callback: Option>, + queue: Option, ) -> NatsrpyResult> { log::debug!("Subscribing to '{subject}'"); let session = self.nats_session.clone(); natsrpy_future(py, async move { if let Some(session) = session.read().await.as_ref() { + let subscriber = if let Some(queue) = queue { + session.queue_subscribe(subject, queue).await? + } else { + session.subscribe(subject).await? + }; if let Some(cb) = callback { - let sub = CallbackSubscription::new(session.subscribe(subject).await?, cb)?; + let sub = CallbackSubscription::new(subscriber, cb)?; Ok(Python::attach(|gil| sub.into_py_any(gil))?) } else { - let sub = IteratorSubscription::new(session.subscribe(subject).await?); + let sub = IteratorSubscription::new(subscriber); Ok(Python::attach(|gil| sub.into_py_any(gil))?) } } else { diff --git a/src/subscriptions/callback.rs b/src/subscriptions/callback.rs index 0bc5cbe..f83d0bf 100644 --- a/src/subscriptions/callback.rs +++ b/src/subscriptions/callback.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use futures_util::StreamExt; use pyo3::{Bound, Py, PyAny, Python}; @@ -14,6 +14,7 @@ pub struct CallbackSubscription { async fn process_message(message: async_nats::message::Message, py_callback: Py) { let task = async || -> NatsrpyResult<()> { + log::debug!("Received message: {:?}. Processing ...", &message); let message = crate::message::Message::try_from(&message)?; let awaitable = Python::attach(|gil| -> NatsrpyResult<_> { let res = py_callback.call1(gil, (message,))?; @@ -21,6 +22,7 @@ async fn process_message(message: async_nats::message::Message, py_callback: Py< Ok(rust_task) })?; awaitable.await?; + log::debug!("Python callback successfully awaited."); Ok(()) }; if let Err(err) = task().await { @@ -33,13 +35,32 @@ async fn start_py_sub( py_callback: Py, locals: pyo3_async_runtimes::TaskLocals, ) { - while let Some(message) = sub.lock().await.next().await { + loop { + let message = { + let mut sub_guard = sub.lock().await; + // We wait up to 0.2 second for new messages. + // If this thing doesn't resolve in this period, + // we just release the lock. Otherwise it would be impossible to + // unsubscribe. + match tokio::time::timeout(Duration::from_millis(200), sub_guard.next()).await { + Ok(Some(message)) => message, + Ok(None) => break, + _ => continue, + } + }; let py_cb = Python::attach(|py| py_callback.clone_ref(py)); tokio::spawn(pyo3_async_runtimes::tokio::scope( locals.clone(), process_message(message, py_cb), )); } + // while let Some(message) = sub.lock().await.next().await { + // let py_cb = Python::attach(|py| py_callback.clone_ref(py)); + // tokio::spawn(pyo3_async_runtimes::tokio::scope( + // locals.clone(), + // process_message(message, py_cb), + // )); + // } } impl CallbackSubscription {