diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index 62b272d..0363379 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -7,6 +7,7 @@ from nats.errors import TimeoutError as NatsTimeoutError from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, StreamConfig +from nats.js.errors import NotFoundError as StreamNotFoundError from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage _T = typing.TypeVar("_T") # (Too short) @@ -138,6 +139,15 @@ def __init__( self.consumer: JetStreamConsumerType + async def _add_or_reuse_stream(self) -> None: + try: + await self.js.stream_info(self.stream_config.name) + await self.js.update_stream(self.stream_config) + logger.info(f"Stream {self.stream_config.name} already exists and was reused.") + except StreamNotFoundError: + await self.js.add_stream(config=self.stream_config) + logger.info(f"Created stream {self.stream_config.name}") + async def startup(self) -> None: """ Startup event handler. @@ -152,7 +162,7 @@ async def startup(self) -> None: self.stream_config.name = self.stream_name if not self.stream_config.subjects: self.stream_config.subjects = [self.subject] - await self.js.add_stream(config=self.stream_config) + await self._add_or_reuse_stream() await self._startup_consumer() async def shutdown(self) -> None: