diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 97e302b..789c684 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -70,3 +70,12 @@ repos: pass_filenames: false args: - check + + - id: stubtest + name: python stubtest + language: python + additional_dependencies: + - mypy + entry: python ./scripts/stubtest.py + pass_filenames: false + always_run: true diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 61694eb..fceec93 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -169,7 +169,7 @@ class PushConsumerConfig: @final class MessagesIterator: - def __aiter__(self) -> MessagesIterator: ... + def __aiter__(self) -> Self: ... async def __anext__(self) -> JetStreamMessage: ... async def next( self, diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi index 3d2eb1b..cc3b3bd 100644 --- a/python/natsrpy/_natsrpy_rs/js/object_store.pyi +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -1,11 +1,14 @@ -from datetime import timedelta -from typing import final +from datetime import datetime, timedelta +from typing import Any, final from typing_extensions import Self, Writer from .stream import Placement, StorageType __all__ = [ + "ObjectInfo", + "ObjectInfoIterator", + "ObjectLink", "ObjectStore", "ObjectStoreConfig", ] @@ -33,6 +36,33 @@ class ObjectStoreConfig: placement: Placement | None = None, ) -> Self: ... +@final +class ObjectLink: + name: str | None + bucket: str + +@final +class ObjectInfo: + name: str + description: str | None + metadata: dict[str, str] + headers: dict[str, Any] + bucket: str + nuid: str + size: int + chunks: int + modified: datetime | None + digest: str | None + deleted: bool + link: ObjectLink | None + max_chunk_size: int | None + +@final +class ObjectInfoIterator: + def __aiter__(self) -> Self: ... + async def __anext__(self) -> ObjectInfo: ... + async def next(self, timeout: float | timedelta | None = None) -> ObjectInfo: ... + @final class ObjectStore: async def get( @@ -51,3 +81,17 @@ class ObjectStore: metadata: dict[str, str] | None = None, ) -> None: ... async def delete(self, name: str) -> None: ... + async def seal(self) -> None: ... + async def get_info(self, name: str) -> ObjectInfo: ... + async def watch(self, with_history: bool = False) -> ObjectInfoIterator: ... + async def list(self) -> ObjectInfoIterator: ... + async def link_bucket(self, src_bucket: str, dest: str) -> ObjectInfo: ... + async def link_object(self, src: str, dest: str) -> ObjectInfo: ... + async def update_metadata( + self, + name: str, + new_name: str | None = None, + description: str | None = None, + headers: dict[str, Any] | None = None, + metadata: dict[str, str] | None = None, + ) -> ObjectInfo: ... diff --git a/scripts/bump_version.py b/scripts/bump_version.py index 5b4f373..c7b6e3a 100644 --- a/scripts/bump_version.py +++ b/scripts/bump_version.py @@ -1,3 +1,4 @@ +# ruff: noqa import re import argparse from pathlib import Path diff --git a/scripts/stubtest.py b/scripts/stubtest.py new file mode 100644 index 0000000..748ca91 --- /dev/null +++ b/scripts/stubtest.py @@ -0,0 +1,15 @@ +# ruff: noqa +import os +import subprocess +from pathlib import Path + +ROOT_DIR = Path(__file__).parent.parent + + +def main(): + subprocess.run(["maturin", "dev", "--uv"], cwd=ROOT_DIR, check=True) + os.execvpe("stubtest", ["--ignore-disjoint-bases", "natsrpy"], env=os.environ) + + +if __name__ == "__main__": + main() diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 2b347f8..781b598 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -88,6 +88,20 @@ pub enum NatsrpyError { ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError), #[error(transparent)] ObjectStoreDeleteError(#[from] async_nats::jetstream::object_store::DeleteError), + #[error(transparent)] + ObjectStoreSealError(#[from] async_nats::jetstream::object_store::SealError), + #[error(transparent)] + ObjectStoreInfoError(#[from] async_nats::jetstream::object_store::InfoError), + #[error(transparent)] + ObjectStoreWatchError(#[from] async_nats::jetstream::object_store::WatchError), + #[error(transparent)] + ObjectStoreWatcherError(#[from] async_nats::jetstream::object_store::WatcherError), + #[error(transparent)] + ObjectStoreAddLinkError(#[from] async_nats::jetstream::object_store::AddLinkError), + #[error(transparent)] + ObjectStoreUpdateMetadataError( + #[from] async_nats::jetstream::object_store::UpdateMetadataError, + ), } impl From for pyo3::PyErr { diff --git a/src/js/object_store.rs b/src/js/object_store.rs index 4058f7f..7ef3986 100644 --- a/src/js/object_store.rs +++ b/src/js/object_store.rs @@ -1,16 +1,25 @@ +use futures_util::StreamExt; use std::{collections::HashMap, sync::Arc, time::Duration}; use async_nats::HeaderMap; -use pyo3::{Bound, Py, PyAny, Python, types::PyDict}; -use tokio::{io::AsyncReadExt, sync::RwLock}; +use pyo3::{ + Bound, Py, PyAny, PyRef, Python, + types::{IntoPyDict, PyDateTime, PyDict}, +}; +use tokio::{ + io::AsyncReadExt, + sync::{Mutex, RwLock}, +}; use crate::{ - exceptions::rust_err::NatsrpyResult, + exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::stream::{Placement, StorageType}, utils::{ + futures::natsrpy_future_with_timeout, headers::NatsrpyHeadermapExt, natsrpy_future, - py_types::{SendableValue, TimeValue}, + py_types::{SendableValue, TimeValue, ToPyDate}, + streamer::Streamer, }, }; @@ -27,6 +36,95 @@ pub struct ObjectStoreConfig { pub placement: Option, } +#[pyo3::pyclass(from_py_object, get_all)] +#[derive(Debug, Clone)] +pub struct ObjectLink { + pub name: Option, + pub bucket: String, +} + +#[pyo3::pyclass(get_all)] +#[derive(Debug)] +pub struct ObjectInfo { + pub name: String, + pub description: Option, + pub metadata: Py, + pub headers: Py, + pub bucket: String, + pub nuid: String, + pub size: usize, + pub chunks: usize, + pub modified: Option>, + pub digest: Option, + pub deleted: bool, + + pub link: Option, + pub max_chunk_size: Option, +} + +impl From for ObjectLink { + fn from(value: async_nats::jetstream::object_store::ObjectLink) -> Self { + Self { + name: value.name, + bucket: value.bucket, + } + } +} + +impl TryFrom for ObjectInfo { + type Error = NatsrpyError; + + fn try_from( + value: async_nats::jetstream::object_store::ObjectInfo, + ) -> Result { + Ok(Self { + name: value.name, + description: value.description, + metadata: Python::attach(|gil| { + value.metadata.into_py_dict(gil).map(pyo3::Bound::unbind) + })?, + headers: value + .headers + // To PyResul> and then to PyResul> + .map(|val| Python::attach(|gil| val.to_pydict(gil).map(pyo3::Bound::unbind))) + .transpose()? + .unwrap_or_else(|| Python::attach(|gil| PyDict::new(gil).unbind())), + bucket: value.bucket, + nuid: value.nuid, + size: value.size, + chunks: value.chunks, + modified: value + .modified + // To PyResul> and then to PyResul> + .map(|dt| Python::attach(|gil| dt.to_py_date(gil).map(pyo3::Bound::unbind))) + .transpose()?, + digest: value.digest, + deleted: value.deleted, + link: value + .options + .as_ref() + .and_then(|opts| opts.link.as_ref().map(|link| link.clone().into())), + max_chunk_size: value.options.and_then(|opts| opts.max_chunk_size), + }) + } +} + +#[pyo3::pymethods] +impl ObjectInfo { + pub fn __str__(&self) -> String { + format!( + "ObjectInfo", + self.name, + self.bucket, + self.size, + self.modified + .as_ref() + .map_or_else(|| String::from("None"), ToString::to_string), + self.description, + ) + } +} + impl From for async_nats::jetstream::object_store::Config { fn from(value: ObjectStoreConfig) -> Self { Self { @@ -188,10 +286,171 @@ impl ObjectStore { Ok(()) }) } + + #[pyo3(signature=( + name, + new_name=None, + description=None, + headers=None, + metadata=None, + ))] + pub fn update_metadata<'py>( + &self, + py: Python<'py>, + name: String, + new_name: Option, + description: Option, + headers: Option>, + metadata: Option>, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?; + let meta = async_nats::jetstream::object_store::UpdateMetadata { + name: new_name.unwrap_or_else(|| name.clone()), + description, + metadata: metadata.unwrap_or_default(), + headers, + }; + natsrpy_future(py, async move { + ObjectInfo::try_from(ctx_guard.read().await.update_metadata(name, meta).await?) + }) + } + + pub fn seal<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + ctx_guard.write().await.seal().await?; + Ok(()) + }) + } + + pub fn get_info<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + let info = ctx_guard.write().await.info(name).await?; + ObjectInfo::try_from(info) + }) + } + + #[pyo3(signature=(with_history=false))] + pub fn watch<'py>( + &self, + py: Python<'py>, + with_history: bool, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + let watcher = if with_history { + ctx_guard.read().await.watch_with_history().await? + } else { + ctx_guard.read().await.watch().await? + }; + Ok(ObjectInfoIterator::new(Streamer::new(watcher))) + }) + } + + pub fn list<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + Ok(ObjectInfoIterator::new(Streamer::new( + ctx_guard.read().await.list().await?, + ))) + }) + } + + pub fn link_bucket<'py>( + &self, + py: Python<'py>, + src_bucket: String, + dest: String, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + ObjectInfo::try_from( + ctx_guard + .read() + .await + .add_bucket_link(dest, src_bucket) + .await?, + ) + }) + } + + pub fn link_object<'py>( + &self, + py: Python<'py>, + src: String, + dest: String, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + let target = ctx_guard.read().await.get(src).await?; + ObjectInfo::try_from(ctx_guard.read().await.add_link(dest, &target.info).await?) + }) + } +} + +#[pyo3::pyclass(from_py_object)] +#[derive(Clone, Debug)] +pub struct ObjectInfoIterator { + streamer: Arc< + Mutex< + Streamer< + Result< + async_nats::jetstream::object_store::ObjectInfo, + async_nats::jetstream::object_store::WatcherError, + >, + >, + >, + >, +} + +impl ObjectInfoIterator { + #[must_use] + pub fn new( + streamer: Streamer< + Result< + async_nats::jetstream::object_store::ObjectInfo, + async_nats::jetstream::object_store::WatcherError, + >, + >, + ) -> Self { + Self { + streamer: Arc::new(Mutex::new(streamer)), + } + } +} + +#[pyo3::pymethods] +impl ObjectInfoIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + #[pyo3(signature=(timeout=None))] + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let ctx = self.streamer.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let value = ctx.lock().await.next().await; + match value { + Some(info) => ObjectInfo::try_from(info?), + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) + } } #[pyo3::pymodule(submodule, name = "object_store")] pub mod pymod { #[pymodule_export] - pub use super::{ObjectStore, ObjectStoreConfig}; + pub use super::{ObjectInfo, ObjectInfoIterator, ObjectLink, ObjectStore, ObjectStoreConfig}; } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d574516..185bd89 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -2,5 +2,5 @@ pub mod futures; pub mod headers; pub mod py; pub mod py_types; - +pub mod streamer; pub use futures::natsrpy_future; diff --git a/src/utils/streamer.rs b/src/utils/streamer.rs new file mode 100644 index 0000000..9bd7cd8 --- /dev/null +++ b/src/utils/streamer.rs @@ -0,0 +1,64 @@ +use std::{sync::Arc, task::Poll}; + +use futures_util::StreamExt; +use tokio::sync::Mutex; + +#[derive(Clone, Debug)] +pub struct Streamer { + messages: Arc>>, + abort_handle: tokio::task::AbortHandle, +} + +async fn task_pooler( + mut stream: impl futures_util::Stream + std::marker::Unpin + Send + 'static, + channel: tokio::sync::mpsc::Sender, +) { + while let Some(msg) = stream.next().await { + if let Err(err) = channel.send(msg).await { + log::error!("Cannot send updates to streaming channel. Cause: {err}."); + } + } +} + +impl Streamer { + pub fn new( + stream: impl futures_util::Stream + std::marker::Unpin + Send + 'static, + ) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let task = tokio::task::spawn(task_pooler(stream, tx)); + Self { + messages: Arc::new(Mutex::new(rx)), + abort_handle: task.abort_handle(), + } + } +} + +impl Streamer { + pub async fn close(&mut self) { + self.abort_handle.abort(); + self.messages.lock().await.close(); + } +} + +impl futures_util::Stream for Streamer { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mutex = std::pin::pin!(self.messages.lock()); + let Poll::Ready(mut lock) = mutex.poll(cx) else { + return Poll::Pending; + }; + lock.poll_recv(cx) + } +} + +impl Drop for Streamer { + fn drop(&mut self) { + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + self.close().await; + }); + } +}