-
Notifications
You must be signed in to change notification settings - Fork 68
q7/b01: command-layer segment clean + map payload retrieval helpers (split 1/3) #778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2d03d6a
861b338
632b160
733159c
a20ae01
0b8f6fe
211f486
76531ad
4887362
8303a8c
71b785f
88bdb42
3ca8043
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,31 +5,67 @@ | |
| import asyncio | ||
| import json | ||
| import logging | ||
| from typing import Any | ||
| from collections.abc import Callable | ||
| from typing import Any, TypeVar | ||
|
|
||
| from roborock.devices.transport.mqtt_channel import MqttChannel | ||
| from roborock.exceptions import RoborockException | ||
| from roborock.protocols.b01_q7_protocol import ( | ||
| Q7RequestMessage, | ||
| decode_rpc_response, | ||
| encode_mqtt_payload, | ||
| ) | ||
| from roborock.roborock_message import RoborockMessage | ||
| from roborock.protocols.b01_q7_protocol import B01_VERSION, Q7RequestMessage, decode_rpc_response, encode_mqtt_payload | ||
| from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
| _TIMEOUT = 10.0 | ||
| _T = TypeVar("_T") | ||
|
|
||
|
|
||
| def _matches_map_response(response_message: RoborockMessage, *, version: bytes | None) -> bytes | None: | ||
| """Return raw map payload bytes for matching MAP_RESPONSE messages.""" | ||
| if ( | ||
| response_message.protocol == RoborockMessageProtocol.MAP_RESPONSE | ||
| and response_message.payload | ||
| and response_message.version == version | ||
| ): | ||
| return response_message.payload | ||
| return None | ||
|
|
||
|
|
||
| async def _send_command( | ||
| mqtt_channel: MqttChannel, | ||
| request_message: Q7RequestMessage, | ||
| *, | ||
| response_matcher: Callable[[RoborockMessage], _T | None], | ||
| ) -> _T: | ||
| """Publish a B01 command and resolve on the first matching response.""" | ||
| roborock_message = encode_mqtt_payload(request_message) | ||
| future: asyncio.Future[_T] = asyncio.get_running_loop().create_future() | ||
|
|
||
| def on_message(response_message: RoborockMessage) -> None: | ||
| if future.done(): | ||
| return | ||
| try: | ||
| response = response_matcher(response_message) | ||
| except Exception as ex: | ||
| future.set_exception(ex) | ||
| return | ||
| if response is not None: | ||
| future.set_result(response) | ||
|
|
||
| unsub = await mqtt_channel.subscribe(on_message) | ||
| try: | ||
| await mqtt_channel.publish(roborock_message) | ||
| return await asyncio.wait_for(future, timeout=_TIMEOUT) | ||
| finally: | ||
| unsub() | ||
|
|
||
|
|
||
| async def send_decoded_command( | ||
| mqtt_channel: MqttChannel, | ||
| request_message: Q7RequestMessage, | ||
| ) -> dict[str, Any] | None: | ||
| ) -> Any: | ||
| """Send a command on the MQTT channel and get a decoded response.""" | ||
| _LOGGER.debug("Sending B01 MQTT command: %s", request_message) | ||
| roborock_message = encode_mqtt_payload(request_message) | ||
| future: asyncio.Future[Any] = asyncio.get_running_loop().create_future() | ||
|
|
||
| def find_response(response_message: RoborockMessage) -> None: | ||
| def find_response(response_message: RoborockMessage) -> Any | None: | ||
| """Handle incoming messages and resolve the future.""" | ||
| try: | ||
| decoded_dps = decode_rpc_response(response_message) | ||
|
|
@@ -41,7 +77,7 @@ def find_response(response_message: RoborockMessage) -> None: | |
| response_message, | ||
| ex, | ||
| ) | ||
| return | ||
| return None | ||
| for dps_value in decoded_dps.values(): | ||
| # valid responses are JSON strings wrapped in the dps value | ||
| if not isinstance(dps_value, str): | ||
|
|
@@ -55,31 +91,23 @@ def find_response(response_message: RoborockMessage) -> None: | |
| continue | ||
| if isinstance(inner, dict) and inner.get("msgId") == str(request_message.msg_id): | ||
| _LOGGER.debug("Received query response: %s", inner) | ||
| # Check for error code (0 = success, non-zero = error) | ||
| code = inner.get("code", 0) | ||
| if code != 0: | ||
| error_msg = f"B01 command failed with code {code} ({request_message})" | ||
| _LOGGER.debug("B01 error response: %s", error_msg) | ||
| if not future.done(): | ||
| future.set_exception(RoborockException(error_msg)) | ||
| return | ||
| raise RoborockException(error_msg) | ||
| data = inner.get("data") | ||
| # All get commands should be dicts | ||
| if request_message.command.endswith(".get") and not isinstance(data, dict): | ||
| if not future.done(): | ||
| future.set_exception( | ||
| RoborockException(f"Unexpected data type for response {data} ({request_message})") | ||
| ) | ||
| return | ||
| if not future.done(): | ||
| future.set_result(data) | ||
|
|
||
| unsub = await mqtt_channel.subscribe(find_response) | ||
|
|
||
| _LOGGER.debug("Sending MQTT message: %s", roborock_message) | ||
| if request_message.command == "prop.get" and not isinstance(data, dict): | ||
| raise RoborockException(f"Unexpected data type for response {data} ({request_message})") | ||
| return data | ||
| return None | ||
|
|
||
| try: | ||
| await mqtt_channel.publish(roborock_message) | ||
| return await asyncio.wait_for(future, timeout=_TIMEOUT) | ||
| return await _send_command( | ||
| mqtt_channel, | ||
| request_message, | ||
| response_matcher=find_response, | ||
| ) | ||
| except TimeoutError as ex: | ||
| raise RoborockException(f"B01 command timed out after {_TIMEOUT}s ({request_message})") from ex | ||
| except RoborockException as ex: | ||
|
|
@@ -89,13 +117,27 @@ def find_response(response_message: RoborockMessage) -> None: | |
| ex, | ||
| ) | ||
| raise | ||
|
|
||
| except Exception as ex: | ||
| _LOGGER.exception( | ||
| "Error sending B01 decoded command (%ss): %s", | ||
| request_message, | ||
| ex, | ||
| ) | ||
| raise | ||
| finally: | ||
| unsub() | ||
|
|
||
|
|
||
| async def send_map_command(mqtt_channel: MqttChannel, request_message: Q7RequestMessage) -> bytes: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't this use the send_decoded_command() in b01_q7_channel? |
||
| """Send map upload command and wait for MAP_RESPONSE payload bytes. | ||
|
|
||
| This stays separate from ``send_decoded_command()`` because map uploads arrive as | ||
| raw ``MAP_RESPONSE`` payload bytes instead of a decoded RPC ``data`` payload. | ||
| """ | ||
|
|
||
| try: | ||
| return await _send_command( | ||
| mqtt_channel, | ||
| request_message, | ||
| response_matcher=lambda response_message: _matches_map_response(response_message, version=B01_VERSION), | ||
| ) | ||
| except TimeoutError as ex: | ||
| raise RoborockException(f"B01 map command timed out after {_TIMEOUT}s ({request_message})") from ex | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| from typing import Any | ||
|
|
||
| from roborock import B01Props | ||
| from roborock.data import Q7MapList, Q7MapListEntry | ||
| from roborock.data.b01_q7.b01_q7_code_mappings import ( | ||
| CleanPathPreferenceMapping, | ||
| CleanRepeatMapping, | ||
|
|
@@ -16,15 +17,19 @@ | |
| from roborock.devices.rpc.b01_q7_channel import send_decoded_command | ||
| from roborock.devices.traits import Trait | ||
| from roborock.devices.transport.mqtt_channel import MqttChannel | ||
| from roborock.protocols.b01_q7_protocol import CommandType, ParamsType, Q7RequestMessage | ||
| from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, CommandType, ParamsType, Q7RequestMessage | ||
| from roborock.roborock_message import RoborockB01Props | ||
| from roborock.roborock_typing import RoborockB01Q7Methods | ||
|
|
||
| from .clean_summary import CleanSummaryTrait | ||
| from .map import MapTrait | ||
|
|
||
| __all__ = [ | ||
| "Q7PropertiesApi", | ||
| "CleanSummaryTrait", | ||
| "MapTrait", | ||
| "Q7MapList", | ||
| "Q7MapListEntry", | ||
| ] | ||
|
|
||
|
|
||
|
|
@@ -34,10 +39,14 @@ class Q7PropertiesApi(Trait): | |
| clean_summary: CleanSummaryTrait | ||
| """Trait for clean records / clean summary (Q7 `service.get_record_list`).""" | ||
|
|
||
| map: MapTrait | ||
| """Trait for map list metadata + raw map payload retrieval.""" | ||
|
|
||
| def __init__(self, channel: MqttChannel) -> None: | ||
| """Initialize the B01Props API.""" | ||
| self._channel = channel | ||
| self.clean_summary = CleanSummaryTrait(channel) | ||
| self.map = MapTrait(channel) | ||
|
|
||
| async def query_values(self, props: list[RoborockB01Props]) -> B01Props | None: | ||
| """Query the device for the values of the given Q7 properties.""" | ||
|
|
@@ -87,6 +96,17 @@ async def start_clean(self) -> None: | |
| }, | ||
| ) | ||
|
|
||
| async def clean_segments(self, segment_ids: list[int]) -> None: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can live on the map trait. that way the interaction is .map.clean_segments(), but i'm not sold. Thoughts @allenporter ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like a "vacuum" command since its asking it to clean? But i get the point that segments are part of a map. |
||
| """Start segment cleaning for the given ids (Q7 uses room ids).""" | ||
| await self.send( | ||
| command=RoborockB01Q7Methods.SET_ROOM_CLEAN, | ||
| params={ | ||
| "clean_type": CleanTaskTypeMapping.ROOM.code, | ||
| "ctrl_value": SCDeviceCleanParam.START.code, | ||
| "room_ids": segment_ids, | ||
| }, | ||
| ) | ||
|
|
||
| async def pause_clean(self) -> None: | ||
| """Pause cleaning.""" | ||
| await self.send( | ||
|
|
@@ -127,7 +147,7 @@ async def send(self, command: CommandType, params: ParamsType) -> Any: | |
| """Send a command to the device.""" | ||
| return await send_decoded_command( | ||
| self._channel, | ||
| Q7RequestMessage(dps=10000, command=command, params=params), | ||
| Q7RequestMessage(dps=B01_Q7_DPS, command=command, params=params), | ||
| ) | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| """Map trait for B01 Q7 devices.""" | ||
|
|
||
| import asyncio | ||
|
|
||
| from roborock.data import Q7MapList | ||
| from roborock.devices.rpc.b01_q7_channel import send_decoded_command, send_map_command | ||
| from roborock.devices.traits import Trait | ||
| from roborock.devices.transport.mqtt_channel import MqttChannel | ||
| from roborock.exceptions import RoborockException | ||
| from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, Q7RequestMessage | ||
| from roborock.roborock_typing import RoborockB01Q7Methods | ||
|
|
||
|
|
||
| class MapTrait(Q7MapList, Trait): | ||
| """Map retrieval + map metadata helpers for Q7 devices.""" | ||
|
|
||
| def __init__(self, channel: MqttChannel) -> None: | ||
| super().__init__() | ||
| self._channel = channel | ||
| # Map uploads are serialized per-device to avoid response cross-wiring. | ||
| self._map_command_lock = asyncio.Lock() | ||
| self._loaded = False | ||
|
|
||
| async def refresh(self) -> None: | ||
| """Refresh cached map list metadata from the device.""" | ||
| response = await send_decoded_command( | ||
| self._channel, | ||
| Q7RequestMessage(dps=B01_Q7_DPS, command=RoborockB01Q7Methods.GET_MAP_LIST, params={}), | ||
| ) | ||
| if not isinstance(response, dict): | ||
| raise RoborockException( | ||
| f"Unexpected response type for GET_MAP_LIST: {type(response).__name__}: {response!r}" | ||
| ) | ||
|
|
||
| if (parsed := Q7MapList.from_dict(response)) is None: | ||
| raise RoborockException(f"Failed to decode map list response: {response!r}") | ||
|
|
||
| self.map_list = parsed.map_list | ||
| self._loaded = True | ||
|
|
||
| async def _get_map_payload(self, *, map_id: int) -> bytes: | ||
| """Fetch raw map payload bytes for the given map id.""" | ||
| request = Q7RequestMessage( | ||
| dps=B01_Q7_DPS, | ||
| command=RoborockB01Q7Methods.UPLOAD_BY_MAPID, | ||
| params={"map_id": map_id}, | ||
| ) | ||
| async with self._map_command_lock: | ||
| return await send_map_command(self._channel, request) | ||
|
|
||
| async def get_current_map_payload(self) -> bytes: | ||
| """Fetch raw map payload bytes for the currently selected map.""" | ||
| if not self._loaded: | ||
| await self.refresh() | ||
|
|
||
| map_id = self.current_map_id | ||
| if map_id is None: | ||
| raise RoborockException(f"Unable to determine map_id from map list response: {self!r}") | ||
| return await self._get_map_payload(map_id=map_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get more specific here? I believe we know find_response returns a dict so this can too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got AI to address it #786