Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ serde_json = "1.0.149"
thiserror = "2.0.18"
time = "0.3.47"
tokio = { version = "1.50.0", features = ["full"] }

[profile.release]
lto = "fat"
codegen-units = 1
opt-level = 3
strip = true
debug = false
panic = "abort"
66 changes: 66 additions & 0 deletions examples/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio

from natsrpy import Nats
from natsrpy.js import PullConsumerConfig, PushConsumerConfig, StreamConfig


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

js = await nats.jetstream()

stream = await js.streams.create_or_update(
StreamConfig(
name="stream-example",
subjects=["stream.example.>"],
description="Stream example",
),
)

# Push and pull consumers have different configurations.
# If you supply PushConsumerConfig, you will get a push consumer,
# and otherwise you will get a PullConsumer.
#
# They have different APIs.
pull_consumer = await stream.consumers.create(
PullConsumerConfig(
name="example-pull",
durable_name="example-pull",
),
)
push_consumer = await stream.consumers.create(
PushConsumerConfig(
name="example-push",
deliver_subject="example-push",
durable_name="example-push",
),
)

# We publish a single message
await js.publish("stream.example.test", "message for stream")

# We use messages() to get async iterator which we
# use to get messages for push_consumer.
async for push_message in await push_consumer.messages():
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
await push_message.ack()
break

# Pull consumers have to request batches of messages.
for pull_message in await pull_consumer.fetch(max_messages=10):
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
await pull_message.ack()

# Cleanup
await stream.consumers.delete(push_consumer.name)
await stream.consumers.delete(pull_consumer.name)
await js.streams.delete(stream.name)

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
45 changes: 45 additions & 0 deletions examples/counters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio

from natsrpy import Nats
from natsrpy.js import CountersConfig


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

js = await nats.jetstream()
# Counters store is basically a stream,
# but each subject is considered as a counter.
# You can read more about how it works here:
# https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-49.md
counters = await js.counters.create_or_update(
CountersConfig(
name="counters",
subjects=["counters.>"],
),
)

# We have a nice interface for counters.
# Please note, that ADD accepts
# positive and NEGATIVE values.
# I'd rename this method, but this API is
# defined in ADR-49, so we won't change this.
#
# Each add\incr\decr returns current value of the counter.
await counters.add("counters.one", +8)
await counters.add("counters.one", -2)
# Increase by 1
await counters.incr("counters.one")
# Decrease by 1
await counters.decr("counters.one")

print(await counters.get("counters.one")) # noqa: T201

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
46 changes: 46 additions & 0 deletions examples/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio

from natsrpy import Nats
from natsrpy.js import KVConfig


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

js = await nats.jetstream()

kv = await js.kv.create_or_update(KVConfig(bucket="kv-example"))

watcher = await kv.watch("test-key")

await kv.put("test-key", "one")
await kv.put("test-key", b"two")

# To obtain bytes value.
value = await kv.get("test-key")
if value:
print("[VALUE]", value.decode()) # noqa: T201
# To get kv-entry with all
# the metadata.
entry = await kv.entry("test-key")
if entry:
print("[ENTRY]", entry) # noqa: T201

await kv.delete("test-key")

# Alternatively you can
# use await watcher.next()
async for event in watcher:
print("[EVENT]", event) # noqa: T201
break

await js.kv.delete(kv.name)

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
45 changes: 45 additions & 0 deletions examples/object_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio
import io
from datetime import timedelta
from pathlib import Path

from natsrpy import Nats
from natsrpy.js import ObjectStoreConfig


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

js = await nats.jetstream()

store = await js.object_store.create(
ObjectStoreConfig(
bucket="example-bucket",
max_age=timedelta(minutes=1),
),
)
await store.put("test2.py", Path(__file__).read_bytes())
await store.put("test.py", str(Path(__file__)))

async for obj in await store.list():
print(obj) # noqa: T201
# We use BytesIO, since
# get takes writer as it's argument.
#
# That happens because files can be very large,
# and this approach allows us to stream directly
# to files. using `open('file', 'wb') as output:`
with io.BytesIO() as output:
await store.get(obj.name, output)
assert obj.size == len(output.getvalue()) # noqa: S101

await store.delete(obj.name)

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
42 changes: 42 additions & 0 deletions examples/request_reply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio

from natsrpy import Message, Nats


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()
subj = "request-reply"

# Here we create responder, that will be
# answering to our requests.
async def responder(message: Message) -> None:
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
if message.reply:
await nats.publish(
message.reply,
f"reply to {message.payload}",
headers=message.headers,
)

# Start responder using callback-based subsciption.
sub = await nats.subscribe(subj, callback=responder)
# Send 3 concurrent requests.
responses = await asyncio.gather(
nats.request(subj, "request1"),
nats.request(subj, "request2", headers={"header": "value"}),
nats.request(subj, "request3", inbox="test-inbox"),
)
# Disconnect resonder.
await sub.drain()

# Iterate over replies.
for resp in responses:
print(f"[RESPONSE]: {resp}") # noqa: T201

await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
35 changes: 35 additions & 0 deletions examples/simple_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

from natsrpy import Nats


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

# Here we initiate subscription.
# We do it before sending messages,
# in order to catch them once we will start reading.
subscription = await nats.subscribe("hello")

# Publish accepts str | bytes | bytearray | memoryview
await nats.publish("hello", "str world")
await nats.publish("hello", b"bytes world")
await nats.publish("hello", bytearray(b"bytearray world"))
await nats.publish("hello", "headers", headers={"one": "two"})
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})

# Calling this method will unsubscribe us,
# after `n` delivered messages.
# or immediately if `n` is not provided.
subscription.unsubscribe(limit=5)
async for message in subscription:
print(message) # noqa: T201

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
56 changes: 56 additions & 0 deletions examples/subscriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio

from natsrpy import Message, Nats


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

cb_lock = asyncio.Event()

async def callback(message: Message) -> None:
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
cb_lock.set()

# When subscribing you can set callback.
# In that case CallbackSubscription is returned.
# This type of subscription cannot be iterated.
cb_sub = await nats.subscribe("cb-subj", callback=callback)

# When callback is not set, you get a subscription
# that should be used along with `async for`
# loop, or alternatively you can call
# `await iter_sub.next()` to get a single message.
iter_sub = await nats.subscribe("iter-subj")

# Subscriptions with queue argument create
# subscription with a queue group to distribute
# messages along all subscribers.
queue_sub = await nats.subscribe("queue-subj", queue="example-queue")

await nats.publish("cb-subj", "message for callback")
await nats.publish("iter-subj", "message for iterator")
await nats.publish("queue-subj", "message for queue sub")

# We can unsubscribe after a particular amount of messages.
await iter_sub.unsubscribe(limit=1)
await cb_sub.unsubscribe(limit=1)
await queue_sub.unsubscribe(limit=1)

async for message in iter_sub:
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201

async for message in queue_sub:
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201

# Making sure that the message in callback is received.
await cb_lock.wait()

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,14 @@ class Nats:
self,
subject: str,
callback: Callable[[Message], Awaitable[None]],
queue: str | None = None,
) -> Future[CallbackSubscription]: ...
@overload
def subscribe(
self,
subject: str,
callback: None = None,
queue: str | None = None,
) -> Future[IteratorSubscription]: ...
def jetstream(
self,
Expand Down
16 changes: 16 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ class PushConsumer:
Messages are delivered by the server to a specified subject.
"""

@property
def name(self) -> str:
"""Get consumer name."""

@property
def stream_name(self) -> str:
"""Get stream name that this consumer attached to."""

def messages(self) -> Future[MessagesIterator]:
"""Get an async iterator for consuming messages.

Expand All @@ -313,6 +321,14 @@ class PullConsumer:
Messages are fetched on demand in batches by the client.
"""

@property
def name(self) -> str:
"""Get consumer name."""

@property
def stream_name(self) -> str:
"""Get stream name that this consumer attached to."""

def fetch(
self,
max_messages: int | None = None,
Expand Down
Loading
Loading