Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import Future
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import Any, final, overload
Expand Down Expand Up @@ -42,8 +43,8 @@ class IteratorSubscription:
"""

def __aiter__(self) -> IteratorSubscription: ...
async def __anext__(self) -> Message: ...
async def next(self, timeout: float | timedelta | None = None) -> Message:
def __anext__(self) -> Future[Message]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[Message]:
"""Receive the next message from the subscription.

:param timeout: maximum time to wait for a message in seconds
Expand All @@ -53,14 +54,14 @@ class IteratorSubscription:
unsubscribed.
"""

async def unsubscribe(self, limit: int | None = None) -> None:
def unsubscribe(self, limit: int | None = None) -> Future[None]:
"""Unsubscribe from the subject.

:param limit: if set, automatically unsubscribe after receiving
this many additional messages, defaults to None.
"""

async def drain(self) -> None:
def drain(self) -> Future[None]:
"""Drain the subscription.

Unsubscribes and flushes any remaining messages before closing.
Expand All @@ -74,14 +75,14 @@ class CallbackSubscription:
Messages are automatically delivered to the callback in a background task.
"""

async def unsubscribe(self, limit: int | None = None) -> None:
def unsubscribe(self, limit: int | None = None) -> Future[None]:
"""Unsubscribe from the subject.

:param limit: if set, automatically unsubscribe after receiving
this many additional messages, defaults to None.
"""

async def drain(self) -> None:
def drain(self) -> Future[None]:
"""Drain the subscription.

Unsubscribes and flushes any remaining messages before closing.
Expand Down Expand Up @@ -132,30 +133,30 @@ class Nats:
in seconds or as a timedelta, defaults to 10 seconds.
"""

async def startup(self) -> None:
def startup(self) -> Future[None]:
"""Connect to the NATS server.

Establishes the connection using the parameters provided at
construction time. Must be called before any publish, subscribe,
or JetStream operations.
"""

async def shutdown(self) -> None:
def shutdown(self) -> Future[None]:
"""Close the NATS connection.

Drains all subscriptions and flushes pending data before
disconnecting.
"""

async def publish(
def publish(
self,
subject: str,
payload: bytes | str | bytearray | memoryview,
*,
headers: dict[str, Any] | None = None,
reply: str | None = None,
err_on_disconnect: bool = False,
) -> None:
) -> Future[None]:
"""Publish a message to a subject.

:param subject: subject to publish the message to.
Expand All @@ -167,15 +168,15 @@ class Nats:
is disconnected, defaults to False.
"""

async def request(
def request(
self,
subject: str,
payload: bytes | str | bytearray | memoryview,
*,
headers: dict[str, Any] | None = None,
inbox: str | None = None,
timeout: float | timedelta | None = None,
) -> Message:
) -> Future[Message]:
"""Send a request and discard the response.

:param subject: subject to send the request to.
Expand All @@ -188,31 +189,31 @@ class Nats:
:return: response message.
"""

async def drain(self) -> None:
def drain(self) -> Future[None]:
"""Drain the connection.

Gracefully closes all subscriptions and flushes pending messages.
"""

async def flush(self) -> None:
def flush(self) -> Future[None]:
"""Flush the connection.

Waits until all pending messages have been sent to the server.
"""

@overload
async def subscribe(
def subscribe(
self,
subject: str,
callback: Callable[[Message], Awaitable[None]],
) -> CallbackSubscription: ...
) -> Future[CallbackSubscription]: ...
@overload
async def subscribe(
def subscribe(
self,
subject: str,
callback: None = None,
) -> IteratorSubscription: ...
async def jetstream(
) -> Future[IteratorSubscription]: ...
def jetstream(
self,
*,
domain: str | None = None,
Expand All @@ -222,7 +223,7 @@ class Nats:
concurrency_limit: int | None = None,
max_ack_inflight: int | None = None,
backpressure_on_inflight: bool | None = None,
) -> js.JetStream:
) -> Future[js.JetStream]:
"""Create a JetStream context.

:param domain: JetStream domain to use.
Expand Down
25 changes: 13 additions & 12 deletions python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import Future
from datetime import datetime, timedelta
from typing import Any, Literal, final, overload

