Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 146 additions & 13 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,67 @@ class Message:

@final
class IteratorSubscription:
"""Async iterator subscription for receiving NATS messages.

Returned by :meth:`Nats.subscribe` when no callback is provided.
Messages can be received using ``async for`` or by calling :meth:`next`
directly.
"""

def __aiter__(self) -> IteratorSubscription: ...
async def __anext__(self) -> Message: ...
async def next(self, timeout: float | timedelta | None = None) -> Message: ...
async def unsubscribe(self, limit: int | None = None) -> None: ...
async def drain(self) -> None: ...
async def next(self, timeout: float | timedelta | None = None) -> Message:
"""Receive the next message from the subscription.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next message.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

async def unsubscribe(self, limit: int | None = None) -> None:
"""Unsubscribe from the subject.

:param limit: if set, automatically unsubscribe after receiving
this many additional messages, defaults to None.
"""

async def drain(self) -> None:
"""Drain the subscription.

Unsubscribes and flushes any remaining messages before closing.
"""

@final
class CallbackSubscription:
async def unsubscribe(self, limit: int | None = None) -> None: ...
async def drain(self) -> None: ...
"""Callback-based subscription for receiving NATS messages.

Returned by :meth:`Nats.subscribe` when a callback is provided.
Messages are automatically delivered to the callback in a background task.
"""

async def unsubscribe(self, limit: int | None = None) -> None:
"""Unsubscribe from the subject.

:param limit: if set, automatically unsubscribe after receiving
this many additional messages, defaults to None.
"""

async def drain(self) -> None:
"""Drain the subscription.

Unsubscribes and flushes any remaining messages before closing.
"""

@final
class Nats:
"""NATS client.

Provides publish/subscribe messaging, request-reply, and JetStream
access over a connection to one or more NATS servers.
"""

def __new__(
cls,
/,
Expand All @@ -60,9 +108,45 @@ class Nats:
max_reconnects: int | None = None,
connection_timeout: float | timedelta = ..., # 5 sec
request_timeout: float | timedelta = ..., # 10 sec
) -> Self: ...
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
) -> Self:
"""Create a new NATS client instance.

The client is not connected until :meth:`startup` is called.

:param addrs: list of NATS server URLs, defaults to
``["nats://localhost:4222"]``.
:param user_and_pass: username and password tuple for authentication.
:param nkey: NKey seed for authentication.
:param token: token string for authentication.
:param custom_inbox_prefix: custom prefix for auto-generated inbox
subjects.
:param read_buffer_capacity: size of the read buffer in bytes,
defaults to 65535.
:param sender_capacity: capacity of the internal send channel,
defaults to 128.
:param max_reconnects: maximum number of reconnection attempts,
None means unlimited.
:param connection_timeout: timeout for establishing a connection
in seconds or as a timedelta, defaults to 5 seconds.
:param request_timeout: default timeout for request-reply operations
in seconds or as a timedelta, defaults to 10 seconds.
"""

async def startup(self) -> None:
"""Connect to the NATS server.

Establishes the connection using the parameters provided at
construction time. Must be called before any publish, subscribe,
or JetStream operations.
"""

async def shutdown(self) -> None:
"""Close the NATS connection.

Drains all subscriptions and flushes pending data before
disconnecting.
"""

async def publish(
self,
subject: str,
Expand All @@ -71,7 +155,18 @@ class Nats:
headers: dict[str, Any] | None = None,
reply: str | None = None,
err_on_disconnect: bool = False,
) -> None: ...
) -> None:
"""Publish a message to a subject.

:param subject: subject to publish the message to.
:param payload: message payload.
:param headers: optional NATS headers dictionary.
:param reply: optional reply-to subject for the request-reply
pattern.
:param err_on_disconnect: when True, raise an error if the client
is disconnected, defaults to False.
"""

async def request(
self,
subject: str,
Expand All @@ -80,9 +175,30 @@ class Nats:
headers: dict[str, Any] | None = None,
inbox: str | None = None,
timeout: float | timedelta | None = None,
) -> None: ...
async def drain(self) -> None: ...
async def flush(self) -> None: ...
) -> None:
"""Send a request and discard the response.

:param subject: subject to send the request to.
:param payload: request payload.
:param headers: optional NATS headers dictionary.
:param inbox: custom inbox subject for the reply, auto-generated
if None.
:param timeout: maximum time to wait for a response in seconds
or as a timedelta, defaults to the client request_timeout.
"""

async def drain(self) -> None:
"""Drain the connection.

Gracefully closes all subscriptions and flushes pending messages.
"""

async def flush(self) -> None:
"""Flush the connection.

Waits until all pending messages have been sent to the server.
"""

@overload
async def subscribe(
self,
Expand All @@ -105,7 +221,24 @@ class Nats:
concurrency_limit: int | None = None,
max_ack_inflight: int | None = None,
backpressure_on_inflight: bool | None = None,
) -> js.JetStream: ...
) -> js.JetStream:
"""Create a JetStream context.

:param domain: JetStream domain to use.
:param api_prefix: custom JetStream API prefix, cannot be used
together with *domain*.
:param timeout: default request timeout for JetStream operations
in seconds or as a timedelta.
:param ack_timeout: acknowledgement timeout for consumers in seconds
or as a timedelta.
:param concurrency_limit: maximum number of concurrent JetStream
operations.
:param max_ack_inflight: maximum number of unacknowledged messages
in flight.
:param backpressure_on_inflight: when True, apply backpressure when
the in-flight limit is reached.
:return: a JetStream context.
"""

__all__ = [
"CallbackSubscription",
Expand Down
11 changes: 8 additions & 3 deletions python/natsrpy/_natsrpy_rs/exceptions.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
class NatsrpyBaseError(Exception): ...
class NatsrpySessionError(NatsrpyBaseError): ...
class NatsrpyPublishError(NatsrpyBaseError): ...
class NatsrpyBaseError(Exception):
"""Base exception for all natsrpy errors."""

class NatsrpySessionError(NatsrpyBaseError):
"""Raised on connection or session-level errors."""

class NatsrpyPublishError(NatsrpyBaseError):
"""Raised when a publish operation fails."""

__all__ = [
"NatsrpyBaseError",
Expand Down
Loading
Loading