Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions roborock/mqtt/roborock_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
_LOGGER = logging.getLogger(__name__)
_MQTT_LOGGER = logging.getLogger(f"{__name__}.aiomqtt")

CLIENT_KEEPALIVE = datetime.timedelta(seconds=60)
CLIENT_KEEPALIVE = datetime.timedelta(seconds=45)
TOPIC_KEEPALIVE = datetime.timedelta(seconds=60)

# Exponential backoff parameters
Expand Down Expand Up @@ -56,9 +56,12 @@ class RoborockMqttSession(MqttSession):
The client is run as a background task that will run until shutdown. Once
connected, the client will wait for messages to be received in a loop. If
the connection is lost, the client will be re-created and reconnected. There
is backoff to avoid spamming the broker with connection attempts. The client
will automatically re-establish any subscriptions when the connection is
re-established.
is backoff to avoid spamming the broker with connection attempts.

Reconnect attempts are deferred while there are no active subscriptions,
which avoids unnecessary reconnect churn for idle sessions. Reconnects
resume as soon as a subscription is added again. The client automatically
re-establishes any existing subscriptions when the connection returns.
"""

def __init__(
Expand Down Expand Up @@ -175,6 +178,16 @@ async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) -
if self._stop:
_LOGGER.debug("MQTT session closed, stopping retry loop")
return
if not self._client_subscribed_topics and not self._listeners.keys():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this implementation looks good. One thought is this behavior could live in LazyMqttSession. I can see that on one hand (a) it could be easier to make a more efficient "wake up" method in the lazy session if the logic is simpler (e.g. using an asyncio.Event to do wake up when a new subscriber is added) but (b) might make it more complex to have an "API" between the two classes exposed for this. Just sharing a thought but not need to act on it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a good idea, I don't think i'll act on it right now, but we can see how things end up looking with future work with the mqtt connection and how we handle the subscriptions

_LOGGER.debug("MQTT session disconnected with no active subscriptions, deferring reconnect")
self._diagnostics.increment("reconnect_deferred")
while not self._stop and not self._client_subscribed_topics and not self._listeners.keys():
await asyncio.sleep(0.1)
if self._stop:
_LOGGER.debug("MQTT session closed while waiting for active subscriptions")
return
self._backoff = MIN_BACKOFF_INTERVAL
continue
_LOGGER.info("MQTT session disconnected, retrying in %s seconds", self._backoff.total_seconds())
self._diagnostics.increment("reconnect_wait")
await asyncio.sleep(self._backoff.total_seconds())
Expand Down
10 changes: 5 additions & 5 deletions tests/e2e/__snapshots__/test_device_manager.ambr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# serializer version: 1
# name: test_a01_device[home_data0]
[mqtt >]
00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....|
00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....|
00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467|
00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e|
[mqtt <]
Expand Down Expand Up @@ -29,7 +29,7 @@
# ---
# name: test_l01_device
[mqtt >]
00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....|
00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....|
00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467|
00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e|
[mqtt <]
Expand Down Expand Up @@ -240,7 +240,7 @@
# ---
# name: test_q10_device[home_data0]
[mqtt >]
00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....|
00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....|
00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467|
00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e|
[mqtt <]
Expand All @@ -262,7 +262,7 @@
# ---
# name: test_q7_device[home_data0]
[mqtt >]
00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....|
00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....|
00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467|
00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e|
[mqtt <]
Expand Down Expand Up @@ -326,7 +326,7 @@
# ---
# name: test_v1_device
[mqtt >]
00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....|
00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....|
00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467|
00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e|
[mqtt <]
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/__snapshots__/test_mqtt_session.ambr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# serializer version: 1
# name: test_session_e2e_publish_message
[mqtt >]
00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....|
00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....|
00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw|
00000020 6f 72 64 |ord|
[mqtt <]
Expand All @@ -15,7 +15,7 @@
# ---
# name: test_session_e2e_receive_message
[mqtt >]
00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....|
00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....|
00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw|
00000020 6f 72 64 |ord|
[mqtt <]
Expand Down
25 changes: 25 additions & 0 deletions tests/mqtt/test_roborock_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,33 @@ def succeed_then_fail_unauthorized() -> Any:
session = await create_mqtt_session(params)
assert session.connected

# Keep an active subscription so reconnect attempts are not deferred.
await session.subscribe("topic-1", Subscriber().append)

try:
async with asyncio.timeout(10):
assert await unauthorized.wait()
finally:
await session.close()


async def test_session_defers_reconnect_when_idle(
mock_aenter_client: AsyncMock,
message_iterator: FakeAsyncIterator,
mqtt_client_lite: AsyncMock,
) -> None:
"""Test that reconnects are deferred when there are no active subscriptions."""

params = copy.deepcopy(FAKE_PARAMS)
message_iterator.loop = False

session = await create_mqtt_session(params)

assert mqtt_client_lite.messages is message_iterator

try:
await asyncio.sleep(0.1)
assert mock_aenter_client.await_count == 1
assert params.diagnostics.as_dict().get("reconnect_deferred", 0) >= 1
finally:
await session.close()