diff --git a/.gitignore b/.gitignore index 4cb54d1..32ce7fc 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,4 @@ docs/_build/ .python-version .venv/ target/ +*.profraw diff --git a/python/tests/test_callback_subscription.py b/python/tests/test_callback_subscription.py new file mode 100644 index 0000000..5fdbada --- /dev/null +++ b/python/tests/test_callback_subscription.py @@ -0,0 +1,48 @@ +import asyncio +import uuid + +from natsrpy import CallbackSubscription, Nats + + +async def test_callback_unsubscribe(nats: Nats) -> None: + subj = uuid.uuid4().hex + + async def callback(msg: object) -> None: + pass + + sub = await nats.subscribe(subject=subj, callback=callback) + assert isinstance(sub, CallbackSubscription) + await sub.unsubscribe() + + +async def test_callback_unsubscribe_with_limit(nats: Nats) -> None: + subj = uuid.uuid4().hex + received: list[bytes] = [] + event = asyncio.Event() + + async def callback(msg: object) -> None: + received.append(msg.payload) # type: ignore[attr-defined] + if len(received) >= 2: + event.set() + + sub = await nats.subscribe(subject=subj, callback=callback) + assert isinstance(sub, CallbackSubscription) + await sub.unsubscribe(limit=2) + await nats.publish(subj, b"msg-1") + await nats.publish(subj, b"msg-2") + await asyncio.wait_for(event.wait(), timeout=5.0) + assert received == [b"msg-1", b"msg-2"] + + +async def test_callback_drain(nats_url: str) -> None: + client = Nats(addrs=[nats_url]) + await client.startup() + subj = uuid.uuid4().hex + + async def callback(msg: object) -> None: + pass + + sub = await client.subscribe(subject=subj, callback=callback) + assert isinstance(sub, CallbackSubscription) + await sub.drain() + await client.shutdown() diff --git a/python/tests/test_consumer_properties.py b/python/tests/test_consumer_properties.py new file mode 100644 index 0000000..7c92ea6 --- /dev/null +++ b/python/tests/test_consumer_properties.py @@ -0,0 +1,76 @@ +import uuid + +from natsrpy.js import ( + JetStream, + PullConsumer, + PullConsumerConfig, + PushConsumer, + PushConsumerConfig, + StreamConfig, +) + + +async def test_pull_consumer_name_and_stream(js: JetStream) -> None: + stream_name = f"test-pcns-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + consumer = await stream.consumers.create( + PullConsumerConfig(name=consumer_name), + ) + assert isinstance(consumer, PullConsumer) + assert consumer.name == consumer_name + assert consumer.stream_name == stream_name + finally: + await js.streams.delete(stream_name) + + +async def test_push_consumer_name_and_stream(js: JetStream) -> None: + stream_name = f"test-pushns-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + deliver_subj = uuid.uuid4().hex + consumer_name = f"push-{uuid.uuid4().hex[:8]}" + consumer = await stream.consumers.create( + PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name), + ) + assert isinstance(consumer, PushConsumer) + assert consumer.name == consumer_name + assert consumer.stream_name == stream_name + finally: + await js.streams.delete(stream_name) + + +async def test_pull_consumer_messages_iterator(js: JetStream) -> None: + stream_name = f"test-pullmsgiter-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"iter-msg-1", wait=True) + await js.publish(subj, b"iter-msg-2", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + msgs = await consumer.fetch(max_messages=2, timeout=5.0) + assert len(msgs) == 2 + assert msgs[0].payload == b"iter-msg-1" + assert msgs[1].payload == b"iter-msg-2" + finally: + await js.streams.delete(stream_name) + + +async def test_pull_consumer_fetch_empty(js: JetStream) -> None: + stream_name = f"test-fetchempty-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + msgs = await consumer.fetch(max_messages=1, timeout=0.5) + assert len(msgs) == 0 + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_consumers_manager.py b/python/tests/test_consumers_manager.py new file mode 100644 index 0000000..9b354ca --- /dev/null +++ b/python/tests/test_consumers_manager.py @@ -0,0 +1,107 @@ +import uuid +from datetime import timedelta + +from natsrpy.js import ( + JetStream, + PullConsumer, + PullConsumerConfig, + PushConsumer, + PushConsumerConfig, + StreamConfig, +) + + +async def test_consumers_manager_delete(js: JetStream) -> None: + stream_name = f"test-cmdel-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=consumer_name)) + result = await stream.consumers.delete(consumer_name) + assert result is True + finally: + await js.streams.delete(stream_name) + + +async def test_consumers_manager_get_push(js: JetStream) -> None: + stream_name = f"test-cmgp-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + deliver_subj = uuid.uuid4().hex + consumer_name = f"push-{uuid.uuid4().hex[:8]}" + await stream.consumers.create( + PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name), + ) + consumer = await stream.consumers.get_push(consumer_name) + assert isinstance(consumer, PushConsumer) + assert consumer.name == consumer_name + finally: + await js.streams.delete(stream_name) + + +async def test_consumers_manager_update_pull(js: JetStream) -> None: + stream_name = f"test-cmup-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + cfg = PullConsumerConfig(name=consumer_name, description="original") + await stream.consumers.create(cfg) + cfg.description = "updated" + updated = await stream.consumers.update(cfg) + assert isinstance(updated, PullConsumer) + finally: + await js.streams.delete(stream_name) + + +async def test_consumers_manager_update_push(js: JetStream) -> None: + stream_name = f"test-cmupp-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + deliver_subj = uuid.uuid4().hex + consumer_name = f"push-{uuid.uuid4().hex[:8]}" + cfg = PushConsumerConfig( + deliver_subject=deliver_subj, + name=consumer_name, + description="original", + ) + await stream.consumers.create(cfg) + cfg.description = "updated" + updated = await stream.consumers.update(cfg) + assert isinstance(updated, PushConsumer) + finally: + await js.streams.delete(stream_name) + + +async def test_consumers_manager_pause_and_resume(js: JetStream) -> None: + stream_name = f"test-cmpr-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=consumer_name)) + paused = await stream.consumers.pause(consumer_name, delay=60.0) + assert isinstance(paused, bool) + resumed = await stream.consumers.resume(consumer_name) + assert isinstance(resumed, bool) + finally: + await js.streams.delete(stream_name) + + +async def test_consumers_manager_pause_timedelta(js: JetStream) -> None: + stream_name = f"test-cmpd-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + await stream.consumers.create(PullConsumerConfig(name=consumer_name)) + paused = await stream.consumers.pause( + consumer_name, + delay=timedelta(seconds=60), + ) + assert isinstance(paused, bool) + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_js_message_acks.py b/python/tests/test_js_message_acks.py new file mode 100644 index 0000000..bc4b3bc --- /dev/null +++ b/python/tests/test_js_message_acks.py @@ -0,0 +1,168 @@ +import uuid + +from natsrpy.js import ( + AckPolicy, + JetStream, + PullConsumerConfig, + StreamConfig, +) + + +async def test_message_ack_double(js: JetStream) -> None: + stream_name = f"test-ackd-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"ack-double", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].ack(double=True) + finally: + await js.streams.delete(stream_name) + + +async def test_message_nack_with_delay(js: JetStream) -> None: + stream_name = f"test-nackd-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"nack-delay", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].nack(delay=2.0) + finally: + await js.streams.delete(stream_name) + + +async def test_message_nack_double(js: JetStream) -> None: + stream_name = f"test-nackdb-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"nack-double", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].nack(double=True) + finally: + await js.streams.delete(stream_name) + + +async def test_message_next_ack(js: JetStream) -> None: + stream_name = f"test-nextack-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"next-ack", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].next() + finally: + await js.streams.delete(stream_name) + + +async def test_message_term_double(js: JetStream) -> None: + stream_name = f"test-termd-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"term-double", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].term(double=True) + finally: + await js.streams.delete(stream_name) + + +async def test_message_progress_double(js: JetStream) -> None: + stream_name = f"test-progd-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"progress-double", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig( + name=f"consumer-{uuid.uuid4().hex[:8]}", + ack_policy=AckPolicy.EXPLICIT, + ), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + await messages[0].progress(double=True) + await messages[0].ack() + finally: + await js.streams.delete(stream_name) + + +async def test_message_domain_and_acc_hash(js: JetStream) -> None: + stream_name = f"test-domhash-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"domain-test", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + # domain may be None for a non-domain jetstream + assert msg.domain is None or isinstance(msg.domain, str) + assert msg.acc_hash is None or isinstance(msg.acc_hash, str) + assert msg.token is None or isinstance(msg.token, str) + assert msg.reply is None or isinstance(msg.reply, str) + finally: + await js.streams.delete(stream_name) + + +async def test_message_headers_empty(js: JetStream) -> None: + stream_name = f"test-msghdr-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"no-headers", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + assert isinstance(messages[0].headers, dict) + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_kv_manager.py b/python/tests/test_kv_manager.py new file mode 100644 index 0000000..6e63311 --- /dev/null +++ b/python/tests/test_kv_manager.py @@ -0,0 +1,20 @@ +import uuid + +from natsrpy.js import ( + JetStream, + KeyValue, + KVConfig, +) + + +async def test_kv_manager_update(js: JetStream) -> None: + bucket = f"test-kv-mgrupd-{uuid.uuid4().hex[:8]}" + config = KVConfig(bucket=bucket, description="original") + await js.kv.create(config) + try: + config.description = "updated description" + updated_kv = await js.kv.update(config) + assert isinstance(updated_kv, KeyValue) + assert updated_kv.name == bucket + finally: + await js.kv.delete(bucket) diff --git a/python/tests/test_publication_and_extras.py b/python/tests/test_publication_and_extras.py new file mode 100644 index 0000000..e16e18a --- /dev/null +++ b/python/tests/test_publication_and_extras.py @@ -0,0 +1,63 @@ +import uuid + +from natsrpy import Nats +from natsrpy.js import JetStream, Publication, StreamConfig + + +async def test_publication_properties(js: JetStream) -> None: + stream_name = f"test-pub-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + await js.streams.create(config) + try: + pub = await js.publish(subj, b"pub-test", wait=True) + assert isinstance(pub, Publication) + assert pub.stream == stream_name + assert pub.sequence >= 1 + assert isinstance(pub.domain, str) + assert isinstance(pub.duplicate, bool) + assert pub.duplicate is False + finally: + await js.streams.delete(stream_name) + + +async def test_publication_value_none(js: JetStream) -> None: + stream_name = f"test-pubval-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + await js.streams.create(config) + try: + pub = await js.publish(subj, b"val-test", wait=True) + # value is only set for counters, None for regular streams + assert pub.value is None + finally: + await js.streams.delete(stream_name) + + +async def test_subscribe_with_queue_group(nats: Nats) -> None: + subj = uuid.uuid4().hex + queue = f"queue-{uuid.uuid4().hex[:8]}" + sub = await nats.subscribe(subject=subj, queue=queue) + await nats.publish(subj, b"queue-msg") + msg = await sub.next(timeout=5.0) + assert msg.payload == b"queue-msg" + await sub.unsubscribe() + + +async def test_nats_publish_bytearray(nats: Nats) -> None: + subj = uuid.uuid4().hex + payload = bytearray(b"bytearray-payload") + sub = await nats.subscribe(subject=subj) + await nats.publish(subj, payload) + msg = await anext(sub) + assert msg.payload == bytes(payload) + + +async def test_nats_publish_memoryview(nats: Nats) -> None: + subj = uuid.uuid4().hex + data = b"memoryview-payload" + payload = memoryview(data) + sub = await nats.subscribe(subject=subj) + await nats.publish(subj, payload) + msg = await anext(sub) + assert msg.payload == data diff --git a/python/tests/test_stream_config_types.py b/python/tests/test_stream_config_types.py new file mode 100644 index 0000000..7f001ae --- /dev/null +++ b/python/tests/test_stream_config_types.py @@ -0,0 +1,97 @@ +from datetime import timedelta + +from natsrpy.js import ( + ConsumerLimits, + External, + Placement, + Source, +) + + +async def test_external_construct() -> None: + ext = External(api_prefix="$JS.API") + assert ext.api_prefix == "$JS.API" + assert ext.delivery_prefix is None + + +async def test_external_with_delivery_prefix() -> None: + ext = External(api_prefix="$JS.API", delivery_prefix="$JS.DELIVER") + assert ext.api_prefix == "$JS.API" + assert ext.delivery_prefix == "$JS.DELIVER" + + +async def test_external_setters() -> None: + ext = External(api_prefix="$JS.API") + ext.api_prefix = "$JS.NEW" + assert ext.api_prefix == "$JS.NEW" + ext.delivery_prefix = "$JS.DEL" + assert ext.delivery_prefix == "$JS.DEL" + + +async def test_source_construct_minimal() -> None: + src = Source(name="src-stream") + assert src.name == "src-stream" + assert src.filter_subject is None + assert src.external is None + assert src.start_sequence is None + assert src.domain is None + + +async def test_source_with_filter() -> None: + src = Source(name="src-stream", filter_subject="test.>") + assert src.name == "src-stream" + assert src.filter_subject == "test.>" + + +async def test_source_with_external() -> None: + ext = External(api_prefix="$JS.API") + src = Source(name="src-stream", external=ext) + assert src.name == "src-stream" + assert src.external is not None + + +async def test_source_setters() -> None: + src = Source(name="src-stream") + src.name = "new-stream" + assert src.name == "new-stream" + src.filter_subject = "filter.>" + assert src.filter_subject == "filter.>" + + +async def test_placement_construct() -> None: + p = Placement(cluster="us-east", tags=["fast", "ssd"]) + assert p.cluster == "us-east" + assert p.tags == ["fast", "ssd"] + + +async def test_placement_defaults() -> None: + p = Placement() + assert p.cluster is None + # default is empty list + assert p.tags == [] + + +async def test_placement_setters() -> None: + p = Placement() + p.cluster = "eu-west" + assert p.cluster == "eu-west" + p.tags = ["backup"] + assert p.tags == ["backup"] + + +async def test_consumer_limits_construct() -> None: + cl = ConsumerLimits( + inactive_threshold=timedelta(seconds=30), + max_ack_pending=100, + ) + # ConsumerLimits doesn't expose its fields as Python getters, + # but the constructor should succeed without errors. + assert cl is not None + + +async def test_consumer_limits_construct_with_float() -> None: + cl = ConsumerLimits( + inactive_threshold=timedelta(seconds=30), + max_ack_pending=50, + ) + assert cl is not None diff --git a/python/tests/test_stream_purge.py b/python/tests/test_stream_purge.py new file mode 100644 index 0000000..ad9d231 --- /dev/null +++ b/python/tests/test_stream_purge.py @@ -0,0 +1,39 @@ +import uuid + +from natsrpy.js import JetStream, StreamConfig + + +async def test_stream_purge_with_sequence(js: JetStream) -> None: + name = f"test-purgeseq-{uuid.uuid4().hex[:8]}" + subj = f"{name}.data" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"msg-1", wait=True) + await js.publish(subj, b"msg-2", wait=True) + await js.publish(subj, b"msg-3", wait=True) + purged = await stream.purge(sequence=2) + assert purged >= 1 + info = await stream.get_info() + assert info.state.messages < 3 + finally: + await js.streams.delete(name) + + +async def test_stream_purge_with_keep(js: JetStream) -> None: + name = f"test-purgekeep-{uuid.uuid4().hex[:8]}" + subj = f"{name}.data" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"msg-1", wait=True) + await js.publish(subj, b"msg-2", wait=True) + await js.publish(subj, b"msg-3", wait=True) + await js.publish(subj, b"msg-4", wait=True) + await js.publish(subj, b"msg-5", wait=True) + purged = await stream.purge(keep=2) + assert purged == 3 + info = await stream.get_info() + assert info.state.messages == 2 + finally: + await js.streams.delete(name)