diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index 8634ca6..7ecd342 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -100,8 +100,8 @@ class Nats: *, domain: str | None = None, api_prefix: str | None = None, - timeout: timedelta | None = None, - ack_timeout: timedelta | None = None, + timeout: float | timedelta | None = None, + ack_timeout: float | timedelta | None = None, concurrency_limit: int | None = None, max_ack_inflight: int | None = None, backpressure_on_inflight: bool | None = None, diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index fceec93..ff7408a 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -83,7 +83,7 @@ class PullConsumerConfig: delivery_start_sequence: int | None = None, delivery_start_time: int | None = None, ack_policy: AckPolicy | None = None, - ack_wait: timedelta | None = None, + ack_wait: float | timedelta | None = None, max_deliver: int | None = None, filter_subject: str | None = None, filter_subjects: list[str] | None = None, @@ -95,12 +95,12 @@ class PullConsumerConfig: headers_only: bool | None = None, max_batch: int | None = None, max_bytes: int | None = None, - max_expires: timedelta | None = None, - inactive_threshold: timedelta | None = None, + max_expires: float | timedelta | None = None, + inactive_threshold: float | timedelta | None = None, num_replicas: int | None = None, memory_storage: bool | None = None, metadata: dict[str, str] | None = None, - backoff: list[timedelta] | None = None, + backoff: list[float | timedelta] | None = None, priority_policy: PriorityPolicy | None = None, priority_groups: list[str] | None = None, pause_until: int | None = None, @@ -147,7 +147,7 @@ class PushConsumerConfig: delivery_start_sequence: int | None = None, delivery_start_time: int | None = None, ack_policy: AckPolicy | None = None, - ack_wait: timedelta | None = None, + ack_wait: float | timedelta | None = None, max_deliver: int | None = None, filter_subject: str | None = None, filter_subjects: list[str] | None = None, @@ -158,12 +158,12 @@ class PushConsumerConfig: max_ack_pending: int | None = None, headers_only: bool | None = None, flow_control: bool | None = None, - idle_heartbeat: timedelta | None = None, + idle_heartbeat: float | timedelta | None = None, num_replicas: int | None = None, memory_storage: bool | None = None, metadata: dict[str, str] | None = None, - backoff: list[timedelta] | None = None, - inactive_threshold: timedelta | None = None, + backoff: list[float | timedelta] | None = None, + inactive_threshold: float | timedelta | None = None, pause_until: int | None = None, ) -> Self: ... @@ -188,8 +188,8 @@ class PullConsumer: group: str | None = None, priority: int | None = None, max_bytes: int | None = None, - heartbeat: timedelta | None = None, - expires: timedelta | None = None, + heartbeat: float | timedelta | None = None, + expires: float | timedelta | None = None, min_pending: int | None = None, min_ack_pending: int | None = None, timeout: float | timedelta | None = None, diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index b460265..db65439 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -93,7 +93,7 @@ class KVConfig: description: str | None = None, max_value_size: int | None = None, history: int | None = None, - max_age: float | None = None, + max_age: float | timedelta | None = None, max_bytes: int | None = None, storage: StorageType | None = None, num_replicas: int | None = None, @@ -103,7 +103,7 @@ class KVConfig: mirror_direct: bool | None = None, compression: bool | None = None, placement: Placement | None = None, - limit_markers: float | None = None, + limit_markers: float | timedelta | None = None, ) -> Self: ... @final diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index b2cb633..afc0489 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -167,12 +167,12 @@ class StreamConfig: discard_new_per_subject: bool | None = None, retention: RetentionPolicy | None = None, max_consumers: int | None = None, - max_age: timedelta | None = None, + max_age: float | timedelta | None = None, max_message_size: int | None = None, storage: StorageType | None = None, num_replicas: int | None = None, no_ack: bool | None = None, - duplicate_window: timedelta | None = None, + duplicate_window: float | timedelta | None = None, template_owner: str | None = None, sealed: bool | None = None, description: str | None = None, @@ -193,7 +193,7 @@ class StreamConfig: persist_mode: PersistenceMode | None = None, pause_until: int | None = None, allow_message_ttl: bool | None = None, - subject_delete_marker_ttl: timedelta | None = None, + subject_delete_marker_ttl: float | timedelta | None = None, allow_atomic_publish: bool | None = None, allow_message_schedules: bool | None = None, allow_message_counter: bool | None = None, diff --git a/src/js/consumers/pull/config.rs b/src/js/consumers/pull/config.rs index fc8584a..f3c4f1d 100644 --- a/src/js/consumers/pull/config.rs +++ b/src/js/consumers/pull/config.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, time::Duration}; use crate::{ exceptions::rust_err::NatsrpyError, js::consumers::common::{AckPolicy, DeliverPolicy, PriorityPolicy, ReplayPolicy}, + utils::py_types::TimeValue, }; #[pyo3::pyclass(from_py_object, get_all, set_all)] @@ -80,7 +81,7 @@ impl PullConsumerConfig { delivery_start_sequence: Option, delivery_start_time: Option, ack_policy: Option, - ack_wait: Option, + ack_wait: Option, max_deliver: Option, filter_subject: Option, filter_subjects: Option>, @@ -92,12 +93,12 @@ impl PullConsumerConfig { headers_only: Option, max_batch: Option, max_bytes: Option, - max_expires: Option, - inactive_threshold: Option, + max_expires: Option, + inactive_threshold: Option, num_replicas: Option, memory_storage: Option, metadata: Option>, - backoff: Option>, + backoff: Option>, priority_policy: Option, priority_groups: Option>, pause_until: Option, @@ -114,7 +115,7 @@ impl PullConsumerConfig { conf.deliver_policy = deliver_policy.unwrap_or_default(); conf.ack_policy = ack_policy.unwrap_or_default(); - conf.ack_wait = ack_wait.unwrap_or_default(); + conf.ack_wait = ack_wait.unwrap_or_default().into(); conf.max_deliver = max_deliver.unwrap_or_default(); conf.filter_subject = filter_subject.unwrap_or_default(); conf.filter_subjects = filter_subjects.unwrap_or_default(); @@ -126,12 +127,16 @@ impl PullConsumerConfig { conf.headers_only = headers_only.unwrap_or_default(); conf.max_batch = max_batch.unwrap_or_default(); conf.max_bytes = max_bytes.unwrap_or_default(); - conf.max_expires = max_expires.unwrap_or_default(); - conf.inactive_threshold = inactive_threshold.unwrap_or_default(); + conf.max_expires = max_expires.unwrap_or_default().into(); + conf.inactive_threshold = inactive_threshold.unwrap_or_default().into(); conf.num_replicas = num_replicas.unwrap_or_default(); conf.memory_storage = memory_storage.unwrap_or_default(); conf.metadata = metadata.unwrap_or_default(); - conf.backoff = backoff.unwrap_or_default(); + conf.backoff = backoff + .unwrap_or_default() + .into_iter() + .map(Into::into) + .collect(); conf.priority_policy = priority_policy.unwrap_or_default(); conf.priority_groups = priority_groups.unwrap_or_default(); diff --git a/src/js/consumers/pull/consumer.rs b/src/js/consumers/pull/consumer.rs index 30264b6..8570e35 100644 --- a/src/js/consumers/pull/consumer.rs +++ b/src/js/consumers/pull/consumer.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use futures_util::StreamExt; use pyo3::{Bound, PyAny, Python}; @@ -47,8 +47,8 @@ impl PullConsumer { group: Option, priority: Option, max_bytes: Option, - heartbeat: Option, - expires: Option, + heartbeat: Option, + expires: Option, min_pending: Option, min_ack_pending: Option, timeout: Option, @@ -74,10 +74,10 @@ impl PullConsumer { fetch_builder = fetch_builder.max_bytes(max_bytes); } if let Some(heartbeat) = heartbeat { - fetch_builder = fetch_builder.heartbeat(heartbeat); + fetch_builder = fetch_builder.heartbeat(heartbeat.into()); } if let Some(expires) = expires { - fetch_builder = fetch_builder.expires(expires); + fetch_builder = fetch_builder.expires(expires.into()); } if let Some(min_pending) = min_pending { fetch_builder = fetch_builder.min_pending(min_pending); diff --git a/src/js/consumers/push/config.rs b/src/js/consumers/push/config.rs index 7b4df9f..1b516b9 100644 --- a/src/js/consumers/push/config.rs +++ b/src/js/consumers/push/config.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, time::Duration}; use crate::{ exceptions::rust_err::NatsrpyError, js::consumers::common::{AckPolicy, DeliverPolicy, ReplayPolicy}, + utils::py_types::TimeValue, }; #[pyo3::pyclass(from_py_object, get_all, set_all)] @@ -81,7 +82,7 @@ impl PushConsumerConfig { delivery_start_sequence: Option, delivery_start_time: Option, ack_policy: Option, - ack_wait: Option, + ack_wait: Option, max_deliver: Option, filter_subject: Option, filter_subjects: Option>, @@ -92,12 +93,12 @@ impl PushConsumerConfig { max_ack_pending: Option, headers_only: Option, flow_control: Option, - idle_heartbeat: Option, + idle_heartbeat: Option, num_replicas: Option, memory_storage: Option, metadata: Option>, - backoff: Option>, - inactive_threshold: Option, + backoff: Option>, + inactive_threshold: Option, pause_until: Option, ) -> Self { Self { @@ -112,7 +113,7 @@ impl PushConsumerConfig { deliver_policy: deliver_policy.unwrap_or_default(), ack_policy: ack_policy.unwrap_or_default(), - ack_wait: ack_wait.unwrap_or_default(), + ack_wait: ack_wait.unwrap_or_default().into(), max_deliver: max_deliver.unwrap_or_default(), filter_subject: filter_subject.unwrap_or_default(), filter_subjects: filter_subjects.unwrap_or_default(), @@ -123,12 +124,16 @@ impl PushConsumerConfig { max_ack_pending: max_ack_pending.unwrap_or_default(), headers_only: headers_only.unwrap_or_default(), flow_control: flow_control.unwrap_or_default(), - idle_heartbeat: idle_heartbeat.unwrap_or_default(), + idle_heartbeat: idle_heartbeat.unwrap_or_default().into(), num_replicas: num_replicas.unwrap_or_default(), memory_storage: memory_storage.unwrap_or_default(), metadata: metadata.unwrap_or_default(), - backoff: backoff.unwrap_or_default(), - inactive_threshold: inactive_threshold.unwrap_or_default(), + backoff: backoff + .unwrap_or_default() + .into_iter() + .map(Into::into) + .collect(), + inactive_threshold: inactive_threshold.unwrap_or_default().into(), } } } diff --git a/src/js/kv.rs b/src/js/kv.rs index a34ead4..59bdd92 100644 --- a/src/js/kv.rs +++ b/src/js/kv.rs @@ -61,12 +61,12 @@ impl KVConfig { limit_markers=None, ))] #[must_use] - pub const fn __new__( + pub fn __new__( bucket: String, description: Option, max_value_size: Option, history: Option, - max_age: Option, + max_age: Option, max_bytes: Option, storage: Option, num_replicas: Option, @@ -76,14 +76,14 @@ impl KVConfig { mirror_direct: Option, compression: Option, placement: Option, - limit_markers: Option, + limit_markers: Option, ) -> Self { Self { bucket, description, max_value_size, history, - max_age, + max_age: max_age.map(Into::into), max_bytes, storage, num_replicas, @@ -93,7 +93,7 @@ impl KVConfig { mirror_direct, compression, placement, - limit_markers, + limit_markers: limit_markers.map(Into::into), } } } diff --git a/src/js/stream.rs b/src/js/stream.rs index f78f891..0ce2845 100644 --- a/src/js/stream.rs +++ b/src/js/stream.rs @@ -160,9 +160,9 @@ pub struct ConsumerLimits { impl ConsumerLimits { #[new] #[must_use] - pub const fn __new__(inactive_threshold: Duration, max_ack_pending: i64) -> Self { + pub fn __new__(inactive_threshold: TimeValue, max_ack_pending: i64) -> Self { Self { - inactive_threshold, + inactive_threshold: inactive_threshold.into(), max_ack_pending, } } @@ -595,12 +595,12 @@ impl StreamConfig { discard_new_per_subject: Option, retention: Option, max_consumers: Option, - max_age: Option, + max_age: Option, max_message_size: Option, storage: Option, num_replicas: Option, no_ack: Option, - duplicate_window: Option, + duplicate_window: Option, template_owner: Option, sealed: Option, description: Option, @@ -621,7 +621,7 @@ impl StreamConfig { persist_mode: Option, pause_until: Option, allow_message_ttl: Option, - subject_delete_marker_ttl: Option, + subject_delete_marker_ttl: Option, allow_atomic_publish: Option, allow_message_schedules: Option, allow_message_counter: Option, @@ -640,8 +640,8 @@ impl StreamConfig { placement, persist_mode, pause_until, - subject_delete_marker_ttl, + subject_delete_marker_ttl: subject_delete_marker_ttl.map(Into::into), max_bytes: max_bytes.unwrap_or_default(), max_messages: max_messages.unwrap_or_default(), max_messages_per_subject: max_messages_per_subject.unwrap_or_default(), @@ -649,12 +649,12 @@ impl StreamConfig { discard_new_per_subject: discard_new_per_subject.unwrap_or_default(), retention: retention.unwrap_or_default(), max_consumers: max_consumers.unwrap_or_default(), - max_age: max_age.unwrap_or_default(), + max_age: max_age.unwrap_or_default().into(), max_message_size: max_message_size.unwrap_or_default(), storage: storage.unwrap_or_default(), num_replicas: num_replicas.unwrap_or_default(), no_ack: no_ack.unwrap_or_default(), - duplicate_window: duplicate_window.unwrap_or_default(), + duplicate_window: duplicate_window.unwrap_or_default().into(), template_owner: template_owner.unwrap_or_default(), sealed: sealed.unwrap_or_default(), allow_rollup: allow_rollup.unwrap_or_default(), diff --git a/src/nats_cls.rs b/src/nats_cls.rs index f6c4bfb..ee48c51 100644 --- a/src/nats_cls.rs +++ b/src/nats_cls.rs @@ -3,7 +3,7 @@ use pyo3::{ Bound, IntoPyObjectExt, Py, PyAny, Python, types::{PyBytes, PyBytesMethods, PyDict}, }; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::sync::RwLock; use crate::{ @@ -161,7 +161,7 @@ impl NatsCls { payload: Option>, headers: Option>, inbox: Option, - timeout: Option, + timeout: Option, ) -> NatsrpyResult> { let session = self.nats_session.clone(); let data = payload.map(|inner| bytes::Bytes::from(inner.as_bytes().to_vec())); @@ -174,7 +174,7 @@ impl NatsCls { payload: data, headers: headermap, inbox, - timeout: timeout.map(Some), + timeout: timeout.map(Into::into).map(Some), }; session.send_request(subject, request).await?; Ok(()) @@ -236,8 +236,8 @@ impl NatsCls { py: Python<'py>, domain: Option, api_prefix: Option, - timeout: Option, - ack_timeout: Option, + timeout: Option, + ack_timeout: Option, concurrency_limit: Option, max_ack_inflight: Option, backpressure_on_inflight: Option, @@ -248,10 +248,10 @@ impl NatsCls { let mut builder = async_nats::jetstream::ContextBuilder::new().concurrency_limit(concurrency_limit); if let Some(timeout) = ack_timeout { - builder = builder.ack_timeout(timeout); + builder = builder.ack_timeout(timeout.into()); } if let Some(timeout) = timeout { - builder = builder.timeout(timeout); + builder = builder.timeout(timeout.into()); } if let Some(max_ack_inflight) = max_ack_inflight { builder = builder.max_ack_inflight(max_ack_inflight); diff --git a/src/utils/py_types.rs b/src/utils/py_types.rs index 99a4561..a712ac6 100644 --- a/src/utils/py_types.rs +++ b/src/utils/py_types.rs @@ -64,6 +64,12 @@ impl From for Duration { } } +impl Default for TimeValue { + fn default() -> Self { + Self::Duration(Duration::default()) + } +} + impl<'py> FromPyObject<'_, 'py> for TimeValue { type Error = NatsrpyError;