diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py index da1670a..c287136 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py @@ -549,7 +549,7 @@ def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) """ pass - def get_all_deployed_apps_gsma(self, app_id: str, app_provider: str) -> List: + def get_all_deployed_apps_gsma(self) -> Response: """ Retrieves all instances for a given application of partner OP diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py index 83422ea..b066291 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py @@ -15,6 +15,7 @@ from requests import Response from sunrise6g_opensdk import logger +from sunrise6g_opensdk.edgecloud.core import gsma_schemas from sunrise6g_opensdk.edgecloud.core import schemas as camara_schemas from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, @@ -26,9 +27,11 @@ I2EdgeError, i2edge_delete, i2edge_get, + i2edge_patch, i2edge_post, i2edge_post_multiform_data, ) +from .gsma_utils import map_zone log = logger.get_logger(__name__) @@ -733,31 +736,27 @@ def get_edge_cloud_zones_list_gsma(self) -> Response: :return: Response with zone details in GSMA format. """ - url = "{}/zones/list".format(self.base_url) + url = f"{self.base_url}/zones/list" params = {} try: - response = i2edge_get(url, params=params) - if response.status_code == 200: - response_json = response.json() - response_list = [] - for item in response_json: - content = { - "zoneId": item.get("zoneId"), - "geolocation": item.get("geolocation"), - "geographyDetails": item.get("geographyDetails"), - } - response_list.append(content) - return build_custom_http_response( - status_code=200, - content=response_list, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response + response = i2edge_get(url, params=params, expected_status=200) + response_json = response.json() + try: + validated_data = gsma_schemas.ZonesList.model_validate(response_json) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + + return build_custom_http_response( + status_code=200, + content=[zone.model_dump_json() for zone in validated_data.root], + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) except I2EdgeError as e: - raise e + log.error(f"Failed to obtain Zones list from i2edge: {e}") + raise def get_edge_cloud_zones_gsma(self) -> Response: """ @@ -765,34 +764,27 @@ def get_edge_cloud_zones_gsma(self) -> Response: :return: Response with zones and detailed resource information. """ - url = "{}/zones".format(self.base_url) + url = f"{self.base_url}/zones" params = {} try: - response = i2edge_get(url, params=params) - if response.status_code == 200: - response_json = response.json() - response_list = [] - for item in response_json: - content = { - "zoneId": item.get("zoneId"), - "reservedComputeResources": item.get("reservedComputeResources"), - "computeResourceQuotaLimits": item.get("computeResourceQuotaLimits"), - "flavoursSupported": item.get("flavoursSupported"), - "networkResources": item.get("networkResources"), - "zoneServiceLevelObjsInfo": item.get("zoneServiceLevelObjsInfo"), - } - response_list.append(content) - return build_custom_http_response( - status_code=200, - content=response_list, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response + response = i2edge_get(url, params=params, expected_status=200) + response_json = response.json() + mapped = [map_zone(zone) for zone in response_json] + try: + validated_data = gsma_schemas.ZoneRegisteredDataList.model_validate(mapped) + except ValidationError as e: + raise ValueError(f"Invalid schema {e}") + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) except I2EdgeError as e: - raise e + log.error(f"Failed to obtain Zones details from i2edge: {e}") + raise def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Response: """ @@ -802,31 +794,27 @@ def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Response: :param zone_id: Unique identifier of the Edge Cloud Zone. :return: Response with Edge Cloud Zone details. """ - url = "{}/zone/{}".format(self.base_url, zone_id) + url = f"{self.base_url}/zone/{zone_id}" params = {} try: - response = i2edge_get(url, params=params) - if response.status_code == 200: - response_json = response.json() - content = { - "zoneId": response_json.get("zoneId"), - "reservedComputeResources": response_json.get("reservedComputeResources"), - "computeResourceQuotaLimits": response_json.get("computeResourceQuotaLimits"), - "flavoursSupported": response_json.get("flavoursSupported"), - "networkResources": response_json.get("networkResources"), - "zoneServiceLevelObjsInfo": response_json.get("zoneServiceLevelObjsInfo"), - } - return build_custom_http_response( - status_code=200, - content=content, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response + response = i2edge_get(url, params=params, expected_status=200) + response_json = response.json() + mapped = map_zone(response_json) + try: + validated_data = gsma_schemas.ZoneRegisteredData.model_validate(mapped) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) except I2EdgeError as e: - raise e + log.error(f"Failed to obtain Zones details from i2edge: {e}") + raise # ------------------------------------------------------------------------ # Artefact Management (GSMA) @@ -849,8 +837,8 @@ def create_artefact_gsma(self, request_body: Dict) -> Response: transformed = { "artefact_id": artefact_id, "artefact_name": artefact_name, - "repo_name": repo_data.get("repoName", "unknown-repo"), - "repo_type": request_body.get("repoType", "PUBLICREPO"), + "repo_name": repo_data.get("repoName"), + "repo_type": request_body.get("repoType"), "repo_url": repo_data["repoURL"], "user_name": repo_data.get("userName"), "password": repo_data.get("password"), @@ -868,8 +856,9 @@ def create_artefact_gsma(self, request_body: Dict) -> Response: request=response.request, ) return response - except KeyError as e: - raise I2EdgeError(f"Missing required field in GSMA artefact payload: {e}") + except I2EdgeError as e: + log.error(f"Failed to create artefact: {e}") + raise def get_artefact_gsma(self, artefact_id: str) -> Response: """ @@ -882,36 +871,40 @@ def get_artefact_gsma(self, artefact_id: str) -> Response: response = self.get_artefact(artefact_id) if response.status_code == 200: response_json = response.json() - print(response_json) - content = { - "artefactId": response_json.get("artefact_id"), - "appProviderId": "Ihs0gCqO65SHTz", - "artefactName": response_json.get("name"), - "artefactDescription": "string", - "artefactVersionInfo": response_json.get("version"), - "artefactVirtType": "VM_TYPE", - "artefactFileName": "stringst", - "artefactFileFormat": "ZIP", - "artefactDescriptorType": "HELM", - "repoType": response_json.get("repo_type"), - "artefactRepoLocation": { - "repoURL": response_json.get("repo_url"), - "userName": response_json.get("repo_user_name"), - "password": response_json.get("repo_password"), - "token": response_json.get("repo_token"), - }, - } + content = gsma_schemas.Artefact( + artefactId=response_json.get("artefact_id"), + appProviderId=response_json.get("id"), + artefactName=response_json.get("name"), + artefactDescription="Description", + artefactVersionInfo=response_json.get("version"), + artefactVirtType="VM_TYPE", + artefactFileName="FileName", + artefactFileFormat="TAR", + artefactDescriptorType="HELM", + repoType=response_json.get("repo_type"), + artefactRepoLocation=gsma_schemas.ArtefactRepoLocation( + repoURL=response_json.get("repo_url"), + userName=response_json.get("repo_user_name"), + password=response_json.get("repo_password"), + token=response_json.get("repo_token"), + ), + ) + try: + validated_data = gsma_schemas.Artefact.model_validate(content) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") return build_custom_http_response( status_code=200, - content=content, + content=validated_data.model_dump_json(), headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response - except KeyError as e: - raise I2EdgeError(f"Missing artefactId in GSMA payload: {e}") + except I2EdgeError as e: + log.error(f"Failed to retrieve artefact: {e}") + raise def delete_artefact_gsma(self, artefact_id: str) -> Response: """ @@ -955,19 +948,18 @@ def onboard_app_gsma(self, request_body: dict) -> Response: data = body payload = i2edge_schemas.ApplicationOnboardingRequest(profile_data=data) url = "{}/application/onboarding".format(self.base_url) - response = i2edge_post(url, payload) - if response.status_code == 201: - return build_custom_http_response( - status_code=200, - content={"response": "Application onboarded successfully"}, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Missing required field in GSMA onboarding payload: {e}") + response = i2edge_post(url, payload, expected_status=201) + return build_custom_http_response( + status_code=200, + content={"response": "Application onboarded successfully"}, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to onboard app: {e}") + raise def get_onboarded_app_gsma(self, app_id: str) -> Response: """ @@ -976,30 +968,58 @@ def get_onboarded_app_gsma(self, app_id: str) -> Response: :param app_id: Identifier of the application onboarded. :return: Response with application details. """ + url = f"{self.base_url}/application/onboarding/{app_id}" + params = {} try: - response = self.get_onboarded_app(app_id) - if response.status_code == 200: - response_json = response.json() - profile_data = response_json.get("profile_data") - content = { - "appId": profile_data.get("app_id"), - "appProviderId": "string", - "appDeploymentZones": profile_data.get("appDeploymentZones"), - "appMetaData": profile_data.get("appMetadata"), - "appQoSProfile": profile_data.get("appQoSProfile"), - "appComponentSpecs": profile_data.get("appComponentSpecs"), - } - return build_custom_http_response( - status_code=200, - content=content, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Missing appId in GSMA payload: {e}") + response = i2edge_get(url, params, expected_status=200) + response_json = response.json() + profile_data = response_json.get("profile_data") + app_deployment_zones = profile_data.get("appDeploymentZones") + app_metadata = profile_data.get("appMetaData") + app_qos_profile = profile_data.get("appQoSProfile") + app_component_specs = profile_data.get("appComponentSpecs") + content = gsma_schemas.ApplicationModel( + appId=profile_data.get("app_id"), + appProviderId="from_FM", + appDeploymentZones=[ + gsma_schemas.AppDeploymentZone(countryCode="ES", zoneInfo=zone_id) + for zone_id in app_deployment_zones + ], + appMetaData=gsma_schemas.AppMetaData( + appName=app_metadata.get("appName"), + version=app_metadata.get("version"), + appDescription=app_metadata.get("appDescription"), + mobilitySupport=app_metadata.get("mobilitySupport"), + accessToken=app_metadata.get("accessToken"), + category=app_metadata.get("category"), + ), + appQoSProfile=gsma_schemas.AppQoSProfile( + latencyConstraints=app_qos_profile.get("latencyConstraints"), + bandwidthRequired=app_qos_profile.get("bandwidthRequired"), + multiUserClients=app_qos_profile.get("multiUserClients"), + noOfUsersPerAppInst=app_qos_profile.get("noOfUsersPerAppInst"), + appProvisioning=app_qos_profile.get("appProvisioning"), + ), + appComponentSpecs=[ + gsma_schemas.AppComponentSpec(**component) for component in app_component_specs + ], + onboardStatusInfo="ONBOARDED", + ) + try: + validated_data = gsma_schemas.ApplicationModel.model_validate(content) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to get onboarded app: {e}") + raise def patch_onboarded_app_gsma(self, app_id: str, request_body: dict) -> Response: """ @@ -1010,7 +1030,30 @@ def patch_onboarded_app_gsma(self, app_id: str, request_body: dict) -> Response: :param request_body: Payload with updated onboarding info. :return: Response with update confirmation. """ - pass + url = f"{self.base_url}/application/onboarding/{app_id}" + params = {} + response = i2edge_get(url, params, expected_status=200) + response_json = response.json() + app_component_specs = request_body.get("appComponents") + app_qos_profile = request_body.get("appUpdQoSProfile") + response_json["profile_data"]["appQoSProfile"] = app_qos_profile + response_json["profile_data"]["appComponentSpecs"] = app_component_specs + data = response_json.get("profile_data") + try: + payload = i2edge_schemas.ApplicationOnboardingRequest(profile_data=data) + url = "{}/application/onboarding/{}".format(self.base_url, app_id) + response = i2edge_patch(url, payload, expected_status=200) + return build_custom_http_response( + status_code=200, + content={"response": "Application update successful"}, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to patch onboarded app: {e}") + raise def delete_onboarded_app_gsma(self, app_id: str) -> Response: """ @@ -1031,8 +1074,9 @@ def delete_onboarded_app_gsma(self, app_id: str) -> Response: request=response.request, ) return response - except KeyError as e: - raise I2EdgeError(f"Missing appId in GSMA payload: {e}") + except I2EdgeError as e: + log.error(f"Failed to delete onboarded app: {e}") + raise # ------------------------------------------------------------------------ # Application Deployment Management (GSMA) @@ -1057,24 +1101,28 @@ def deploy_app_gsma(self, request_body: dict) -> Response: ) payload = i2edge_schemas.AppDeploy(app_deploy_data=app_deploy_data) url = "{}/application_instance".format(self.base_url) - response = i2edge_post(url, payload, 202) - if response.status_code == 202: - response_json = response.json() - content = { - "zoneId": response_json.get("zoneID"), - "appInstIdentifier": response_json.get("app_instance_id"), - } - return build_custom_http_response( - status_code=202, - content=content, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Missing required field in GSMA deployment payload: {e}") + response = i2edge_post(url, payload, expected_status=202) + + response_json = response.json() + content = gsma_schemas.AppInstance( + zoneId=response_json.get("zoneID"), + appInstIdentifier=response_json.get("app_instance_id"), + ) + try: + validated_data = gsma_schemas.AppInstance.model_validate(content) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + return build_custom_http_response( + status_code=202, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to deploy app: {e}") + raise def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> Response: """ @@ -1088,26 +1136,31 @@ def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) try: url = "{}/application_instance/{}/{}".format(self.base_url, zone_id, app_instance_id) params = {} - response = i2edge_get(url, params=params) - if response.status_code == 200: - response_json = response.json() - content = { - "appInstanceState": response_json.get("appInstanceState"), - "accesspointInfo": response_json.get("accesspointInfo"), - } - return build_custom_http_response( - status_code=200, - content=content, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Missing appId or zoneId in GSMA payload: {e}") + response = i2edge_get(url, params=params, expected_status=200) - def get_all_deployed_apps_gsma(self, app_id: str, app_provider: str) -> Response: + response_json = response.json() + content = gsma_schemas.AppInstanceStatus( + appInstanceState=response_json.get("appInstanceState"), + accesspointInfo=response_json.get("accesspointInfo"), + ) + try: + validated_data = gsma_schemas.AppInstanceStatus.model_validate(content) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + + except I2EdgeError as e: + log.error(f"Failed to retrieve deployed app: {e}") + raise + + def get_all_deployed_apps_gsma(self) -> Response: """ Retrieves all instances for a given application of partner OP using GSMA federation. @@ -1118,36 +1171,38 @@ def get_all_deployed_apps_gsma(self, app_id: str, app_provider: str) -> Response try: url = "{}/application_instances".format(self.base_url) params = {} - response = i2edge_get(url, params=params) - if response.status_code == 200: - response_json = response.json() - response_list = [] - for item in response_json: - content = [ + response = i2edge_get(url, params=params, expected_status=200) + response_json = response.json() + response_list = [] + for item in response_json: + content = { + "zoneId": item.get("app_spec") + .get("nodeSelector") + .get("feature.node.kubernetes.io/zoneID"), + "appInstanceInfo": [ { - "zoneId": item.get("app_spec") - .get("nodeSelector") - .get("feature.node.kubernetes.io/zoneID"), - "appInstanceInfo": [ - { - "appInstIdentifier": item.get("app_instance_id"), - "appInstanceState": item.get("deploy_status"), - } - ], + "appInstIdentifier": item.get("app_instance_id"), + "appInstanceState": item.get("deploy_status"), } - ] - response_list.append(content) - return build_custom_http_response( - status_code=200, - content=response_list, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Error retrieving apps: {e}") + ], + } + + response_list.append(content) + try: + validated_data = gsma_schemas.ZoneIdentifierList.model_validate(response_list) + except ValidationError as e: + raise ValueError(f"Invalid schema: {e}") + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump_json(), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to retrieve apps: {e}") + raise def undeploy_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> Response: """ @@ -1160,16 +1215,15 @@ def undeploy_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> """ try: url = "{}/application_instance".format(self.base_url) - response = i2edge_delete(url, app_instance_id) - if response.status_code == 200: - return build_custom_http_response( - status_code=200, - content={"response": "Application instance termination request accepted"}, - headers={"Content-Type": self.content_type_gsma}, - encoding=self.encoding_gsma, - url=response.url, - request=response.request, - ) - return response - except KeyError as e: - raise I2EdgeError(f"Missing appInstanceId in GSMA payload: {e}") + response = i2edge_delete(url, app_instance_id, expected_status=200) + return build_custom_http_response( + status_code=200, + content={"response": "Application instance termination request accepted"}, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=response.url, + request=response.request, + ) + except I2EdgeError as e: + log.error(f"Failed to delete app: {e}") + raise diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py index 0d193fd..c1d6505 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py @@ -66,16 +66,23 @@ def i2edge_post(url: str, model_payload: BaseModel, expected_status: int = 201) raise I2EdgeError(err_msg) -def i2edge_put(url: str, model_payload: BaseModel) -> dict: +def i2edge_patch(url: str, model_payload: BaseModel, expected_status: int = 200) -> dict: headers = { "Content-Type": "application/json", "accept": "application/json", } - json_payload = json.dumps(model_payload.model_dump(mode="json")) + json_payload = json.dumps(model_payload.model_dump(exclude_unset=True, mode="json")) try: - response = requests.put(url, data=json_payload, headers=headers) - response.raise_for_status() - return response + response = requests.patch(url, data=json_payload, headers=headers) + if response.status_code == expected_status: + return response + else: + i2edge_err_msg = get_error_message_from(response) + err_msg = "Failed to patch: Expected status {}, got {}. Detail: {}".format( + expected_status, response.status_code, i2edge_err_msg + ) + log.error(err_msg) + raise I2EdgeError(err_msg) except requests.exceptions.HTTPError as e: i2edge_err_msg = get_error_message_from(response) err_msg = "Failed to patch: {}. Detail: {}".format(i2edge_err_msg, e) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/gsma_utils.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/gsma_utils.py new file mode 100644 index 0000000..66ef7b8 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/gsma_utils.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +## +# This file is part of the Open SDK +# +# Contributors: +# - César Cajas (cesar.cajas@i2cat.net) +## + + +def map_hugepage(raw_hp: dict) -> dict: + # Map from {'number': int, 'pageSize': str} to {'count': int, 'size': str} + return { + "pageSize": raw_hp.get("pageSize", ""), + "number": raw_hp.get("number", ""), + } + + +def map_compute_resource(raw_cr: dict) -> dict: + # Map numCPU dict -> int, hugepages list, gpu list, vpu/fpga int to optional int or list + # Cast cpuExclusivity to bool + hugepages_raw = raw_cr.get("hugepages") or [] + hugepages = [map_hugepage(hp) for hp in hugepages_raw] + + # numCPU viene {'whole': {'value': int}} + num_cpu_raw = raw_cr.get("numCPU") + if isinstance(num_cpu_raw, dict): + num_cpu = num_cpu_raw.get("whole", {}).get("value", 0) + else: + num_cpu = num_cpu_raw if isinstance(num_cpu_raw, int) else 0 + + gpu = raw_cr.get("gpu") or None + vpu = raw_cr.get("vpu") + if isinstance(vpu, int) and vpu == 0: + vpu = None + + fpga = raw_cr.get("fpga") + if isinstance(fpga, int) and fpga == 0: + fpga = None + + # cpuExclusivity + cpu_exclusivity = raw_cr.get("cpuExclusivity") + if isinstance(cpu_exclusivity, int): + cpu_exclusivity = bool(cpu_exclusivity) + + # dict GSMA + return { + "cpuArchType": raw_cr.get("cpuArchType"), + "numCPU": num_cpu, + "memory": raw_cr.get("memory"), + "diskStorage": raw_cr.get("diskStorage"), + "gpu": gpu if gpu else None, + "vpu": vpu, + "fpga": fpga, + "hugepages": hugepages if hugepages else None, + "cpuExclusivity": cpu_exclusivity, + } + + +def map_ostype(raw_os: dict) -> dict: + # Simple passthrough + return { + "architecture": raw_os.get("architecture"), + "distribution": raw_os.get("distribution"), + "version": raw_os.get("version"), + "license": raw_os.get("license"), + } + + +def map_flavour(raw_flavour: dict) -> dict: + fpga = raw_flavour.get("fpga") + if isinstance(fpga, int): + fpga = None if fpga == 0 else [str(fpga)] + + vpu = raw_flavour.get("vpu") + if isinstance(vpu, int): + vpu = None if vpu == 0 else [str(vpu)] + + cpu_exclusivity = raw_flavour.get("cpuExclusivity") + if not isinstance(cpu_exclusivity, list): + cpu_exclusivity = None + + # Map supportedOSTypes + supported_os = raw_flavour.get("supportedOSTypes", []) + supported_ostypes = [map_ostype(os) for os in supported_os] + + return { + "flavourId": raw_flavour.get("flavourId"), + "cpuArchType": raw_flavour.get("cpuArchType"), + "supportedOSTypes": supported_ostypes, + "numCPU": raw_flavour.get("numCPU"), + "memorySize": raw_flavour.get("memorySize"), + "storageSize": raw_flavour.get("storageSize"), + "gpu": raw_flavour.get("gpu") or None, + "fpga": fpga, + "vpu": vpu, + "hugepages": raw_flavour.get("hugepages") or None, + "cpuExclusivity": cpu_exclusivity, + } + + +def map_network_resources(raw_net: dict) -> dict: + if not raw_net: + return None + return { + "egressBandWidth": raw_net.get("egressBandWidth", 0), + "dedicatedNIC": raw_net.get("dedicatedNIC", 0), + "supportSriov": bool(raw_net.get("supportSriov")), + "supportDPDK": bool(raw_net.get("supportDPDK")), + } + + +def map_zone_service_level(raw_sli: dict) -> dict: + if not raw_sli: + return None + return { + "latencyRanges": { + "minLatency": raw_sli.get("latencyRanges", {}).get("minLatency", 1), + "maxLatency": raw_sli.get("latencyRanges", {}).get("maxLatency", 1), + }, + "jitterRanges": { + "minJitter": raw_sli.get("jitterRanges", {}).get("minJitter", 1), + "maxJitter": raw_sli.get("jitterRanges", {}).get("maxJitter", 1), + }, + "throughputRanges": { + "minThroughput": raw_sli.get("throughputRanges", {}).get("minThroughput", 1), + "maxThroughput": raw_sli.get("throughputRanges", {}).get("maxThroughput", 1), + }, + } + + +def map_zone(raw_zone: dict) -> dict: + reserved_compute = raw_zone.get("reservedComputeResources") + if not reserved_compute or len(reserved_compute) == 0: + reserved_compute = [ + { + "cpuArchType": "ISA_X86_64", + "numCPU": 0, + "memory": 0, + "diskStorage": 0, + "gpu": None, + "vpu": None, + "fpga": None, + "hugepages": None, + "cpuExclusivity": False, + } + ] + + return { + "zoneId": raw_zone.get("zoneId"), + "reservedComputeResources": [map_compute_resource(cr) for cr in reserved_compute], + "computeResourceQuotaLimits": [ + map_compute_resource(cr) for cr in raw_zone.get("computeResourceQuotaLimits", []) + ], + "flavoursSupported": [map_flavour(fl) for fl in raw_zone.get("flavoursSupported", [])], + "networkResources": map_network_resources(raw_zone.get("networkResources")), + "zoneServiceLevelObjsInfo": map_zone_service_level( + raw_zone.get("zoneServiceLevelObjsInfo") + ), + } diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/schemas.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/schemas.py index 3e7c511..7573ea2 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/schemas.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/schemas.py @@ -122,6 +122,7 @@ class AppQoSProfile(BaseModel): appProvisioning: bool = Field(default=True) bandwidthRequired: int = Field(default=1) latencyConstraints: str = Field(default="NONE") + mobilitySupport: Optional[bool] = None multiUserClients: str = Field(default="APP_TYPE_SINGLE_USER") noOfUsersPerAppInst: int = Field(default=1) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py index 0a488cf..cad7030 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py @@ -389,7 +389,7 @@ def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) """ pass - def get_all_deployed_apps_gsma(self, app_id: str, app_provider: str) -> List: + def get_all_deployed_apps_gsma(self) -> Response: """ Retrieves all instances for a given application of partner OP diff --git a/src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py b/src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py index 31a5ad9..302f7c2 100644 --- a/src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py +++ b/src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py @@ -278,7 +278,7 @@ def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) pass @abstractmethod - def get_all_deployed_apps_gsma(self, app_id: str, app_provider: str) -> Response: + def get_all_deployed_apps_gsma(self) -> Response: """ Retrieves all instances for a given application of partner OP diff --git a/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py b/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py new file mode 100644 index 0000000..4d78965 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py @@ -0,0 +1,226 @@ +from typing import List, Literal, Optional + +from pydantic import BaseModel, Field, HttpUrl, RootModel + +# --------------------------- +# FederationManagement +# --------------------------- + + +class ZoneDetails(BaseModel): + zoneId: str + geolocation: Optional[str] = None + geographyDetails: str + + +class ZonesList(RootModel[List[ZoneDetails]]): + pass + + +# --------------------------- +# AvailabilityZoneInfoSynchronization +# --------------------------- + + +class HugePage(BaseModel): + pageSize: str + number: int + + +class GpuInfo(BaseModel): + gpuVendorType: Literal["GPU_PROVIDER_NVIDIA", "GPU_PROVIDER_AMD"] + gpuModeName: str + gpuMemory: int + numGPU: int + + +class ComputeResourceInfo(BaseModel): + cpuArchType: Literal["ISA_X86", "ISA_X86_64", "ISA_ARM_64"] + numCPU: int + memory: int + diskStorage: Optional[int] = None + gpu: Optional[List[GpuInfo]] = None + vpu: Optional[int] = None + fpga: Optional[int] = None + hugepages: Optional[List[HugePage]] = None + cpuExclusivity: Optional[bool] = None + + +class OSType(BaseModel): + architecture: Literal["x86_64", "x86"] + distribution: Literal["RHEL", "UBUNTU", "COREOS", "FEDORA", "WINDOWS", "OTHER"] + version: Literal[ + "OS_VERSION_UBUNTU_2204_LTS", + "OS_VERSION_RHEL_8", + "OS_VERSION_RHEL_7", + "OS_VERSION_DEBIAN_11", + "OS_VERSION_COREOS_STABLE", + "OS_MS_WINDOWS_2012_R2", + "OTHER", + ] + license: Literal["OS_LICENSE_TYPE_FREE", "OS_LICENSE_TYPE_ON_DEMAND", "NOT_SPECIFIED"] + + +class Flavour(BaseModel): + flavourId: str + cpuArchType: Literal["ISA_X86", "ISA_X86_64", "ISA_ARM_64"] + supportedOSTypes: List[OSType] = Field(..., min_items=1) + numCPU: int + memorySize: int + storageSize: int + gpu: Optional[List[GpuInfo]] = None + fpga: Optional[List[str]] = None + vpu: Optional[List[str]] = None + hugepages: Optional[List[HugePage]] = None + cpuExclusivity: Optional[List[str]] = None + + +class NetworkResources(BaseModel): + egressBandWidth: int + dedicatedNIC: int + supportSriov: bool + supportDPDK: bool + + +class LatencyRange(BaseModel): + minLatency: int = Field(..., ge=1) + maxLatency: int + + +class JitterRange(BaseModel): + minJitter: int = Field(..., ge=1) + maxJitter: int + + +class ThroughputRange(BaseModel): + minThroughput: int = Field(..., ge=1) + maxThroughput: int + + +class ZoneServiceLevelObjsInfo(BaseModel): + latencyRanges: LatencyRange + jitterRanges: JitterRange + throughputRanges: ThroughputRange + + +class ZoneRegisteredData(BaseModel): + zoneId: str + reservedComputeResources: List[ComputeResourceInfo] = Field(..., min_items=1) + computeResourceQuotaLimits: List[ComputeResourceInfo] = Field(..., min_items=1) + flavoursSupported: List[Flavour] = Field(..., min_items=1) + networkResources: Optional[NetworkResources] = None + zoneServiceLevelObjsInfo: Optional[ZoneServiceLevelObjsInfo] = None + + +class ZoneRegisteredDataList(RootModel[List[ZoneRegisteredData]]): + pass + + +# --------------------------- +# ArtefactManagement +# --------------------------- + + +class ArtefactRepoLocation(BaseModel): + repoURL: HttpUrl + userName: Optional[str] = None + password: Optional[str] = None + token: Optional[str] = None + + +class Artefact(BaseModel): + artefactId: str + appProviderId: Optional[str] = None + artefactName: str + artefactDescription: Optional[str] = None + artefactVersionInfo: str + artefactVirtType: Literal["VM_TYPE", "CONTAINER_TYPE"] + artefactFileName: Optional[str] = None + artefactFileFormat: Optional[Literal["ZIP", "TAR", "TEXT", "TARGZ"]] = None + artefactDescriptorType: Literal["HELM", "TERRAFORM", "ANSIBLE", "SHELL", "COMPONENTSPEC"] + repoType: Optional[Literal["PRIVATEREPO", "PUBLICREPO", "UPLOAD"]] = None + artefactRepoLocation: Optional[ArtefactRepoLocation] = None + + +# --------------------------- +# ApplicationOnboardingManagement +# --------------------------- + + +class AppDeploymentZone(BaseModel): + countryCode: str + zoneInfo: str + + +class AppMetaData(BaseModel): + appName: str + version: str + appDescription: Optional[str] = None + mobilitySupport: bool = False + accessToken: str + category: Optional[ + Literal[ + "IOT", + "HEALTH_CARE", + "GAMING", + "VIRTUAL_REALITY", + "SOCIALIZING", + "SURVEILLANCE", + "ENTERTAINMENT", + "CONNECTIVITY", + "PRODUCTIVITY", + "SECURITY", + "INDUSTRIAL", + "EDUCATION", + "OTHERS", + ] + ] = None + + +class AppQoSProfile(BaseModel): + latencyConstraints: Literal["NONE", "LOW", "ULTRALOW"] + bandwidthRequired: int = Field(..., ge=1) + multiUserClients: Literal["APP_TYPE_SINGLE_USER", "APP_TYPE_MULTI_USER"] + noOfUsersPerAppInst: int = 1 + appProvisioning: bool = True + + +class AppComponentSpec(BaseModel): + serviceNameNB: str + serviceNameEW: str + componentName: str + artefactId: str + + +class ApplicationModel(BaseModel): + appId: str + appProviderId: str + appDeploymentZones: List[AppDeploymentZone] = Field(..., min_length=1) + appMetaData: AppMetaData + appQoSProfile: AppQoSProfile + appComponentSpecs: List[AppComponentSpec] = Field(..., min_length=1) + onboardStatusInfo: Literal["PENDING", "ONBOARDED", "DEBOARDING", "REMOVED", "FAILED"] + + +# --------------------------- +# ApplicationDeploymentManagement +# --------------------------- + + +class AppInstance(BaseModel): + zoneId: str + appInstIdentifier: str + + +class AppInstanceStatus(BaseModel): + appInstanceState: Literal["PENDING", "READY", "FAILED", "TERMINATING", "DEPLOYED"] + accesspointInfo: List[dict] + + +class ZoneIdentifier(BaseModel): + zoneId: str + appInstanceInfo: List[dict] + + +class ZoneIdentifierList(RootModel[List[ZoneIdentifier]]): + pass