From be77d1806b81f3d8204a664244c8d6e26a48b2da Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 26 Mar 2026 17:58:49 +0100 Subject: [PATCH] Replaced async functions with actual returned Future types. --- python/natsrpy/_natsrpy_rs/__init__.pyi | 41 +++++------ python/natsrpy/_natsrpy_rs/js/__init__.pyi | 25 +++---- python/natsrpy/_natsrpy_rs/js/consumers.pyi | 13 ++-- python/natsrpy/_natsrpy_rs/js/counters.pyi | 17 ++--- python/natsrpy/_natsrpy_rs/js/kv.pyi | 47 ++++++------- python/natsrpy/_natsrpy_rs/js/managers.pyi | 69 ++++++++++--------- .../natsrpy/_natsrpy_rs/js/object_store.pyi | 31 +++++---- python/natsrpy/_natsrpy_rs/js/stream.pyi | 27 ++++---- 8 files changed, 139 insertions(+), 131 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index f2a3934..e181442 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from collections.abc import Awaitable, Callable from datetime import timedelta from typing import Any, final, overload @@ -42,8 +43,8 @@ class IteratorSubscription: """ def __aiter__(self) -> IteratorSubscription: ... - async def __anext__(self) -> Message: ... - async def next(self, timeout: float | timedelta | None = None) -> Message: + def __anext__(self) -> Future[Message]: ... + def next(self, timeout: float | timedelta | None = None) -> Future[Message]: """Receive the next message from the subscription. :param timeout: maximum time to wait for a message in seconds @@ -53,14 +54,14 @@ class IteratorSubscription: unsubscribed. """ - async def unsubscribe(self, limit: int | None = None) -> None: + def unsubscribe(self, limit: int | None = None) -> Future[None]: """Unsubscribe from the subject. :param limit: if set, automatically unsubscribe after receiving this many additional messages, defaults to None. """ - async def drain(self) -> None: + def drain(self) -> Future[None]: """Drain the subscription. Unsubscribes and flushes any remaining messages before closing. @@ -74,14 +75,14 @@ class CallbackSubscription: Messages are automatically delivered to the callback in a background task. """ - async def unsubscribe(self, limit: int | None = None) -> None: + def unsubscribe(self, limit: int | None = None) -> Future[None]: """Unsubscribe from the subject. :param limit: if set, automatically unsubscribe after receiving this many additional messages, defaults to None. """ - async def drain(self) -> None: + def drain(self) -> Future[None]: """Drain the subscription. Unsubscribes and flushes any remaining messages before closing. @@ -132,7 +133,7 @@ class Nats: in seconds or as a timedelta, defaults to 10 seconds. """ - async def startup(self) -> None: + def startup(self) -> Future[None]: """Connect to the NATS server. Establishes the connection using the parameters provided at @@ -140,14 +141,14 @@ class Nats: or JetStream operations. """ - async def shutdown(self) -> None: + def shutdown(self) -> Future[None]: """Close the NATS connection. Drains all subscriptions and flushes pending data before disconnecting. """ - async def publish( + def publish( self, subject: str, payload: bytes | str | bytearray | memoryview, @@ -155,7 +156,7 @@ class Nats: headers: dict[str, Any] | None = None, reply: str | None = None, err_on_disconnect: bool = False, - ) -> None: + ) -> Future[None]: """Publish a message to a subject. :param subject: subject to publish the message to. @@ -167,7 +168,7 @@ class Nats: is disconnected, defaults to False. """ - async def request( + def request( self, subject: str, payload: bytes | str | bytearray | memoryview, @@ -175,7 +176,7 @@ class Nats: headers: dict[str, Any] | None = None, inbox: str | None = None, timeout: float | timedelta | None = None, - ) -> Message: + ) -> Future[Message]: """Send a request and discard the response. :param subject: subject to send the request to. @@ -188,31 +189,31 @@ class Nats: :return: response message. """ - async def drain(self) -> None: + def drain(self) -> Future[None]: """Drain the connection. Gracefully closes all subscriptions and flushes pending messages. """ - async def flush(self) -> None: + def flush(self) -> Future[None]: """Flush the connection. Waits until all pending messages have been sent to the server. """ @overload - async def subscribe( + def subscribe( self, subject: str, callback: Callable[[Message], Awaitable[None]], - ) -> CallbackSubscription: ... + ) -> Future[CallbackSubscription]: ... @overload - async def subscribe( + def subscribe( self, subject: str, callback: None = None, - ) -> IteratorSubscription: ... - async def jetstream( + ) -> Future[IteratorSubscription]: ... + def jetstream( self, *, domain: str | None = None, @@ -222,7 +223,7 @@ class Nats: concurrency_limit: int | None = None, max_ack_inflight: int | None = None, backpressure_on_inflight: bool | None = None, - ) -> js.JetStream: + ) -> Future[js.JetStream]: """Create a JetStream context. :param domain: JetStream domain to use. diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index d280a09..b921153 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import datetime, timedelta from typing import Any, Literal, final, overload @@ -48,7 +49,7 @@ class JetStream: """ @overload - async def publish( + def publish( self, subject: str, payload: str | bytes | bytearray | memoryview, @@ -56,9 +57,9 @@ class JetStream: headers: dict[str, str] | None = None, err_on_disconnect: bool = False, wait: Literal[True], - ) -> Publication: ... + ) -> Future[Publication]: ... @overload - async def publish( + def publish( self, subject: str, payload: str | bytes | bytearray | memoryview, @@ -66,9 +67,9 @@ class JetStream: headers: dict[str, str] | None = None, err_on_disconnect: bool = False, wait: Literal[False] = False, - ) -> None: ... + ) -> Future[None]: ... @overload - async def publish( + def publish( self, subject: str, payload: str | bytes | bytearray | memoryview, @@ -76,7 +77,7 @@ class JetStream: headers: dict[str, str] | None = None, err_on_disconnect: bool = False, wait: bool = False, - ) -> Publication | None: ... + ) -> Future[Publication | None]: ... @property def kv(self) -> KVManager: """Manager for key-value store buckets.""" @@ -158,17 +159,17 @@ class JetStreamMessage: def token(self) -> str | None: """Authentication token, if applicable.""" - async def ack(self, double: bool = False) -> None: + def ack(self, double: bool = False) -> Future[None]: """Acknowledge that a message was handled. :param double: whether to wait for server response, defaults to False. """ - async def nack( + def nack( self, delay: float | timedelta | None = None, double: bool = False, - ) -> None: + ) -> Future[None]: """Negative acknowledgement. Signals that the message will not be processed now @@ -179,7 +180,7 @@ class JetStreamMessage: :param double: whether to wait for server response, defaults to False. """ - async def progress(self, double: bool = False) -> None: + def progress(self, double: bool = False) -> Future[None]: """Progress acknowledgement. Signals that the message is being handled right now. @@ -189,7 +190,7 @@ class JetStreamMessage: :param double: whether to wait for server response, defaults to False. """ - async def next(self, double: bool = False) -> None: + def next(self, double: bool = False) -> Future[None]: """Next acknowledgement. Only applies to pull consumers. @@ -199,7 +200,7 @@ class JetStreamMessage: :param double: whether to wait for server response, defaults to False. """ - async def term(self, double: bool = False) -> None: + def term(self, double: bool = False) -> Future[None]: """Term acknowledgement. Instructs server to stop redelivering message. diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 0a32338..34ecf51 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import timedelta from typing import final @@ -280,11 +281,11 @@ class MessagesIterator: """Async iterator over JetStream consumer messages.""" def __aiter__(self) -> Self: ... - async def __anext__(self) -> JetStreamMessage: ... - async def next( + def __anext__(self) -> Future[JetStreamMessage]: ... + def next( self, timeout: float | timedelta | None = None, - ) -> JetStreamMessage: + ) -> Future[JetStreamMessage]: """Receive the next message from the consumer. :param timeout: maximum time to wait in seconds or as a timedelta, @@ -299,7 +300,7 @@ class PushConsumer: Messages are delivered by the server to a specified subject. """ - async def messages(self) -> MessagesIterator: + def messages(self) -> Future[MessagesIterator]: """Get an async iterator for consuming messages. :return: an async iterator over JetStream messages. @@ -312,7 +313,7 @@ class PullConsumer: Messages are fetched on demand in batches by the client. """ - async def fetch( + def fetch( self, max_messages: int | None = None, group: str | None = None, @@ -323,7 +324,7 @@ class PullConsumer: min_pending: int | None = None, min_ack_pending: int | None = None, timeout: float | timedelta | None = None, - ) -> list[JetStreamMessage]: + ) -> Future[list[JetStreamMessage]]: """Fetch a batch of messages from the consumer. :param max_messages: maximum number of messages to fetch. diff --git a/python/natsrpy/_natsrpy_rs/js/counters.pyi b/python/natsrpy/_natsrpy_rs/js/counters.pyi index 44f960d..4f89a46 100644 --- a/python/natsrpy/_natsrpy_rs/js/counters.pyi +++ b/python/natsrpy/_natsrpy_rs/js/counters.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import timedelta from typing import final @@ -183,12 +184,12 @@ class Counters: ``allow_message_counter`` enabled. """ - async def add( + def add( self, key: str, value: int, timeout: float | timedelta | None = None, - ) -> int: + ) -> Future[int]: """Add an arbitrary value to a counter. :param key: subject key identifying the counter. @@ -198,11 +199,11 @@ class Counters: :return: the new counter value after the addition. """ - async def incr( + def incr( self, key: str, timeout: float | timedelta | None = None, - ) -> int: + ) -> Future[int]: """Increment a counter by one. Shorthand for ``add(key, 1)``. @@ -213,11 +214,11 @@ class Counters: :return: the new counter value after the increment. """ - async def decr( + def decr( self, key: str, timeout: float | timedelta | None = None, - ) -> int: + ) -> Future[int]: """Decrement a counter by one. Shorthand for ``add(key, -1)``. @@ -228,11 +229,11 @@ class Counters: :return: the new counter value after the decrement. """ - async def get( + def get( self, key: str, timeout: float | timedelta | None = None, - ) -> CounterEntry: + ) -> Future[CounterEntry]: """Retrieve the current value of a counter. :param key: subject key identifying the counter. diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index f0213fc..874ea96 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import datetime, timedelta from typing import final @@ -88,8 +89,8 @@ class KVEntryIterator: """Async iterator over key-value entries.""" def __aiter__(self) -> Self: ... - async def __anext__(self) -> KVEntry: ... - async def next(self, timeout: float | timedelta | None = None) -> KVEntry: + def __anext__(self) -> Future[KVEntry]: ... + def next(self, timeout: float | timedelta | None = None) -> Future[KVEntry]: """Receive the next key-value entry. :param timeout: maximum time to wait in seconds or as a timedelta, @@ -102,8 +103,8 @@ class KeysIterator: """Async iterator over key-value bucket keys.""" def __aiter__(self) -> Self: ... - async def __anext__(self) -> str: ... - async def next(self, timeout: float | timedelta | None = None) -> str: + def __anext__(self) -> Future[str]: ... + def next(self, timeout: float | timedelta | None = None) -> Future[str]: """Receive the next key. :param timeout: maximum time to wait in seconds or as a timedelta, @@ -197,18 +198,18 @@ class KeyValue: def name(self) -> str: """Name of the key-value bucket.""" - async def get(self, key: str) -> bytes | None: + def get(self, key: str) -> Future[bytes | None]: """Get the current value for a key. :param key: key to look up. :return: value bytes, or None if the key does not exist. """ - async def delete( + def delete( self, key: str, expect_revision: int | None = None, - ) -> int: + ) -> Future[int]: """Delete a key from the bucket. :param key: key to delete. @@ -217,7 +218,7 @@ class KeyValue: :return: the new revision number. """ - async def update(self, key: str, value: bytes | str, revision: int) -> None: + def update(self, key: str, value: bytes | str, revision: int) -> Future[None]: """Update a key only if it matches the expected revision. :param key: key to update. @@ -225,12 +226,12 @@ class KeyValue: :param revision: expected current revision. """ - async def create( + def create( self, key: str, value: bytes | str, ttl: float | timedelta | None = None, - ) -> int: + ) -> Future[int]: """Create a new key. Fails if the key already exists. @@ -241,7 +242,7 @@ class KeyValue: :return: the initial revision number. """ - async def put(self, key: str, value: bytes | str) -> int: + def put(self, key: str, value: bytes | str) -> Future[int]: """Put a value for a key, creating or updating as needed. :param key: key to set. @@ -249,12 +250,12 @@ class KeyValue: :return: the new revision number. """ - async def purge( + def purge( self, key: str, ttl: float | timedelta | None = None, expect_revision: int | None = None, - ) -> None: + ) -> Future[None]: """Purge all revisions of a key. :param key: key to purge. @@ -264,14 +265,14 @@ class KeyValue: concurrency control, defaults to None. """ - async def history(self, key: str) -> KVEntryIterator: + def history(self, key: str) -> Future[KVEntryIterator]: """Get the revision history for a key. :param key: key to query. :return: an async iterator over historical entries. """ - async def entry(self, key: str, revision: int | None = None) -> KVEntry | None: + def entry(self, key: str, revision: int | None = None) -> Future[KVEntry | None]: """Get a specific entry for a key. :param key: key to look up. @@ -280,11 +281,11 @@ class KeyValue: :return: the entry, or None if not found. """ - async def watch( + def watch( self, key: str, from_revision: int | None = None, - ) -> KVEntryIterator: + ) -> Future[KVEntryIterator]: """Watch a key for changes. :param key: key to watch. @@ -293,14 +294,14 @@ class KeyValue: :return: an async iterator over entry changes. """ - async def watch_with_history(self, key: str) -> KVEntryIterator: + def watch_with_history(self, key: str) -> Future[KVEntryIterator]: """Watch a key for changes, delivering all historical entries first. :param key: key to watch. :return: an async iterator starting with history then live changes. """ - async def watch_all(self, from_revision: int | None = None) -> KVEntryIterator: + def watch_all(self, from_revision: int | None = None) -> Future[KVEntryIterator]: """Watch all keys in the bucket for changes. :param from_revision: start watching from this revision, @@ -308,27 +309,27 @@ class KeyValue: :return: an async iterator over entry changes. """ - async def watch_many(self, keys: list[str]) -> KVEntryIterator: + def watch_many(self, keys: list[str]) -> Future[KVEntryIterator]: """Watch multiple keys for changes. :param keys: list of keys to watch. :return: an async iterator over entry changes. """ - async def watch_many_with_history(self, keys: list[str]) -> KVEntryIterator: + def watch_many_with_history(self, keys: list[str]) -> Future[KVEntryIterator]: """Watch multiple keys, delivering all historical entries first. :param keys: list of keys to watch. :return: an async iterator starting with history then live changes. """ - async def keys(self) -> KeysIterator: + def keys(self) -> Future[KeysIterator]: """List all keys in the bucket. :return: an async iterator over key names. """ - async def status(self) -> KVStatus: + def status(self) -> Future[KVStatus]: """Get the status of the key-value bucket. :return: bucket status information. diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 2da9429..dd0ba40 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import timedelta from typing import final, overload @@ -37,11 +38,11 @@ class ConsumersIterator: """ def __aiter__(self) -> Self: ... - async def __anext__(self) -> PullConsumer | PushConsumer: ... - async def next( + def __anext__(self) -> Future[PullConsumer | PushConsumer]: ... + def next( self, timeout: float | timedelta | None = None, - ) -> PullConsumer | PushConsumer: + ) -> Future[PullConsumer | PushConsumer]: """Receive the next consumer from the stream. :param timeout: maximum time to wait for a message in seconds @@ -61,8 +62,8 @@ class ConsumersNamesIterator: """ def __aiter__(self) -> Self: ... - async def __anext__(self) -> str: ... - async def next(self, timeout: float | timedelta | None = None) -> str: + def __anext__(self) -> Future[str]: ... + def next(self, timeout: float | timedelta | None = None) -> Future[str]: """Receive the next consumer name from the stream. :param timeout: maximum time to wait for a message in seconds @@ -76,35 +77,35 @@ class ConsumersNamesIterator: class StreamsManager: """Manager for JetStream stream CRUD operations.""" - async def create(self, config: StreamConfig) -> Stream: + def create(self, config: StreamConfig) -> Future[Stream]: """Create a new stream. :param config: stream configuration. :return: the created stream. """ - async def create_or_update(self, config: StreamConfig) -> Stream: + def create_or_update(self, config: StreamConfig) -> Future[Stream]: """Create a stream or update it if it already exists. :param config: stream configuration. :return: the created or updated stream. """ - async def get(self, name: str) -> Stream: + def get(self, name: str) -> Future[Stream]: """Get an existing stream by name. :param name: stream name. :return: the stream. """ - async def delete(self, name: str) -> bool: + def delete(self, name: str) -> Future[bool]: """Delete a stream. :param name: stream name. :return: True if the stream was deleted. """ - async def update(self, config: StreamConfig) -> Stream: + def update(self, config: StreamConfig) -> Future[Stream]: """Update an existing stream configuration. :param config: new stream configuration. @@ -115,35 +116,35 @@ class StreamsManager: class CountersManager: """Manager for JetStream stream with counters support CRUD operations.""" - async def create(self, config: CountersConfig) -> Counters: + def create(self, config: CountersConfig) -> Future[Counters]: """Create a new counters stream. :param config: stream configuration. :return: the created stream. """ - async def create_or_update(self, config: CountersConfig) -> Counters: + def create_or_update(self, config: CountersConfig) -> Future[Counters]: """Create a counters stream or update it if it already exists. :param config: stream configuration. :return: the created or updated stream. """ - async def get(self, name: str) -> Counters: + def get(self, name: str) -> Future[Counters]: """Get an existing counters stream by name. :param name: stream name. :return: the stream. """ - async def delete(self, name: str) -> bool: + def delete(self, name: str) -> Future[bool]: """Delete a counters stream. :param name: stream name. :return: True if the stream was deleted. """ - async def update(self, config: CountersConfig) -> Counters: + def update(self, config: CountersConfig) -> Future[Counters]: """Update an existing counters stream configuration. :param config: new stream configuration. @@ -154,35 +155,35 @@ class CountersManager: class KVManager: """Manager for key-value bucket CRUD operations.""" - async def create(self, config: KVConfig) -> KeyValue: + def create(self, config: KVConfig) -> Future[KeyValue]: """Create a new key-value bucket. :param config: bucket configuration. :return: the created key-value bucket. """ - async def create_or_update(self, config: KVConfig) -> KeyValue: + def create_or_update(self, config: KVConfig) -> Future[KeyValue]: """Create a bucket or update it if it already exists. :param config: bucket configuration. :return: the created or updated key-value bucket. """ - async def get(self, bucket: str) -> KeyValue: + def get(self, bucket: str) -> Future[KeyValue]: """Get an existing key-value bucket by name. :param bucket: bucket name. :return: the key-value bucket. """ - async def delete(self, bucket: str) -> bool: + def delete(self, bucket: str) -> Future[bool]: """Delete a key-value bucket. :param bucket: bucket name. :return: True if the bucket was deleted. """ - async def update(self, config: KVConfig) -> KeyValue: + def update(self, config: KVConfig) -> Future[KeyValue]: """Update an existing key-value bucket configuration. :param config: new bucket configuration. @@ -194,35 +195,35 @@ class ConsumersManager: """Manager for JetStream consumer CRUD operations.""" @overload - async def create(self, config: PullConsumerConfig) -> PullConsumer: ... + def create(self, config: PullConsumerConfig) -> Future[PullConsumer]: ... @overload - async def create(self, config: PushConsumerConfig) -> PushConsumer: ... + def create(self, config: PushConsumerConfig) -> Future[PushConsumer]: ... @overload - async def update(self, config: PullConsumerConfig) -> PullConsumer: ... + def update(self, config: PullConsumerConfig) -> Future[PullConsumer]: ... @overload - async def update(self, config: PushConsumerConfig) -> PushConsumer: ... - async def get_pull(self, name: str) -> PullConsumer: + def update(self, config: PushConsumerConfig) -> Future[PushConsumer]: ... + def get_pull(self, name: str) -> Future[PullConsumer]: """Get an existing pull consumer by name. :param name: consumer name. :return: the pull consumer. """ - async def get_push(self, name: str) -> PushConsumer: + def get_push(self, name: str) -> Future[PushConsumer]: """Get an existing push consumer by name. :param name: consumer name. :return: the push consumer. """ - async def delete(self, name: str) -> bool: + def delete(self, name: str) -> Future[bool]: """Delete a consumer. :param name: consumer name. :return: True if the consumer was deleted. """ - async def pause(self, name: str, delay: float | timedelta) -> bool: + def pause(self, name: str, delay: float | timedelta) -> Future[bool]: """Pause a consumer for a specified duration. :param name: consumer name. @@ -230,14 +231,14 @@ class ConsumersManager: :return: True if the consumer was paused. """ - async def resume(self, name: str) -> bool: + def resume(self, name: str) -> Future[bool]: """Resume a paused consumer. :param name: consumer name. :return: True if the consumer was resumed. """ - async def list(self) -> ConsumersIterator: + def list(self) -> Future[ConsumersIterator]: """List consumers subscribed to the stream. This method iterates over all consumers on a @@ -249,7 +250,7 @@ class ConsumersManager: :return: an async iterator over consumers. """ - async def list_names(self) -> ConsumersNamesIterator: + def list_names(self) -> Future[ConsumersNamesIterator]: """List names of consumers subscribed to the stream. This method iterates over all consumer names on a @@ -262,21 +263,21 @@ class ConsumersManager: class ObjectStoreManager: """Manager for object store bucket operations.""" - async def create(self, config: ObjectStoreConfig) -> ObjectStore: + def create(self, config: ObjectStoreConfig) -> Future[ObjectStore]: """Create a new object store bucket. :param config: object store configuration. :return: the created object store. """ - async def get(self, bucket: str) -> ObjectStore: + def get(self, bucket: str) -> Future[ObjectStore]: """Get an existing object store bucket by name. :param bucket: bucket name. :return: the object store. """ - async def delete(self, bucket: str) -> None: + def delete(self, bucket: str) -> Future[None]: """Delete an object store bucket. :param bucket: bucket name. diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi index 449e8c9..6498ce3 100644 --- a/python/natsrpy/_natsrpy_rs/js/object_store.pyi +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import datetime, timedelta from typing import Any, final @@ -100,8 +101,8 @@ class ObjectInfoIterator: """Async iterator over object info entries.""" def __aiter__(self) -> Self: ... - async def __anext__(self) -> ObjectInfo: ... - async def next(self, timeout: float | timedelta | None = None) -> ObjectInfo: + def __anext__(self) -> Future[ObjectInfo]: ... + def next(self, timeout: float | timedelta | None = None) -> Future[ObjectInfo]: """Receive the next object info entry. :param timeout: maximum time to wait in seconds or as a timedelta, @@ -117,12 +118,12 @@ class ObjectStore: objects that are chunked across NATS messages. """ - async def get( + def get( self, name: str, writer: Writer[bytes], chunk_size: int | None = ..., # 24MB - ) -> None: + ) -> Future[None]: """Download an object from the store. Writes the object content to the given writer in chunks. @@ -133,7 +134,7 @@ class ObjectStore: defaults to 24 MB. """ - async def put( + def put( self, name: str, value: bytes | str, @@ -141,7 +142,7 @@ class ObjectStore: description: str | None = None, headers: dict[str, str | list[str]] | None = None, metadata: dict[str, str] | None = None, - ) -> None: + ) -> Future[None]: """Upload an object to the store. :param name: name for the stored object. @@ -153,23 +154,23 @@ class ObjectStore: :param metadata: optional custom key-value metadata. """ - async def delete(self, name: str) -> None: + def delete(self, name: str) -> Future[None]: """Delete an object from the store. :param name: name of the object to delete. """ - async def seal(self) -> None: + def seal(self) -> Future[None]: """Seal the object store, making it read-only.""" - async def get_info(self, name: str) -> ObjectInfo: + def get_info(self, name: str) -> Future[ObjectInfo]: """Get metadata for an object. :param name: name of the object. :return: object metadata. """ - async def watch(self, with_history: bool = False) -> ObjectInfoIterator: + def watch(self, with_history: bool = False) -> Future[ObjectInfoIterator]: """Watch the object store for changes. :param with_history: when True, deliver all existing entries @@ -177,13 +178,13 @@ class ObjectStore: :return: an async iterator over object info changes. """ - async def list(self) -> ObjectInfoIterator: + def list(self) -> Future[ObjectInfoIterator]: """List all objects in the store. :return: an async iterator over object info entries. """ - async def link_bucket(self, src_bucket: str, dest: str) -> ObjectInfo: + def link_bucket(self, src_bucket: str, dest: str) -> Future[ObjectInfo]: """Create a link to another object store bucket. :param src_bucket: name of the source bucket to link. @@ -191,7 +192,7 @@ class ObjectStore: :return: object info for the created link. """ - async def link_object(self, src: str, dest: str) -> ObjectInfo: + def link_object(self, src: str, dest: str) -> Future[ObjectInfo]: """Create a link to another object in the store. :param src: name of the source object to link. @@ -199,14 +200,14 @@ class ObjectStore: :return: object info for the created link. """ - async def update_metadata( + def update_metadata( self, name: str, new_name: str | None = None, description: str | None = None, headers: dict[str, Any] | None = None, metadata: dict[str, str] | None = None, - ) -> ObjectInfo: + ) -> Future[ObjectInfo]: """Update the metadata of an existing object. :param name: current name of the object. diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index 7721e40..74bec7c 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -1,3 +1,4 @@ +from asyncio import Future from datetime import datetime, timedelta from typing import Any, final @@ -471,11 +472,11 @@ class Stream: def consumers(self) -> ConsumersManager: """Manager for consumers bound to this stream.""" - async def direct_get( + def direct_get( self, sequence: int, timeout: float | datetime | None = None, - ) -> StreamMessage: + ) -> Future[StreamMessage]: """Get a message directly from the stream by sequence number. :param sequence: sequence number of the message to get. @@ -483,12 +484,12 @@ class Stream: :return: the stream message. """ - async def direct_get_next_for_subject( + def direct_get_next_for_subject( self, subject: str, sequence: int | None = None, timeout: float | timedelta | None = None, - ) -> StreamMessage: + ) -> Future[StreamMessage]: """Get the next message for a subject directly from the stream. :param subject: subject to get the next message for. @@ -498,11 +499,11 @@ class Stream: :return: the next stream message matching the subject filter. """ - async def direct_get_first_for_subject( + def direct_get_first_for_subject( self, subject: str, timeout: float | timedelta | None = None, - ) -> StreamMessage: + ) -> Future[StreamMessage]: """Get the first message for a subject directly from the stream. :param subject: subject to get the first message for. @@ -510,11 +511,11 @@ class Stream: :return: the first stream message matching the subject filter. """ - async def direct_get_last_for_subject( + def direct_get_last_for_subject( self, subject: str, timeout: float | timedelta | None = None, - ) -> StreamMessage: + ) -> Future[StreamMessage]: """Get the last message for a subject directly from the stream. :param subject: subject to get the last message for. @@ -522,20 +523,20 @@ class Stream: :return: the last stream message matching the subject filter. """ - async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo: + def get_info(self, timeout: float | datetime | None = None) -> Future[StreamInfo]: """Get information about the stream. :param timeout: operation timeout. :return: stream info. """ - async def purge( + def purge( self, filter: str | None = None, sequence: int | None = None, keep: int | None = None, timeout: float | datetime | None = None, - ) -> int: + ) -> Future[int]: """Purge messages from the stream. :param filter: subject filter for messages to purge, @@ -548,11 +549,11 @@ class Stream: :return: number of messages purged. """ - async def delete_message( + def delete_message( self, sequence: int, timeout: float | datetime | None = None, - ) -> None: + ) -> Future[None]: """Delete a message from the stream by sequence number. :param sequence: sequence number of the message to delete.