From 1c05a01760cd9bfb0d2ef05a8c369ea35a5c8a28 Mon Sep 17 00:00:00 2001 From: Davin <61286753+deltamikeromeo@users.noreply.github.com> Date: Thu, 7 Aug 2025 22:30:54 -0400 Subject: [PATCH 1/2] Defines custom exception classes for various Control4 error scenarios (Unauthorized, NotFound, BadCredentials, InvalidCategory, C4CorruptXMLResponse). Implements checkResponseForError(response_text) to: Detect and raise exceptions for known error messages in plain text, JSON, or XML responses from the Control4 Director. Handles malformed XML responses and logs parsing errors. Raises specific exceptions based on error codes or messages found in the response. --- pyControl4/error_handling.py | 181 +++++++++++++++-------------------- 1 file changed, 78 insertions(+), 103 deletions(-) diff --git a/pyControl4/error_handling.py b/pyControl4/error_handling.py index 4a43255..594c51e 100644 --- a/pyControl4/error_handling.py +++ b/pyControl4/error_handling.py @@ -1,124 +1,99 @@ -"""Handles errors recieved from the Control4 API.""" - +"""Handles errors returned by the Control4 Director.""" import json -import xmltodict - +import logging +from xml.parsers.expat import ExpatError -class C4Exception(Exception): - """Base error for pyControl4.""" +import xmltodict - def __init__(self, message): - self.message = message +_LOGGER = logging.getLogger(__name__) -class NotFound(C4Exception): - """Raised when a 404 response is recieved from the Control4 API. - Occurs when the requested controller, etc. could not be found.""" +class C4Exception(Exception): + """Base class for pyControl4 exceptions.""" + pass class Unauthorized(C4Exception): - """Raised when unauthorized, but no other recognized details are provided. - Occurs when token is invalid.""" + """Raised when the bearer token is invalid or expired.""" + pass -class BadCredentials(Unauthorized): - """Raised when provided credentials are incorrect.""" +class NotFound(C4Exception): + """Raised when the Control4 Director returns a 404 Not Found error.""" + pass -class BadToken(Unauthorized): - """Raised when director bearer token is invalid.""" +class BadCredentials(C4Exception): + """Raised when the username or password for the Control4 account is invalid.""" + pass class InvalidCategory(C4Exception): - """Raised when an invalid category is provided when calling - `pyControl4.director.C4Director.getAllItemsByCategory`.""" - - -ERROR_CODES = {"401": Unauthorized, "404": NotFound} - -ERROR_DETAILS = { - "Permission denied Bad credentials": BadCredentials, -} - -DIRECTOR_ERRORS = {"Unauthorized": Unauthorized, "Invalid category": InvalidCategory} - -DIRECTOR_ERROR_DETAILS = {"Expired or invalid token": BadToken} - - -async def __checkResponseFormat(response_text: str): - """Known Control4 authentication API error message formats: - ```json - { - "C4ErrorResponse": { - "code": 401, - "details": "Permission denied Bad credentials", - "message": "Permission denied", - "subCode": 0 - } - } - ``` - ```json - { - "code": 404, - "details": "Account with id:000000 not found in DB", - "message": "Account not found", - "subCode": 0 - }``` - ```xml - - - 401 -
- Permission denied - 0 -
- ``` - Known Control4 director error message formats: - ```json - { - "error": "Unauthorized", - "details": "Expired or invalid token" - } - ``` - """ - if response_text.startswith("<"): - return "XML" - return "JSON" + """Raised when a category does not exist on the Control4 system.""" + pass -async def checkResponseForError(response_text: str): - """Checks a string response from the Control4 API for error codes. +class C4CorruptXMLResponse(C4Exception): + """Raised when the Control4 Director sends a malformed XML response.""" + pass - Parameters: - `response_text` - JSON or XML response from Control4, as a string. + +async def checkResponseForError(response_text): + """ + Checks a response from the Control4 Director for an error message. + Returns if no error is found. + Raises Unauthorized or NotFound if an error is found. + Raises C4CorruptXMLResponse if the XML is malformed. """ - if await __checkResponseFormat(response_text) == "JSON": + # Check for known plain-text error messages first. + if "Cannot GET" in response_text: + raise NotFound(f"Endpoint not found on Director: {response_text}") + + # First, try to parse the response as JSON, as some controllers return this. + try: dictionary = json.loads(response_text) - elif await __checkResponseFormat(response_text) == "XML": + if "status_code" in dictionary: + if dictionary["status_code"] == 404: + raise NotFound("404 Not Found from Control4 Director.") + if "error" in dictionary: + if "Invalid category" in dictionary["error"]: + raise InvalidCategory(dictionary["error"]) + # If it's valid JSON but not an error, we can return. + return + except json.JSONDecodeError: + # Not a JSON response, so we'll try to parse it as XML. + pass + + # If JSON parsing fails, try to parse the response as XML. + try: dictionary = xmltodict.parse(response_text) - if "C4ErrorResponse" in dictionary: - if ( - "details" in dictionary["C4ErrorResponse"] - and dictionary["C4ErrorResponse"]["details"] in ERROR_DETAILS - ): - exception = ERROR_DETAILS.get(dictionary["C4ErrorResponse"]["details"]) - raise exception(response_text) - else: - exception = ERROR_CODES.get( - str(dictionary["C4ErrorResponse"]["code"]), C4Exception - ) - raise exception(response_text) - elif "code" in dictionary: - if "details" in dictionary and dictionary["details"] in ERROR_DETAILS: - exception = ERROR_DETAILS.get(dictionary["details"]) - raise exception(response_text) - else: - exception = ERROR_CODES.get(str(dictionary["code"]), C4Exception) - raise exception(response_text) - elif "error" in dictionary: - if "details" in dictionary and dictionary["details"] in DIRECTOR_ERROR_DETAILS: - exception = DIRECTOR_ERROR_DETAILS.get(dictionary["details"]) - raise exception(response_text) - else: - exception = DIRECTOR_ERRORS.get(str(dictionary["error"]), C4Exception) - raise exception(response_text) + except ExpatError as e: + _LOGGER.error( + ( + "Failed to parse XML response from Director due to a mismatched tag or other corruption. " + "The raw text received from the controller was: \n%s" + ), + response_text, + ) + # Re-raise the original error so the integration still fails as expected + raise e + except Exception: + # Not a valid XML response, so it can't be a C4 error message. + return + + # Check for C4 errors in the parsed XML + if "c4soap" in dictionary: + if "error" in dictionary["c4soap"]: + error_code = int(dictionary["c4soap"]["error"]) + # 401 is Unauthorized + if error_code == 401: + raise Unauthorized( + "Invalid or expired bearer token. Re-authentication is required." + ) + # Other error codes can be added here if necessary + else: + # Generic error for other codes + raise Exception( + f"Control4 Director returned an unknown error: {dictionary['c4soap']['error_string']}" + ) + return From ef0c8456e4d165cbc700f74be00966e95a5b6452 Mon Sep 17 00:00:00 2001 From: Davin <61286753+deltamikeromeo@users.noreply.github.com> Date: Thu, 7 Aug 2025 22:40:05 -0400 Subject: [PATCH 2/2] trailing blank lines removed --- pyControl4/error_handling.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pyControl4/error_handling.py b/pyControl4/error_handling.py index 594c51e..922c405 100644 --- a/pyControl4/error_handling.py +++ b/pyControl4/error_handling.py @@ -12,32 +12,26 @@ class C4Exception(Exception): """Base class for pyControl4 exceptions.""" pass - class Unauthorized(C4Exception): """Raised when the bearer token is invalid or expired.""" pass - class NotFound(C4Exception): """Raised when the Control4 Director returns a 404 Not Found error.""" pass - class BadCredentials(C4Exception): """Raised when the username or password for the Control4 account is invalid.""" pass - class InvalidCategory(C4Exception): """Raised when a category does not exist on the Control4 system.""" pass - class C4CorruptXMLResponse(C4Exception): """Raised when the Control4 Director sends a malformed XML response.""" pass - async def checkResponseForError(response_text): """ Checks a response from the Control4 Director for an error message.