diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index 70b9c28..f09bb44 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import datetime, timedelta from typing import Any from .managers import KVManager, ObjectStoreManager, StreamsManager @@ -29,6 +29,26 @@ class JetStreamMessage: def payload(self) -> bytes: ... @property def headers(self) -> dict[str, Any]: ... + @property + def domain(self) -> str | None: ... + @property + def acc_hash(self) -> str | None: ... + @property + def stream(self) -> str: ... + @property + def consumer(self) -> str: ... + @property + def stream_sequence(self) -> int: ... + @property + def consumer_sequence(self) -> int: ... + @property + def delivered(self) -> int: ... + @property + def pending(self) -> int: ... + @property + def published(self) -> datetime: ... + @property + def token(self) -> str | None: ... async def ack(self, double: bool = False) -> None: """ Acknowledge that a message was handled. diff --git a/src/js/message.rs b/src/js/message.rs index 4c6a356..d8045dd 100644 --- a/src/js/message.rs +++ b/src/js/message.rs @@ -1,18 +1,53 @@ use pyo3::{ Bound, Py, PyAny, Python, - types::{PyBytes, PyDict}, + types::{PyBytes, PyDateTime, PyDict}, }; use std::sync::Arc; use tokio::sync::RwLock; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, - utils::{natsrpy_future, py_types::TimeValue}, + utils::{ + natsrpy_future, + py_types::{TimeValue, ToPyDate}, + }, }; +#[derive(Debug, Clone)] +pub struct JSInfo { + pub domain: Option, + pub acc_hash: Option, + pub stream: String, + pub consumer: String, + pub stream_sequence: u64, + pub consumer_sequence: u64, + pub delivered: i64, + pub pending: u64, + pub published: time::OffsetDateTime, + pub token: Option, +} + +impl From> for JSInfo { + fn from(value: async_nats::jetstream::message::Info) -> Self { + Self { + domain: value.domain.map(ToString::to_string), + acc_hash: value.acc_hash.map(ToString::to_string), + stream: value.stream.to_string(), + consumer: value.consumer.to_string(), + stream_sequence: value.stream_sequence, + consumer_sequence: value.consumer_sequence, + delivered: value.delivered, + pending: value.pending, + published: value.published, + token: value.token.map(ToString::to_string), + } + } +} + #[pyo3::pyclass] pub struct JetStreamMessage { message: crate::message::Message, + info: JSInfo, acker: Arc>, } @@ -20,9 +55,11 @@ impl TryFrom for JetStreamMessage { type Error = NatsrpyError; fn try_from(value: async_nats::jetstream::Message) -> Result { + let js_info = JSInfo::from(value.info()?); let (message, acker) = value.split(); Ok(Self { message: message.try_into()?, + info: js_info, acker: Arc::new(RwLock::new(acker)), }) } @@ -69,6 +106,57 @@ impl JetStreamMessage { &self.message.headers } + #[getter] + pub const fn domain(&mut self) -> &Option { + &self.info.domain + } + + #[getter] + #[must_use] + pub const fn acc_hash(&self) -> &Option { + &self.info.acc_hash + } + #[getter] + #[must_use] + pub const fn stream(&self) -> &str { + self.info.stream.as_str() + } + #[getter] + #[must_use] + pub const fn consumer(&self) -> &str { + self.info.consumer.as_str() + } + #[getter] + #[must_use] + pub const fn stream_sequence(&self) -> u64 { + self.info.stream_sequence + } + #[getter] + #[must_use] + pub const fn consumer_sequence(&self) -> u64 { + self.info.consumer_sequence + } + #[getter] + #[must_use] + pub const fn delivered(&self) -> i64 { + self.info.delivered + } + #[getter] + #[must_use] + pub const fn pending(&self) -> u64 { + self.info.pending + } + #[getter] + pub fn published<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + Ok(self.info.published.to_py_date(py)?) + } + + #[getter] + #[must_use] + pub const fn token(&self) -> &Option { + &self.info.token + } + #[pyo3(signature=(double=false))] pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double) diff --git a/src/js/stream.rs b/src/js/stream.rs index 5196331..24ddbb1 100644 --- a/src/js/stream.rs +++ b/src/js/stream.rs @@ -1,13 +1,13 @@ use pyo3::{ Py, - types::{PyBytes, PyDateTime, PyDict, PyTzInfo}, + types::{PyBytes, PyDateTime, PyDict}, }; use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration}; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::managers::consumers::ConsumersManager, - utils::{headers::NatsrpyHeadermapExt, natsrpy_future}, + utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::ToPyDate}, }; use pyo3::{Bound, PyAny, Python}; use tokio::sync::RwLock; @@ -801,25 +801,12 @@ impl StreamMessage { py: Python, msg: &async_nats::jetstream::message::StreamMessage, ) -> NatsrpyResult { - let time = msg.time.to_utc(); - let tz_info = PyTzInfo::utc(py)?; - let time = PyDateTime::new( - py, - time.year(), - time.month().into(), - time.day(), - time.hour(), - time.minute(), - time.second(), - time.microsecond(), - Some(&*tz_info), - )?; Ok(Self { subject: msg.subject.to_string(), payload: PyBytes::new(py, &msg.payload).unbind(), headers: msg.headers.to_pydict(py)?.unbind(), sequence: msg.sequence, - time: time.unbind(), + time: msg.time.to_py_date(py)?.unbind(), }) } } diff --git a/src/utils/py_types.rs b/src/utils/py_types.rs index 4aeadad..524ee91 100644 --- a/src/utils/py_types.rs +++ b/src/utils/py_types.rs @@ -1,8 +1,8 @@ use std::time::Duration; use pyo3::{ - FromPyObject, - types::{PyBytes, PyBytesMethods}, + Bound, FromPyObject, PyResult, Python, + types::{PyBytes, PyBytesMethods, PyDateTime, PyTzInfo}, }; use crate::exceptions::rust_err::NatsrpyError; @@ -74,3 +74,25 @@ impl<'py> FromPyObject<'_, 'py> for TimeValue { } } } + +pub trait ToPyDate { + fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult>; +} + +impl ToPyDate for time::OffsetDateTime { + fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult> { + let time = self.to_utc(); + let tz_info = PyTzInfo::utc(py)?; + PyDateTime::new( + py, + time.year(), + time.month().into(), + time.day(), + time.hour(), + time.minute(), + time.second(), + time.microsecond(), + Some(&*tz_info), + ) + } +}