From 4a798413f746b8cee2b899ae5cb69a6e8fa0ee01 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Wed, 25 Mar 2026 22:33:16 +0100 Subject: [PATCH] Implemented KV. --- python/natsrpy/_natsrpy_rs/js/kv.pyi | 91 +++++- python/natsrpy/js.py | 31 +- src/exceptions/rust_err.rs | 28 +- src/js/kv.rs | 406 ++++++++++++++++++++++++++- src/utils/py_types.rs | 13 +- 5 files changed, 544 insertions(+), 25 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index f303a61..b460265 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -1,13 +1,68 @@ +from datetime import datetime, timedelta from typing import final -from natsrpy._natsrpy_rs.js.stream import Placement, Republish, Source, StorageType from typing_extensions import Self +from .stream import ( + Placement, + Republish, + Source, + StorageType, + StreamInfo, +) + __all__ = [ "KVConfig", + "KVEntry", + "KVEntryIterator", + "KVOperation", + "KVStatus", "KeyValue", + "KeysIterator", ] +@final +class KVStatus: + info: StreamInfo + bucket: str + +@final +class KVOperation: + Put: KVOperation + Delete: KVOperation + Purge: KVOperation + +@final +class KVEntry: + @property + def bucket(self) -> str: ... + @property + def key(self) -> str: ... + @property + def value(self) -> bytes: ... + @property + def revision(self) -> int: ... + @property + def delta(self) -> int: ... + @property + def created(self) -> datetime: ... + @property + def operation(self) -> KVOperation: ... + @property + def seen_current(self) -> bool: ... + +@final +class KVEntryIterator: + def __aiter__(self) -> Self: ... + async def __anext__(self) -> KVEntry: ... + async def next(self, timeout: float | timedelta | None = None) -> KVEntry: ... + +@final +class KeysIterator: + def __aiter__(self) -> Self: ... + async def __anext__(self) -> str: ... + async def next(self, timeout: float | timedelta | None = None) -> str: ... + @final class KVConfig: """ @@ -63,6 +118,36 @@ class KeyValue: def use_jetstream_prefix(self) -> bool: ... @property def name(self) -> str: ... - async def put(self, key: str, value: bytes) -> int: ... async def get(self, key: str) -> bytes | None: ... - async def delete(self, key: str) -> int: ... + async def delete( + self, + key: str, + expect_revision: int | None = None, + ) -> int: ... + async def update(self, key: str, value: bytes | str, revision: int) -> None: ... + async def create( + self, + key: str, + value: bytes | str, + ttl: float | timedelta | None = None, + ) -> int: ... + async def put(self, key: str, value: bytes | str) -> int: ... + async def purge( + self, + key: str, + ttl: float | timedelta | None = None, + expect_revision: int | None = None, + ) -> None: ... + async def history(self, key: str) -> KVEntryIterator: ... + async def entry(self, key: str, revision: int | None = None) -> KVEntry | None: ... + async def watch( + self, + key: str, + from_revision: int | None = None, + ) -> KVEntryIterator: ... + async def watch_with_history(self, key: str) -> KVEntryIterator: ... + async def watch_all(self, from_revision: int | None = None) -> KVEntryIterator: ... + async def watch_many(self, keys: list[str]) -> KVEntryIterator: ... + async def watch_many_with_history(self, keys: list[str]) -> KVEntryIterator: ... + async def keys(self) -> KeysIterator: ... + async def status(self) -> KVStatus: ... diff --git a/python/natsrpy/js.py b/python/natsrpy/js.py index 5df71fc..9050186 100644 --- a/python/natsrpy/js.py +++ b/python/natsrpy/js.py @@ -1,7 +1,8 @@ -from ._natsrpy_rs.js import JetStream +from ._natsrpy_rs.js import JetStream, JetStreamMessage, Publication from ._natsrpy_rs.js.consumers import ( AckPolicy, DeliverPolicy, + MessagesIterator, PriorityPolicy, PullConsumer, PullConsumerConfig, @@ -9,7 +10,15 @@ PushConsumerConfig, ReplayPolicy, ) -from ._natsrpy_rs.js.kv import KeyValue, KVConfig +from ._natsrpy_rs.js.kv import ( + KeysIterator, + KeyValue, + KVConfig, + KVEntry, + KVEntryIterator, + KVOperation, + KVStatus, +) from ._natsrpy_rs.js.object_store import ( ObjectInfo, ObjectInfoIterator, @@ -18,40 +27,55 @@ ObjectStoreConfig, ) from ._natsrpy_rs.js.stream import ( + ClusterInfo, Compression, ConsumerLimits, DiscardPolicy, External, + PeerInfo, PersistenceMode, Placement, Republish, RetentionPolicy, Source, + SourceInfo, StorageType, Stream, StreamConfig, + StreamInfo, StreamMessage, + StreamState, SubjectTransform, ) __all__ = [ "AckPolicy", + "ClusterInfo", "Compression", "ConsumerLimits", "DeliverPolicy", "DiscardPolicy", "External", "JetStream", + "JetStreamMessage", "KVConfig", + "KVEntry", + "KVEntryIterator", + "KVOperation", + "KVStatus", "KeyValue", + "KeysIterator", + "MessagesIterator", "ObjectInfo", "ObjectInfoIterator", "ObjectLink", "ObjectStore", "ObjectStoreConfig", + "PeerInfo", "PersistenceMode", "Placement", "PriorityPolicy", + "Publication", "PullConsumer", "PullConsumerConfig", "PushConsumer", @@ -60,9 +84,12 @@ "Republish", "RetentionPolicy", "Source", + "SourceInfo", "StorageType", "Stream", "StreamConfig", + "StreamInfo", "StreamMessage", + "StreamState", "SubjectTransform", ] diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 781b598..673f998 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -8,6 +8,8 @@ pub type NatsrpyResult = Result; pub enum NatsrpyError { #[error(transparent)] StdIOError(#[from] std::io::Error), + #[error(transparent)] + UnknownError(#[from] Box), #[error("NATS session error: {0}")] SessionError(String), #[error("Invalid arguemnt: {0}")] @@ -43,23 +45,33 @@ pub enum NatsrpyError { #[error(transparent)] UnsubscribeError(#[from] async_nats::UnsubscribeError), #[error(transparent)] - KeyValueError(#[from] async_nats::jetstream::context::KeyValueError), + JSKVError(#[from] async_nats::jetstream::context::KeyValueError), #[error(transparent)] - CreateKeyValueError(#[from] async_nats::jetstream::context::CreateKeyValueError), + JSKVCreateError(#[from] async_nats::jetstream::context::CreateKeyValueError), #[error(transparent)] - CreateStreamError(#[from] async_nats::jetstream::context::CreateStreamError), + JSStreamCreateError(#[from] async_nats::jetstream::context::CreateStreamError), #[error(transparent)] - GetStreamError(#[from] async_nats::jetstream::context::GetStreamError), + JSStreamGetError(#[from] async_nats::jetstream::context::GetStreamError), #[error(transparent)] - KVUpdateError(#[from] async_nats::jetstream::context::UpdateKeyValueError), + JSKVUpdateError(#[from] async_nats::jetstream::context::UpdateKeyValueError), #[error(transparent)] JSPublishError(#[from] async_nats::jetstream::context::PublishError), #[error(transparent)] + JSObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError), + #[error(transparent)] KVEntryError(#[from] async_nats::jetstream::kv::EntryError), #[error(transparent)] KVPutError(#[from] async_nats::jetstream::kv::PutError), #[error(transparent)] - DeleteError(#[from] async_nats::jetstream::kv::DeleteError), + KVUpdateError(#[from] async_nats::jetstream::kv::UpdateError), + #[error(transparent)] + KVCreateError(#[from] async_nats::jetstream::kv::CreateError), + #[error(transparent)] + KVWatcherError(#[from] async_nats::jetstream::kv::WatcherError), + #[error(transparent)] + KVHistoryError(#[from] async_nats::jetstream::kv::HistoryError), + #[error(transparent)] + KVStatusError(#[from] async_nats::jetstream::kv::StatusError), #[error(transparent)] StreamDirectGetError(#[from] async_nats::jetstream::stream::DirectGetError), #[error(transparent)] @@ -67,8 +79,6 @@ pub enum NatsrpyError { #[error(transparent)] StreamPurgeError(#[from] async_nats::jetstream::stream::PurgeError), #[error(transparent)] - UnknownError(#[from] Box), - #[error(transparent)] PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError), #[error(transparent)] ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), @@ -81,8 +91,6 @@ pub enum NatsrpyError { #[error(transparent)] ConsumerUpdateError(#[from] async_nats::jetstream::stream::ConsumerUpdateError), #[error(transparent)] - ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError), - #[error(transparent)] ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError), #[error(transparent)] ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError), diff --git a/src/js/kv.rs b/src/js/kv.rs index f5a2e80..a34ead4 100644 --- a/src/js/kv.rs +++ b/src/js/kv.rs @@ -1,11 +1,19 @@ use std::{sync::Arc, time::Duration}; -use crate::js; +use crate::{ + js::{self, stream::StreamInfo}, + utils::{ + futures::natsrpy_future_with_timeout, + py_types::{SendableValue, TimeValue, ToPyDate}, + streamer::Streamer, + }, +}; +use futures_util::StreamExt; use pyo3::{ - Bound, PyAny, Python, - types::{PyBytes, PyBytesMethods}, + Bound, Py, PyAny, PyRef, Python, + types::{PyBytes, PyDateTime}, }; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, @@ -128,6 +136,71 @@ impl TryFrom for async_nats::jetstream::kv::Config { } } +#[pyo3::pyclass(from_py_object)] +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum KVOperation { + Put, + Delete, + Purge, +} + +impl From for KVOperation { + fn from(value: async_nats::jetstream::kv::Operation) -> Self { + match value { + async_nats::jetstream::kv::Operation::Put => Self::Put, + async_nats::jetstream::kv::Operation::Purge => Self::Purge, + async_nats::jetstream::kv::Operation::Delete => Self::Delete, + } + } +} + +#[pyo3::pyclass(get_all)] +pub struct KVEntry { + pub bucket: String, + pub key: String, + pub value: Py, + pub revision: u64, + pub delta: u64, + pub created: Py, + pub operation: KVOperation, + pub seen_current: bool, +} + +impl TryFrom for KVEntry { + type Error = NatsrpyError; + + fn try_from(value: async_nats::jetstream::kv::Entry) -> Result { + Ok(Self { + bucket: value.bucket, + key: value.key, + value: Python::attach(|gil| PyBytes::new(gil, &value.value).unbind()), + revision: value.revision, + delta: value.delta, + created: Python::attach(|gil| value.created.to_py_date(gil).map(pyo3::Bound::unbind))?, + operation: value.operation.into(), + seen_current: value.seen_current, + }) + } +} + +#[pyo3::pyclass(from_py_object, get_all)] +#[derive(Debug, Clone)] +pub struct KVStatus { + info: StreamInfo, + bucket: String, +} + +impl TryFrom for KVStatus { + type Error = NatsrpyError; + + fn try_from(value: async_nats::jetstream::kv::bucket::Status) -> Result { + Ok(Self { + info: value.info.try_into()?, + bucket: value.bucket, + }) + } +} + #[pyo3::pyclass(from_py_object)] #[derive(Clone)] pub struct KeyValue { @@ -173,28 +246,343 @@ impl KeyValue { }) } + #[pyo3(signature=(key, value, ttl=None))] + pub fn create<'py>( + &self, + py: Python<'py>, + key: String, + value: SendableValue, + ttl: Option, + ) -> NatsrpyResult> { + let store = self.store.clone(); + let data = value.into(); + natsrpy_future(py, async move { + if let Some(ttl) = ttl { + Ok(store + .read() + .await + .create_with_ttl(key, data, ttl.into()) + .await?) + } else { + Ok(store.read().await.create(key, data).await?) + } + }) + } + + #[pyo3(signature=( + key, + ttl=None, + expect_revision=None, + ))] + pub fn purge<'py>( + &self, + py: Python<'py>, + key: String, + ttl: Option, + expect_revision: Option, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + match (ttl, expect_revision) { + (None, _) => Ok(store + .read() + .await + .purge_expect_revision(key, expect_revision) + .await?), + (Some(ttl), None) => Ok(store.read().await.purge_with_ttl(key, ttl.into()).await?), + (Some(ttl), Some(revision)) => Ok(store + .read() + .await + .purge_expect_revision_with_ttl(key, revision, ttl.into()) + .await?), + } + }) + } + pub fn put<'py>( &self, py: Python<'py>, key: String, - value: &Bound<'py, PyBytes>, + value: SendableValue, ) -> NatsrpyResult> { let store = self.store.clone(); - let data = bytes::Bytes::copy_from_slice(value.as_bytes()); + let data = value.into(); natsrpy_future( py, async move { Ok(store.read().await.put(key, data).await?) }, ) } - pub fn delete<'py>(&self, py: Python<'py>, key: String) -> NatsrpyResult> { + #[pyo3(signature=( + key, + expect_revision=None, + ))] + pub fn delete<'py>( + &self, + py: Python<'py>, + key: String, + expect_revision: Option, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(store + .read() + .await + .delete_expect_revision(key, expect_revision) + .await?) + }) + } + + pub fn update<'py>( + &self, + py: Python<'py>, + key: String, + value: SendableValue, + revision: u64, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(store + .read() + .await + .update(key, value.into(), revision) + .await?) + }) + } + + pub fn history<'py>(&self, py: Python<'py>, key: String) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(KVEntryIterator::new(Streamer::new( + store.read().await.history(key).await?, + ))) + }) + } + + #[pyo3(signature=(from_revision=None))] + pub fn watch_all<'py>( + &self, + py: Python<'py>, + from_revision: Option, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + let watch = if let Some(rev) = from_revision { + store.read().await.watch_all_from_revision(rev).await? + } else { + store.read().await.watch_all().await? + }; + Ok(KVEntryIterator::new(Streamer::new(watch))) + }) + } + + #[pyo3(signature=(key, from_revision=None))] + pub fn watch<'py>( + &self, + py: Python<'py>, + key: String, + from_revision: Option, + ) -> NatsrpyResult> { let store = self.store.clone(); - natsrpy_future(py, async move { Ok(store.read().await.delete(key).await?) }) + natsrpy_future(py, async move { + let watch = if let Some(rev) = from_revision { + store.read().await.watch_from_revision(key, rev).await? + } else { + store.read().await.watch(key).await? + }; + Ok(KVEntryIterator::new(Streamer::new(watch))) + }) + } + + pub fn watch_with_history<'py>( + &self, + py: Python<'py>, + key: String, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(KVEntryIterator::new(Streamer::new( + store.read().await.watch_with_history(key).await?, + ))) + }) + } + + pub fn watch_many<'py>( + &self, + py: Python<'py>, + keys: Vec, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(KVEntryIterator::new(Streamer::new( + store.read().await.watch_many(keys).await?, + ))) + }) + } + + pub fn watch_many_with_history<'py>( + &self, + py: Python<'py>, + keys: Vec, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(KVEntryIterator::new(Streamer::new( + store.read().await.watch_many_with_history(keys).await?, + ))) + }) + } + + #[pyo3(signature=( + key, + revision=None, + ))] + pub fn entry<'py>( + &self, + py: Python<'py>, + key: String, + revision: Option, + ) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + let entry = if let Some(rev) = revision { + store + .read() + .await + .entry_for_revision(key, rev) + .await? + .map(KVEntry::try_from) + .transpose()? + } else { + store + .read() + .await + .entry(key) + .await? + .map(KVEntry::try_from) + .transpose()? + }; + Ok(entry) + }) + } + + pub fn status<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + KVStatus::try_from(store.read().await.status().await?) + }) + } + + pub fn keys<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let store = self.store.clone(); + natsrpy_future(py, async move { + Ok(KeysIterator::new(Streamer::new( + store.read().await.keys().await?, + ))) + }) + } +} + +#[pyo3::pyclass] +pub struct KeysIterator { + streamer: Arc>>>, +} + +impl KeysIterator { + #[must_use] + pub fn new( + streamer: Streamer>, + ) -> Self { + Self { + streamer: Arc::new(Mutex::new(streamer)), + } + } +} + +#[pyo3::pymethods] +impl KeysIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + #[pyo3(signature=(timeout=None))] + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.streamer.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let value = ctx.lock().await.next().await; + match value { + Some(entry) => Ok(entry?), + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) + } +} + +#[pyo3::pyclass] +pub struct KVEntryIterator { + streamer: Arc< + Mutex< + Streamer< + Result, + >, + >, + >, +} + +impl KVEntryIterator { + #[must_use] + pub fn new( + streamer: Streamer< + Result, + >, + ) -> Self { + Self { + streamer: Arc::new(Mutex::new(streamer)), + } + } +} + +#[pyo3::pymethods] +impl KVEntryIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + #[pyo3(signature=(timeout=None))] + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.streamer.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let value = ctx.lock().await.next().await; + match value { + Some(entry) => KVEntry::try_from(entry?), + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) } } #[pyo3::pymodule(submodule, name = "kv")] pub mod pymod { #[pymodule_export] - use super::{KVConfig, KeyValue}; + use super::{ + KVConfig, KVEntry, KVEntryIterator, KVOperation, KVStatus, KeyValue, KeysIterator, + }; } diff --git a/src/utils/py_types.rs b/src/utils/py_types.rs index 524ee91..99a4561 100644 --- a/src/utils/py_types.rs +++ b/src/utils/py_types.rs @@ -1,7 +1,7 @@ use std::time::Duration; use pyo3::{ - Bound, FromPyObject, PyResult, Python, + Bound, FromPyObject, PyResult, Python, type_hint_identifier, type_hint_union, types::{PyBytes, PyBytesMethods, PyDateTime, PyTzInfo}, }; @@ -15,6 +15,12 @@ pub enum SendableValue { impl<'py> FromPyObject<'_, 'py> for SendableValue { type Error = NatsrpyError; + const INPUT_TYPE: pyo3::inspect::PyStaticExpr = type_hint_union!( + type_hint_identifier!("builtins", "str"), + type_hint_identifier!("builtins", "bytes"), + type_hint_identifier!("builtins", "bytearray"), + type_hint_identifier!("builtins", "memoryview") + ); fn extract(obj: pyo3::Borrowed<'_, 'py, pyo3::PyAny>) -> Result { #[allow(clippy::option_if_let_else)] @@ -61,6 +67,11 @@ impl From for Duration { impl<'py> FromPyObject<'_, 'py> for TimeValue { type Error = NatsrpyError; + const INPUT_TYPE: pyo3::inspect::PyStaticExpr = type_hint_union!( + type_hint_identifier!("builtins", "float"), + type_hint_identifier!("datetime", "timedelta") + ); + fn extract(obj: pyo3::Borrowed<'_, 'py, pyo3::PyAny>) -> Result { #[allow(clippy::option_if_let_else)] if let Ok(fsec) = obj.extract::() {