Expand Down Expand Up @@ -48,35 +49,35 @@ class JetStream:
"""

@overload
async def publish(
def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
err_on_disconnect: bool = False,
wait: Literal[True],
) -> Publication: ...
) -> Future[Publication]: ...
@overload
async def publish(
def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
err_on_disconnect: bool = False,
wait: Literal[False] = False,
) -> None: ...
) -> Future[None]: ...
@overload
async def publish(
def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
err_on_disconnect: bool = False,
wait: bool = False,
) -> Publication | None: ...
) -> Future[Publication | None]: ...
@property
def kv(self) -> KVManager:
"""Manager for key-value store buckets."""
Expand Down Expand Up @@ -158,17 +159,17 @@ class JetStreamMessage:
def token(self) -> str | None:
"""Authentication token, if applicable."""

async def ack(self, double: bool = False) -> None:
def ack(self, double: bool = False) -> Future[None]:
"""Acknowledge that a message was handled.

:param double: whether to wait for server response, defaults to False.
"""

async def nack(
def nack(
self,
delay: float | timedelta | None = None,
double: bool = False,
) -> None:
) -> Future[None]:
"""Negative acknowledgement.

Signals that the message will not be processed now
Expand All @@ -179,7 +180,7 @@ class JetStreamMessage:
:param double: whether to wait for server response, defaults to False.
"""

async def progress(self, double: bool = False) -> None:
def progress(self, double: bool = False) -> Future[None]:
"""Progress acknowledgement.

Signals that the message is being handled right now.
Expand All @@ -189,7 +190,7 @@ class JetStreamMessage:
:param double: whether to wait for server response, defaults to False.
"""

async def next(self, double: bool = False) -> None:
def next(self, double: bool = False) -> Future[None]:
"""Next acknowledgement.

Only applies to pull consumers.
Expand All @@ -199,7 +200,7 @@ class JetStreamMessage:
:param double: whether to wait for server response, defaults to False.
"""

async def term(self, double: bool = False) -> None:
def term(self, double: bool = False) -> Future[None]:
"""Term acknowledgement.

Instructs server to stop redelivering message.
Expand Down
13 changes: 7 additions & 6 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import Future
from datetime import timedelta
from typing import final

Expand Down Expand Up @@ -280,11 +281,11 @@ class MessagesIterator:
"""Async iterator over JetStream consumer messages."""

def __aiter__(self) -> Self: ...
async def __anext__(self) -> JetStreamMessage: ...
async def next(
def __anext__(self) -> Future[JetStreamMessage]: ...
def next(
self,
timeout: float | timedelta | None = None,
) -> JetStreamMessage:
) -> Future[JetStreamMessage]:
"""Receive the next message from the consumer.

:param timeout: maximum time to wait in seconds or as a timedelta,
Expand All @@ -299,7 +300,7 @@ class PushConsumer:
Messages are delivered by the server to a specified subject.
"""

async def messages(self) -> MessagesIterator:
def messages(self) -> Future[MessagesIterator]:
"""Get an async iterator for consuming messages.

:return: an async iterator over JetStream messages.
Expand All @@ -312,7 +313,7 @@ class PullConsumer:
Messages are fetched on demand in batches by the client.
"""

async def fetch(
def fetch(
self,
max_messages: int | None = None,
group: str | None = None,
Expand All @@ -323,7 +324,7 @@ class PullConsumer:
min_pending: int | None = None,
min_ack_pending: int | None = None,
timeout: float | timedelta | None = None,
) -> list[JetStreamMessage]:
) -> Future[list[JetStreamMessage]]:
"""Fetch a batch of messages from the consumer.

:param max_messages: maximum number of messages to fetch.
Expand Down
17 changes: 9 additions & 8 deletions python/natsrpy/_natsrpy_rs/js/counters.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import Future
from datetime import timedelta
from typing import final

Expand Down Expand Up @@ -183,12 +184,12 @@ class Counters:
``allow_message_counter`` enabled.
"""

async def add(
def add(
self,
key: str,
value: int,
timeout: float | timedelta | None = None,
) -> int:
) -> Future[int]:
"""Add an arbitrary value to a counter.

:param key: subject key identifying the counter.
Expand All @@ -198,11 +199,11 @@ class Counters:
:return: the new counter value after the addition.
"""

async def incr(
def incr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int:
) -> Future[int]:
"""Increment a counter by one.

Shorthand for ``add(key, 1)``.
Expand All @@ -213,11 +214,11 @@ class Counters:
:return: the new counter value after the increment.
"""

async def decr(
def decr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int:
) -> Future[int]:
"""Decrement a counter by one.

Shorthand for ``add(key, -1)``.
Expand All @@ -228,11 +229,11 @@ class Counters:
:return: the new counter value after the decrement.
"""

async def get(
def get(
self,
key: str,
timeout: float | timedelta | None = None,
) -> CounterEntry:
) -> Future[CounterEntry]:
"""Retrieve the current value of a counter.

:param key: subject key identifying the counter.
Expand Down
Loading
Loading