Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ docs/_build/
.python-version
.venv/
target/
*.profraw
48 changes: 48 additions & 0 deletions python/tests/test_callback_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import uuid

from natsrpy import CallbackSubscription, Nats


async def test_callback_unsubscribe(nats: Nats) -> None:
subj = uuid.uuid4().hex

async def callback(msg: object) -> None:
pass

sub = await nats.subscribe(subject=subj, callback=callback)
assert isinstance(sub, CallbackSubscription)
await sub.unsubscribe()


async def test_callback_unsubscribe_with_limit(nats: Nats) -> None:
subj = uuid.uuid4().hex
received: list[bytes] = []
event = asyncio.Event()

async def callback(msg: object) -> None:
received.append(msg.payload) # type: ignore[attr-defined]
if len(received) >= 2:
event.set()

sub = await nats.subscribe(subject=subj, callback=callback)
assert isinstance(sub, CallbackSubscription)
await sub.unsubscribe(limit=2)
await nats.publish(subj, b"msg-1")
await nats.publish(subj, b"msg-2")
await asyncio.wait_for(event.wait(), timeout=5.0)
assert received == [b"msg-1", b"msg-2"]


async def test_callback_drain(nats_url: str) -> None:
client = Nats(addrs=[nats_url])
await client.startup()
subj = uuid.uuid4().hex

async def callback(msg: object) -> None:
pass

sub = await client.subscribe(subject=subj, callback=callback)
assert isinstance(sub, CallbackSubscription)
await sub.drain()
await client.shutdown()
76 changes: 76 additions & 0 deletions python/tests/test_consumer_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import uuid

from natsrpy.js import (
JetStream,
PullConsumer,
PullConsumerConfig,
PushConsumer,
PushConsumerConfig,
StreamConfig,
)


async def test_pull_consumer_name_and_stream(js: JetStream) -> None:
stream_name = f"test-pcns-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
consumer = await stream.consumers.create(
PullConsumerConfig(name=consumer_name),
)
assert isinstance(consumer, PullConsumer)
assert consumer.name == consumer_name
assert consumer.stream_name == stream_name
finally:
await js.streams.delete(stream_name)


async def test_push_consumer_name_and_stream(js: JetStream) -> None:
stream_name = f"test-pushns-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
deliver_subj = uuid.uuid4().hex
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
consumer = await stream.consumers.create(
PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name),
)
assert isinstance(consumer, PushConsumer)
assert consumer.name == consumer_name
assert consumer.stream_name == stream_name
finally:
await js.streams.delete(stream_name)


async def test_pull_consumer_messages_iterator(js: JetStream) -> None:
stream_name = f"test-pullmsgiter-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"iter-msg-1", wait=True)
await js.publish(subj, b"iter-msg-2", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
msgs = await consumer.fetch(max_messages=2, timeout=5.0)
assert len(msgs) == 2
assert msgs[0].payload == b"iter-msg-1"
assert msgs[1].payload == b"iter-msg-2"
finally:
await js.streams.delete(stream_name)


async def test_pull_consumer_fetch_empty(js: JetStream) -> None:
stream_name = f"test-fetchempty-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
msgs = await consumer.fetch(max_messages=1, timeout=0.5)
assert len(msgs) == 0
finally:
await js.streams.delete(stream_name)
107 changes: 107 additions & 0 deletions python/tests/test_consumers_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import uuid
from datetime import timedelta

from natsrpy.js import (
JetStream,
PullConsumer,
PullConsumerConfig,
PushConsumer,
PushConsumerConfig,
StreamConfig,
)


async def test_consumers_manager_delete(js: JetStream) -> None:
stream_name = f"test-cmdel-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
result = await stream.consumers.delete(consumer_name)
assert result is True
finally:
await js.streams.delete(stream_name)


async def test_consumers_manager_get_push(js: JetStream) -> None:
stream_name = f"test-cmgp-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
deliver_subj = uuid.uuid4().hex
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(
PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name),
)
consumer = await stream.consumers.get_push(consumer_name)
assert isinstance(consumer, PushConsumer)
assert consumer.name == consumer_name
finally:
await js.streams.delete(stream_name)


async def test_consumers_manager_update_pull(js: JetStream) -> None:
stream_name = f"test-cmup-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
cfg = PullConsumerConfig(name=consumer_name, description="original")
await stream.consumers.create(cfg)
cfg.description = "updated"
updated = await stream.consumers.update(cfg)
assert isinstance(updated, PullConsumer)
finally:
await js.streams.delete(stream_name)


async def test_consumers_manager_update_push(js: JetStream) -> None:
stream_name = f"test-cmupp-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
deliver_subj = uuid.uuid4().hex
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
cfg = PushConsumerConfig(
deliver_subject=deliver_subj,
name=consumer_name,
description="original",
)
await stream.consumers.create(cfg)
cfg.description = "updated"
updated = await stream.consumers.update(cfg)
assert isinstance(updated, PushConsumer)
finally:
await js.streams.delete(stream_name)


async def test_consumers_manager_pause_and_resume(js: JetStream) -> None:
stream_name = f"test-cmpr-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
paused = await stream.consumers.pause(consumer_name, delay=60.0)
assert isinstance(paused, bool)
resumed = await stream.consumers.resume(consumer_name)
assert isinstance(resumed, bool)
finally:
await js.streams.delete(stream_name)


async def test_consumers_manager_pause_timedelta(js: JetStream) -> None:
stream_name = f"test-cmpd-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
paused = await stream.consumers.pause(
consumer_name,
delay=timedelta(seconds=60),
)
assert isinstance(paused, bool)
finally:
await js.streams.delete(stream_name)
Loading
Loading