diff --git a/httpx_auth/__init__.py b/httpx_auth/__init__.py index b948ba1..5bc1231 100644 --- a/httpx_auth/__init__.py +++ b/httpx_auth/__init__.py @@ -4,8 +4,17 @@ QueryApiKey, SupportMultiAuth, ) -from httpx_auth._oauth2.browser import DisplaySettings -from httpx_auth._oauth2.common import OAuth2 +from httpx_auth._aws import AWS4Auth +from httpx_auth._errors import ( + AuthenticationFailed, + GrantNotProvided, + HttpxAuthException, + InvalidGrantRequest, + InvalidToken, + StateNotProvided, + TimeoutOccurred, + TokenExpiryNotProvided, +) from httpx_auth._oauth2.authorization_code import ( OAuth2AuthorizationCode, OktaAuthorizationCode, @@ -13,35 +22,28 @@ ) from httpx_auth._oauth2.authorization_code_pkce import ( OAuth2AuthorizationCodePKCE, - OktaAuthorizationCodePKCE, + OktaAuthorizationCodePKCE ) +from httpx_auth._oauth2.browser import DisplaySettings from httpx_auth._oauth2.client_credentials import ( OAuth2ClientCredentials, - OktaClientCredentials, + OktaClientCredentials ) +from httpx_auth._oauth2.common import OAuth2 +from httpx_auth._oauth2.device_code import OAuth2DeviceCode +from httpx_auth._oauth2.device_code_pkce import OAuth2DeviceCodePKCE from httpx_auth._oauth2.implicit import ( + AzureActiveDirectoryImplicit, + AzureActiveDirectoryImplicitIdToken, OAuth2Implicit, OktaImplicit, OktaImplicitIdToken, - AzureActiveDirectoryImplicit, - AzureActiveDirectoryImplicitIdToken, ) from httpx_auth._oauth2.resource_owner_password import ( OAuth2ResourceOwnerPasswordCredentials, OktaResourceOwnerPasswordCredentials, ) from httpx_auth._oauth2.tokens import JsonTokenFileCache, TokenMemoryCache -from httpx_auth._aws import AWS4Auth -from httpx_auth._errors import ( - GrantNotProvided, - TimeoutOccurred, - AuthenticationFailed, - StateNotProvided, - InvalidToken, - TokenExpiryNotProvided, - InvalidGrantRequest, - HttpxAuthException, -) from httpx_auth.version import __version__ __all__ = [ @@ -58,6 +60,8 @@ "AzureActiveDirectoryImplicit", "AzureActiveDirectoryImplicitIdToken", "OAuth2AuthorizationCode", + "OAuth2DeviceCode", + "OAuth2DeviceCodePKCE", "OktaAuthorizationCode", "OAuth2ClientCredentials", "OktaClientCredentials", diff --git a/httpx_auth/_oauth2/common.py b/httpx_auth/_oauth2/common.py index 7670c1f..4d4bd33 100644 --- a/httpx_auth/_oauth2/common.py +++ b/httpx_auth/_oauth2/common.py @@ -1,6 +1,6 @@ import abc from typing import Callable, Generator, Optional, Union -from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode +from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit import httpx @@ -84,6 +84,28 @@ def request_new_grant_with_post( return token, content.get("expires_in"), content.get("refresh_token") +def request_device_code_with_post( + url: str, data, user_code_name: str, device_code_name: str, uri_name: str, client: httpx.Client +) -> (str, str, str, str, int, int): + print(data) + response = client.post(url, data) + + if response.is_error: + # As described in https://tools.ietf.org/html/rfc6749#section-5.2 + raise InvalidGrantRequest(response) + + content = _content_from_response(response) + + return ( + content.get(user_code_name), + content.get(device_code_name), + content.get(uri_name), + content.get("verification_uri_complete"), + content.get("expires_in"), + content.get("interval", 5), + ) + + class OAuth2: token_cache = TokenMemoryCache() display = DisplaySettings() diff --git a/httpx_auth/_oauth2/device_code.py b/httpx_auth/_oauth2/device_code.py new file mode 100644 index 0000000..19a7a25 --- /dev/null +++ b/httpx_auth/_oauth2/device_code.py @@ -0,0 +1,258 @@ +import time +from hashlib import sha512 + +import httpx + +from httpx_auth._authentication import SupportMultiAuth +from httpx_auth._oauth2.browser import BrowserAuth +from httpx_auth._oauth2.common import ( + OAuth2BaseAuth, + _add_parameters, + _get_query_parameter, + _pop_parameter, + request_device_code_with_post, + request_new_grant_with_post, +) + + +class DeviceCodeDetails: + def __init__( + self, + url: str, + user_code_field: str, + device_code_field: str, + uri_field: int, + ): + + self.url = url + self.user_code_field = user_code_field + self.device_code_field = device_code_field + self.uri_field = uri_field + + +class OAuth2DeviceCode(OAuth2BaseAuth, SupportMultiAuth, BrowserAuth): + """ + Device Code Flow + + Describes an OAuth 2 device code flow requests authentication. + + Request device & user codes & verication uri, user then uses verfication uri and user code to verify, + then request a token using this code. Store the token and use it for subsequent valid requests. + + More details can be found in https://tools.ietf.org/html/rfc6749#section-4.1 + """ + + def __init__(self, device_url: str, token_url: str, **kwargs): + """ + :param device_url: OAuth 2 device URL. + :param token_url: OAuth 2 token URL. + :param redirect_uri_domain: FQDN to use in the redirect_uri when localhost (default) is not allowed. + :param redirect_uri_endpoint: Custom endpoint that will be used as redirect_uri the following way: + http://localhost:/. Default value is to redirect on / (root). + :param redirect_uri_port: The port on which the server listening for the OAuth 2 code will be started. + Listen on port 5000 by default. + :param timeout: Maximum amount of seconds to wait for a code or a token to be received once requested. + Wait for 1 minute by default. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param response_type: Value of the response_type query parameter if not already provided in authorization URL. + code by default. + :param token_field_name: Field name containing the token. access_token by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param code_field_name: Field name containing the code. code by default. + :param username: Username in case basic authentication should be used to retrieve token. + :param password: User password in case basic authentication should be used to retrieve token. + :param client: httpx.Client instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter + in the authorization URL and as body parameters in the token URL. + Usual parameters are: + * client_id: Corresponding to your Application ID (in Microsoft Azure app portal) + * client_secret: If client is not authenticated with the authorization server + * nonce: Refer to http://openid.net/specs/openid-connect-core-1_0.html#IDToken for more details + """ + self.device_url = device_url + if not self.device_url: + raise Exception("Device URL is mandatory.") + + self.token_url = token_url + if not self.token_url: + raise Exception("Token URL is mandatory.") + + BrowserAuth.__init__(self, kwargs) + + header_name = kwargs.pop("header_name", None) or "Authorization" + header_value = kwargs.pop("header_value", None) or "Bearer {token}" + + self.token_field_name = kwargs.pop("token_field_name", None) or "access_token" + early_expiry = float(kwargs.pop("early_expiry", None) or 30.0) + + username = kwargs.pop("username", None) + password = kwargs.pop("password", None) + self.auth = (username, password) if username and password else None + self.client = kwargs.pop("client", None) + + user_code_field_name = kwargs.pop("user_code_field_name", "user_code") + device_code_field_name = kwargs.pop("device_code_field_name", "device_code") + uri_field_name = kwargs.pop("uri_field_name", "verification_uri") + + if _get_query_parameter(self.device_url, "response_type"): + # Ensure provided value will not be overridden + kwargs.pop("response_type", None) + else: + kwargs.setdefault("response_type", "device_code") + + device_url_without_nonce = _add_parameters(self.device_url, kwargs) + device_url_without_nonce, nonce = _pop_parameter(device_url_without_nonce, "nonce") + state = sha512(device_url_without_nonce.encode("unicode_escape")).hexdigest() + custom_code_parameters = {"state": state} + + if nonce: + custom_code_parameters["nonce"] = nonce + + device_code_url = _add_parameters(device_url_without_nonce, custom_code_parameters) + self.device_code_details = DeviceCodeDetails( + device_code_url, + user_code_field_name, + device_code_field_name, + uri_field_name, + ) + + self.device_data = kwargs + + self.token_data = { + "grant_type": "device_code", + "redirect_uri": self.redirect_uri, + } + self.token_data.update(kwargs) + + self.refresh_data = {"grant_type": "refresh_token"} + self.refresh_data.update(kwargs) + + OAuth2BaseAuth.__init__( + self, + state, + early_expiry, + header_name, + header_value, + self.refresh_token, + ) + + def request_new_token(self) -> tuple: + client = self.client or httpx.Client() + self._configure_client(client) + + user_code, device_code, verification_uri, verification_uri_complete, expires_in, interval = ( + request_device_code_with_post( + self.device_url, + self.device_data, + self.device_code_details.user_code_field, + self.device_code_details.device_code_field, + self.device_code_details.uri_field, + client, + ) + ) + + if verification_uri_complete: + print(f"To sign in, go to: {verification_uri_complete}") + else: + print(f"To sign in, go to: {verification_uri}\nEnter the following code: {user_code}") + + input("After signing in Press Enter to continue...") + + print(f"waiting for recomended interval {interval}s") + time.sleep(interval) + self.token_data["device_code"] = device_code + + client = self.client or httpx.Client() + self._configure_client(client) + try: + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, self.token_data, self.token_field_name, client + ) + finally: + # Close client only if it was created by this module + if self.client is None: + client.close() + # Handle both Access and Bearer tokens + return (self.state, token, expires_in, refresh_token) if expires_in else (self.state, token) + + def refresh_token(self, refresh_token: str) -> tuple: + client = self.client or httpx.Client() + self._configure_client(client) + try: + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data["refresh_token"] = refresh_token + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, + self.refresh_data, + self.token_field_name, + client, + ) + finally: + # Close client only if it was created by this module + if self.client is None: + client.close() + return self.state, token, expires_in, refresh_token + + def _configure_client(self, client: httpx.Client): + client.auth = self.auth + client.timeout = self.timeout + + +class OktaDeviceCode(OAuth2DeviceCode): + """ + Describes an Okta (OAuth 2) "Access Token" device code flow requests authentication. + """ + + def __init__(self, instance: str, client_id: str, **kwargs): + """ + :param instance: Okta instance (like "testserver.okta-emea.com") + :param client_id: Okta Application Identifier (formatted as a Universal Unique Identifier) + :param response_type: Value of the response_type query parameter. + token by default. + :param token_field_name: Name of the expected field containing the token. + access_token by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param nonce: Refer to http://openid.net/specs/openid-connect-core-1_0.html#IDToken for more details + (formatted as a Universal Unique Identifier - UUID). Use a newly generated UUID by default. + :param authorization_server: Okta authorization server + default by default. + :param scope: Scope parameter sent in query. Can also be a list of scopes. + Request 'openid' by default. + :param redirect_uri_domain: FQDN to use in the redirect_uri when localhost (default) is not allowed. + :param redirect_uri_endpoint: Custom endpoint that will be used as redirect_uri the following way: + http://localhost:/. Default value is to redirect on / (root). + :param redirect_uri_port: The port on which the server listening for the OAuth 2 token will be started. + Listen on port 5000 by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 1 minute by default. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param client: httpx.Client instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter + in the authorization URL. + Usual parameters are: + * prompt: none to avoid prompting the user if a session is already opened. + """ + authorization_server = kwargs.pop("authorization_server", None) or "default" + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes + OAuth2DeviceCode.__init__( + self, + f"https://{instance}/oauth2/{authorization_server}/v1/authorize", + f"https://{instance}/oauth2/{authorization_server}/v1/token", + client_id=client_id, + **kwargs, + ) diff --git a/httpx_auth/_oauth2/device_code_pkce.py b/httpx_auth/_oauth2/device_code_pkce.py new file mode 100644 index 0000000..82ca808 --- /dev/null +++ b/httpx_auth/_oauth2/device_code_pkce.py @@ -0,0 +1,300 @@ +import base64 +import os +import time +from hashlib import sha256, sha512 + +import httpx + +from httpx_auth._authentication import SupportMultiAuth +from httpx_auth._oauth2.browser import BrowserAuth +from httpx_auth._oauth2.common import ( + OAuth2BaseAuth, + _add_parameters, + _get_query_parameter, + _pop_parameter, + request_device_code_with_post, + request_new_grant_with_post, +) + + +class DeviceCodeDetails: + def __init__( + self, + url: str, + user_code_field: str, + device_code_field: str, + uri_field: int, + ): + + self.url = url + self.user_code_field = user_code_field + self.device_code_field = device_code_field + self.uri_field = uri_field + + +class OAuth2DeviceCodePKCE(OAuth2BaseAuth, SupportMultiAuth, BrowserAuth): + """ + Device Code Flow with PKCE + + Describes an OAuth 2 device code flow with PKCE requests authentication. + + Request device & user codes & verication uri, user then uses verfication uri and user code to verify, + then request a token using this code. Store the token and use it for subsequent valid requests. + + More details can be found in https://tools.ietf.org/html/rfc6749#section-4.1 + """ + + def __init__(self, device_url: str, token_url: str, **kwargs): + """ + :param device_url: OAuth 2 device URL. + :param token_url: OAuth 2 token URL. + :param timeout: Maximum amount of seconds to wait for a code or a token to be received once requested. + Wait for 1 minute by default. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param response_type: Value of the response_type query parameter if not already provided in authorization URL. + code by default. + :param token_field_name: Field name containing the token. access_token by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param code_field_name: Field name containing the code. code by default. + :param username: Username in case basic authentication should be used to retrieve token. + :param password: User password in case basic authentication should be used to retrieve token. + :param client: httpx.Client instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter + in the authorization URL and as body parameters in the token URL. + Usual parameters are: + * client_id: Corresponding to your Application ID (in Microsoft Azure app portal) + * client_secret: If client is not authenticated with the authorization server + * nonce: Refer to http://openid.net/specs/openid-connect-core-1_0.html#IDToken for more details + """ + self.device_url = device_url + if not self.device_url: + raise Exception("Device URL is mandatory.") + + self.token_url = token_url + if not self.token_url: + raise Exception("Token URL is mandatory.") + + BrowserAuth.__init__(self, kwargs) + + header_name = kwargs.pop("header_name", None) or "Authorization" + header_value = kwargs.pop("header_value", None) or "Bearer {token}" + + self.token_field_name = kwargs.pop("token_field_name", None) or "access_token" + early_expiry = float(kwargs.pop("early_expiry", None) or 30.0) + + username = kwargs.pop("username", None) + password = kwargs.pop("password", None) + self.auth = (username, password) if username and password else None + self.client = kwargs.pop("client", None) + + user_code_field_name = kwargs.pop("user_code_field_name", "user_code") + device_code_field_name = kwargs.pop("device_code_field_name", "device_code") + uri_field_name = kwargs.pop("uri_field_name", "verification_uri") + + if _get_query_parameter(self.device_url, "response_type"): + # Ensure provided value will not be overridden + kwargs.pop("response_type", None) + else: + kwargs.setdefault("response_type", "device_code") + + device_url_without_nonce = _add_parameters(self.device_url, kwargs) + device_url_without_nonce, nonce = _pop_parameter(device_url_without_nonce, "nonce") + state = sha512(device_url_without_nonce.encode("unicode_escape")).hexdigest() + custom_code_parameters = {"state": state} + + if nonce: + custom_code_parameters["nonce"] = nonce + + # generate PKCE code verifier and challenge + code_verifier = self.generate_code_verifier() + code_challenge = self.generate_code_challenge(code_verifier) + + # add code challenge parameters to the authorization_url request + custom_code_parameters["code_challenge"] = code_challenge + custom_code_parameters["code_challenge_method"] = "S256" + + device_code_url = _add_parameters(device_url_without_nonce, custom_code_parameters) + self.device_code_details = DeviceCodeDetails( + device_code_url, + user_code_field_name, + device_code_field_name, + uri_field_name, + ) + + self.device_data = kwargs + + self.token_data = { + "grant_type": "device_code", + "redirect_uri": self.redirect_uri, + } + self.token_data.update(kwargs) + + self.refresh_data = {"grant_type": "refresh_token"} + self.refresh_data.update(kwargs) + + OAuth2BaseAuth.__init__( + self, + state, + early_expiry, + header_name, + header_value, + self.refresh_token, + ) + + def request_new_token(self) -> tuple: + client = self.client or httpx.Client() + self._configure_client(client) + + user_code, device_code, verification_uri, verification_uri_complete, expires_in, interval = ( + request_device_code_with_post( + self.device_url, + self.device_data, + self.device_code_details.user_code_field, + self.device_code_details.device_code_field, + self.device_code_details.uri_field, + client, + ) + ) + + if verification_uri_complete: + print(f"To sign in, go to: {verification_uri_complete}") + else: + print(f"To sign in, go to: {verification_uri}\nEnter the following code: {user_code}") + + input("After signing in Press Enter to continue...") + + print(f"waiting for recomended interval {interval}s") + time.sleep(interval) + # As described in https://tools.ietf.org/html/rfc6749#section-4.1.3 + self.token_data["device_code"] = device_code + + client = self.client or httpx.Client() + self._configure_client(client) + try: + # As described in https://tools.ietf.org/html/rfc6749#section-4.1.4 + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, self.token_data, self.token_field_name, client + ) + finally: + # Close client only if it was created by this module + if self.client is None: + client.close() + # Handle both Access and Bearer tokens + return (self.state, token, expires_in, refresh_token) if expires_in else (self.state, token) + + def refresh_token(self, refresh_token: str) -> tuple: + client = self.client or httpx.Client() + self._configure_client(client) + try: + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data["refresh_token"] = refresh_token + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, + self.refresh_data, + self.token_field_name, + client, + ) + finally: + # Close client only if it was created by this module + if self.client is None: + client.close() + return self.state, token, expires_in, refresh_token + + def _configure_client(self, client: httpx.Client): + client.auth = self.auth + client.timeout = self.timeout + + @staticmethod + def generate_code_verifier() -> bytes: + """ + Source: https://github.com/openstack/deb-python-oauth2client/blob/master/oauth2client/_pkce.py + + Generates a 'code_verifier' as described in section 4.1 of RFC 7636. + This is a 'high-entropy cryptographic random string' that will be + impractical for an attacker to guess. + + https://tools.ietf.org/html/rfc7636#section-4.1 + + :return: urlsafe base64-encoded random data. + """ + return base64.urlsafe_b64encode(os.urandom(64)).rstrip(b"=") + + @staticmethod + def generate_code_challenge(verifier: bytes) -> bytes: + """ + Source: https://github.com/openstack/deb-python-oauth2client/blob/master/oauth2client/_pkce.py + + Creates a 'code_challenge' as described in section 4.2 of RFC 7636 + by taking the sha256 hash of the verifier and then urlsafe + base64-encoding it. + + https://tools.ietf.org/html/rfc7636#section-4.1 + + :param verifier: code_verifier as generated by generate_code_verifier() + :return: urlsafe base64-encoded sha256 hash digest, without '=' padding. + """ + digest = sha256(verifier).digest() + return base64.urlsafe_b64encode(digest).rstrip(b"=") + + +class OktaDeviceCodePKCE(OAuth2DeviceCodePKCE): + """ + Describes an Okta (OAuth 2) "Access Token" Device Code with Proof Key for Code Exchange (PKCE) + flow requests authentication. + """ + + def __init__(self, instance: str, client_id: str, **kwargs): + """ + :param instance: Okta instance (like "testserver.okta-emea.com") + :param client_id: Okta Application Identifier (formatted as a Universal Unique Identifier) + :param response_type: Value of the response_type query parameter. + code by default. + :param token_field_name: Name of the expected field containing the token. + access_token by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param code_field_name: Field name containing the code. code by default. + :param nonce: Refer to http://openid.net/specs/openid-connect-core-1_0.html#IDToken for more details + (formatted as a Universal Unique Identifier - UUID). Use a newly generated UUID by default. + :param authorization_server: Okta authorization server + default by default. + :param scope: Scope parameter sent in query. Can also be a list of scopes. + Request 'openid' by default. + :param redirect_uri_domain: FQDN to use in the redirect_uri when localhost (default) is not allowed. + :param redirect_uri_endpoint: Custom endpoint that will be used as redirect_uri the following way: + http://localhost:/. Default value is to redirect on / (root). + :param redirect_uri_port: The port on which the server listening for the OAuth 2 token will be started. + Listen on port 5000 by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 1 minute by default. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param client: httpx.Client instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter + in the authorization URL and as body parameters in the token URL. + Usual parameters are: + * client_secret: If client is not authenticated with the authorization server + * nonce: Refer to http://openid.net/specs/openid-connect-core-1_0.html#IDToken for more details + """ + authorization_server = kwargs.pop("authorization_server", None) or "default" + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes + OAuth2DeviceCodePKCE.__init__( + self, + f"https://{instance}/oauth2/{authorization_server}/v1/authorize", + f"https://{instance}/oauth2/{authorization_server}/v1/token", + client_id=client_id, + **kwargs, + )