diff --git a/pyproject.toml b/pyproject.toml index 1ef768d..b501493 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,10 +16,10 @@ classifiers = [ dependencies = [ "alembic", + "kubernetes", "psycopg2", "pydantic>=2.0.0", "PyYAML", - "wiremind-kubernetes~=7.0", ] [build-system] diff --git a/requirements.txt b/requirements.txt index 84c02ef..77771ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,12 +16,10 @@ durationpy==0.10 # via kubernetes google-auth==2.41.1 # via kubernetes -greenlet==3.2.4 - # via sqlalchemy idna==3.10 # via requests kubernetes==34.1.0 - # via wiremind-kubernetes + # via chartreuse (pyproject.toml) mako==1.3.10 # via alembic markupsafe==3.0.3 @@ -75,5 +73,3 @@ urllib3==2.3.0 # requests websocket-client==1.9.0 # via kubernetes -wiremind-kubernetes==7.5.0 - # via chartreuse (pyproject.toml) diff --git a/src/chartreuse/chartreuse.py b/src/chartreuse/chartreuse.py index 7582986..74f0515 100644 --- a/src/chartreuse/chartreuse.py +++ b/src/chartreuse/chartreuse.py @@ -1,9 +1,8 @@ import logging -import wiremind_kubernetes.kubernetes_helper - from .config_loader import DatabaseConfig from .utils import AlembicMigrationHelper +from .utils.kubernetes_helper import KubernetesDeploymentManager logger = logging.getLogger(__name__) @@ -23,7 +22,7 @@ def __init__( self, databases_config: dict[str, DatabaseConfig], release_name: str, - kubernetes_helper: wiremind_kubernetes.kubernetes_helper.KubernetesDeploymentManager | None = None, + kubernetes_helper: KubernetesDeploymentManager | None = None, ): configure_logging() @@ -56,9 +55,7 @@ def __init__( if kubernetes_helper: self.kubernetes_helper = kubernetes_helper else: - self.kubernetes_helper = wiremind_kubernetes.kubernetes_helper.KubernetesDeploymentManager( - use_kubeconfig=None, release_name=release_name - ) + self.kubernetes_helper = KubernetesDeploymentManager(use_kubeconfig=None, release_name=release_name) @property def is_migration_needed(self) -> bool: diff --git a/src/chartreuse/chartreuse_upgrade.py b/src/chartreuse/chartreuse_upgrade.py index 630a6ec..ca9d562 100755 --- a/src/chartreuse/chartreuse_upgrade.py +++ b/src/chartreuse/chartreuse_upgrade.py @@ -1,12 +1,11 @@ import logging import os -from wiremind_kubernetes import KubernetesDeploymentManager - from chartreuse import get_version from .chartreuse import Chartreuse from .config_loader import load_multi_database_config +from .utils.kubernetes_helper import KubernetesDeploymentManager logger = logging.getLogger(__name__) diff --git a/src/chartreuse/tests/conftest.py b/src/chartreuse/tests/conftest.py index 1232161..ffffb9a 100644 --- a/src/chartreuse/tests/conftest.py +++ b/src/chartreuse/tests/conftest.py @@ -59,9 +59,8 @@ def configure_chartreuse_mock( # Mock KubernetesDeploymentManager in all the right places mock_kdm = mocker.MagicMock() - mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager", return_value=mock_kdm) mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager", return_value=mock_kdm) # Also mock the kubernetes config loading to prevent the config errors mocker.patch("kubernetes.config.load_incluster_config") - mocker.patch("wiremind_kubernetes.kube_config.load_kubernetes_config") + mocker.patch("chartreuse.utils.kubernetes_helper.load_kubernetes_config") diff --git a/src/chartreuse/tests/e2e_tests/conftest.py b/src/chartreuse/tests/e2e_tests/conftest.py index bed5fe4..32620a7 100644 --- a/src/chartreuse/tests/e2e_tests/conftest.py +++ b/src/chartreuse/tests/e2e_tests/conftest.py @@ -7,12 +7,10 @@ import pytest import sqlalchemy from sqlalchemy import inspect -from wiremind_kubernetes.kube_config import load_kubernetes_config -from wiremind_kubernetes.kubernetes_helper import KubernetesDeploymentManager -from wiremind_kubernetes.tests.e2e_tests.conftest import create_namespace, setUpE2E # noqa: F401 -from wiremind_kubernetes.utils import run_command import chartreuse +from chartreuse.utils.command import run_command +from chartreuse.utils.kubernetes_helper import KubernetesDeploymentManager, load_kubernetes_config TEST_NAMESPACE = "chartreuse-e2e-test" TEST_RELEASE = "e2e-test-release" @@ -26,6 +24,16 @@ POSTGRESQL_URL = "postgresql://foo:foo@localhost/foo?sslmode=prefer" +@pytest.fixture(scope="session") +def create_namespace() -> Generator: + yield + + +@pytest.fixture(scope="session") +def setUpE2E() -> Generator: + yield + + def _cluster_init(include_chartreuse: bool, pre_upgrade: bool = False) -> Generator: # In order to configure kubernetes load_kubernetes_config(use_kubeconfig=None) diff --git a/src/chartreuse/tests/unit_tests/conftest.py b/src/chartreuse/tests/unit_tests/conftest.py index 5562b10..9611d3b 100644 --- a/src/chartreuse/tests/unit_tests/conftest.py +++ b/src/chartreuse/tests/unit_tests/conftest.py @@ -12,6 +12,6 @@ def configure_chartreuse_mock(mocker: MockerFixture, is_migration_needed: bool = ) mocker.patch("chartreuse.chartreuse_upgrade.Chartreuse.upgrade") mocker.patch( - "wiremind_kubernetes.kubernetes_helper._get_namespace_from_kube", + "chartreuse.utils.kubernetes_helper._get_namespace_from_kube", return_value="foo", ) diff --git a/src/chartreuse/tests/unit_tests/test_chartreuse.py b/src/chartreuse/tests/unit_tests/test_chartreuse.py index d30e074..7c35b2a 100644 --- a/src/chartreuse/tests/unit_tests/test_chartreuse.py +++ b/src/chartreuse/tests/unit_tests/test_chartreuse.py @@ -87,7 +87,7 @@ def test_chartreuse_init_without_kubernetes_helper(self, mocker: MockerFixture) mock_alembic_helper.return_value = mock_alembic_instance # Mock KubernetesDeploymentManager - mock_k8s_manager = mocker.patch("wiremind_kubernetes.kubernetes_helper.KubernetesDeploymentManager") + mock_k8s_manager = mocker.patch("chartreuse.chartreuse.KubernetesDeploymentManager") mock_k8s_instance = MagicMock() mock_k8s_manager.return_value = mock_k8s_instance @@ -315,7 +315,7 @@ def test_multi_chartreuse_init_without_kubernetes_helper(self, mocker: MockerFix mock_alembic_helper.return_value = mock_alembic_instance # Mock KubernetesDeploymentManager - mock_k8s_manager = mocker.patch("wiremind_kubernetes.kubernetes_helper.KubernetesDeploymentManager") + mock_k8s_manager = mocker.patch("chartreuse.chartreuse.KubernetesDeploymentManager") mock_k8s_instance = MagicMock() mock_k8s_manager.return_value = mock_k8s_instance diff --git a/src/chartreuse/tests/unit_tests/test_chartreuse_upgrade.py b/src/chartreuse/tests/unit_tests/test_chartreuse_upgrade.py index 2dcace9..16fea88 100644 --- a/src/chartreuse/tests/unit_tests/test_chartreuse_upgrade.py +++ b/src/chartreuse/tests/unit_tests/test_chartreuse_upgrade.py @@ -14,8 +14,8 @@ def test_chartreuse_upgrade_detected_migration_enabled_stop_pods( Test that chartreuse_upgrades stop pods in case of detected migration. """ configure_chartreuse_mock(mocker=mocker, is_migration_needed=True) - mocked_stop_pods = mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.stop_pods") - mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.start_pods") + mocked_stop_pods = mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.stop_pods") + mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.start_pods") mocker.patch("chartreuse.chartreuse_upgrade.get_version", return_value="5.0.0") configure_os_environ_mock(mocker=mocker, additional_environment={"HELM_CHART_VERSION": "5.0.0"}) @@ -30,8 +30,8 @@ def test_chartreuse_upgrade_detected_migration_disabled_stop_pods( Test that chartreuse_upgrades does not stop pods in case of detected migration but we disallow stop-pods. """ configure_chartreuse_mock(mocker=mocker, is_migration_needed=True) - mocked_stop_pods = mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.stop_pods") - mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.start_pods") + mocked_stop_pods = mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.stop_pods") + mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.start_pods") mocker.patch("chartreuse.chartreuse_upgrade.get_version", return_value="5.0.0") configure_os_environ_mock( mocker=mocker, @@ -49,8 +49,8 @@ def test_chartreuse_upgrade_no_migration_disabled_stop_pods( Test that chartreuse_upgrades does NOT stop pods in case of migration not needed. """ configure_chartreuse_mock(mocker=mocker, is_migration_needed=False) - mocked_stop_pods = mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.stop_pods") - mocker.patch("wiremind_kubernetes.KubernetesDeploymentManager.start_pods") + mocked_stop_pods = mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.stop_pods") + mocker.patch("chartreuse.chartreuse_upgrade.KubernetesDeploymentManager.start_pods") mocker.patch("chartreuse.chartreuse_upgrade.get_version", return_value="5.0.0") configure_os_environ_mock(mocker=mocker, additional_environment={"HELM_CHART_VERSION": "5.0.0"}) diff --git a/src/chartreuse/utils/__init__.py b/src/chartreuse/utils/__init__.py index c53dbba..de44eeb 100644 --- a/src/chartreuse/utils/__init__.py +++ b/src/chartreuse/utils/__init__.py @@ -1 +1,3 @@ from .alembic_migration_helper import AlembicMigrationHelper # noqa: F401 +from .command import run_command # noqa: F401 +from .kubernetes_helper import KubernetesDeploymentManager, load_kubernetes_config # noqa: F401 diff --git a/src/chartreuse/utils/alembic_migration_helper.py b/src/chartreuse/utils/alembic_migration_helper.py index efc7d4c..57adf3c 100755 --- a/src/chartreuse/utils/alembic_migration_helper.py +++ b/src/chartreuse/utils/alembic_migration_helper.py @@ -5,7 +5,8 @@ import sqlalchemy from sqlalchemy import inspect -from wiremind_kubernetes.utils import run_command + +from .command import run_command logger = logging.getLogger(__name__) diff --git a/src/chartreuse/utils/command.py b/src/chartreuse/utils/command.py new file mode 100644 index 0000000..4e5e571 --- /dev/null +++ b/src/chartreuse/utils/command.py @@ -0,0 +1,22 @@ +import subprocess + + +def run_command( + command: str, + *, + cwd: str | None = None, + return_result: bool = False, +) -> tuple[str, str | None, int] | None: + result = subprocess.run( + command, + cwd=cwd, + shell=True, + check=not return_result, + capture_output=return_result, + text=True, + ) + + if return_result: + return result.stdout, result.stderr, result.returncode + + return None diff --git a/src/chartreuse/utils/kubernetes_helper.py b/src/chartreuse/utils/kubernetes_helper.py new file mode 100644 index 0000000..198b9d7 --- /dev/null +++ b/src/chartreuse/utils/kubernetes_helper.py @@ -0,0 +1,192 @@ +import logging +from pathlib import Path +from typing import Any + +import kubernetes +import kubernetes.client +from kubernetes.client.exceptions import ApiException + +logger = logging.getLogger(__name__) + +EXPECTED_DEPLOYMENT_SCALE_GVP = { + "group": "wiremind.io", + "version": "v1", + "plural": "expecteddeploymentscales", +} +PREVIOUS_REPLICAS_ANNOTATION = "chartreuse.wiremind.io/previous-replicas" + + +def _run_in_cluster() -> bool: + return Path("/var/run/secrets/kubernetes.io/serviceaccount/token").is_file() + + +def _get_namespace_from_kube() -> str: + namespace_path = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if namespace_path.is_file(): + return namespace_path.read_text(encoding="utf-8").strip() + return "default" + + +def load_kubernetes_config(*, use_kubeconfig: bool | None = None) -> None: + if use_kubeconfig is True: + kubernetes.config.load_kube_config() + return + + if use_kubeconfig is False: + kubernetes.config.load_incluster_config() + return + + if _run_in_cluster(): + try: + kubernetes.config.load_incluster_config() + return + except kubernetes.config.config_exception.ConfigException: + logger.debug("Could not load in-cluster config, falling back to kubeconfig.") + + kubernetes.config.load_kube_config() + + +class KubernetesDeploymentManager: + def __init__( + self, + *, + release_name: str, + namespace: str | None = None, + use_kubeconfig: bool | None = None, + should_load_kubernetes_config: bool = True, + ): + self.release_name = release_name + self.namespace = namespace or _get_namespace_from_kube() + self.use_kubeconfig = use_kubeconfig + self.should_load_kubernetes_config = should_load_kubernetes_config + self._clients_initialized = False + self.client_appsv1_api: kubernetes.client.AppsV1Api | None = None + self.client_custom_objects_api: kubernetes.client.CustomObjectsApi | None = None + + def _ensure_clients_initialized(self) -> None: + if self._clients_initialized: + return + + if self.should_load_kubernetes_config: + load_kubernetes_config(use_kubeconfig=self.use_kubeconfig) + + self.client_appsv1_api = kubernetes.client.AppsV1Api() + self.client_custom_objects_api = kubernetes.client.CustomObjectsApi() + self._clients_initialized = True + + def _release_label_selectors(self) -> list[str]: + if not self.release_name: + return [] + return [ + f"release={self.release_name}", + f"app.kubernetes.io/instance={self.release_name}", + ] + + def _list_release_deployments(self) -> list[Any]: + self._ensure_clients_initialized() + assert self.client_appsv1_api is not None + deployments: dict[str, Any] = {} + selectors = self._release_label_selectors() or [""] + for selector in selectors: + kwargs = {"namespace": self.namespace} + if selector: + kwargs["label_selector"] = selector + for deployment in self.client_appsv1_api.list_namespaced_deployment(**kwargs).items: + deployment_name = deployment.metadata.name + if deployment_name: + deployments[deployment_name] = deployment + return list(deployments.values()) + + def _set_deployment_replicas(self, *, deployment_name: str, replicas: int) -> None: + self._ensure_clients_initialized() + assert self.client_appsv1_api is not None + self.client_appsv1_api.patch_namespaced_deployment_scale( + namespace=self.namespace, + name=deployment_name, + body={"spec": {"replicas": replicas}}, + ) + + def _set_deployment_annotation(self, *, deployment_name: str, key: str, value: str) -> None: + self._ensure_clients_initialized() + assert self.client_appsv1_api is not None + self.client_appsv1_api.patch_namespaced_deployment( + namespace=self.namespace, + name=deployment_name, + body={"metadata": {"annotations": {key: value}}}, + ) + + def _get_expected_deployment_scales(self) -> dict[str, int]: + self._ensure_clients_initialized() + assert self.client_custom_objects_api is not None + scales: dict[str, int] = {} + selectors = self._release_label_selectors() or [""] + for selector in selectors: + kwargs: dict[str, Any] = { + "group": EXPECTED_DEPLOYMENT_SCALE_GVP["group"], + "version": EXPECTED_DEPLOYMENT_SCALE_GVP["version"], + "plural": EXPECTED_DEPLOYMENT_SCALE_GVP["plural"], + "namespace": self.namespace, + } + if selector: + kwargs["label_selector"] = selector + + try: + resources = self.client_custom_objects_api.list_namespaced_custom_object(**kwargs) + except ApiException as exc: + if exc.status in {403, 404}: + logger.debug("ExpectedDeploymentScale custom resource is unavailable: %s", exc) + return {} + raise + + for item in resources.get("items", []): + spec = item.get("spec", {}) + deployment_name = spec.get("deploymentName") or item.get("metadata", {}).get("name") + expected_scale = spec.get("expectedScale") + if isinstance(deployment_name, str) and isinstance(expected_scale, int): + scales[deployment_name] = expected_scale + return scales + + def stop_pods(self) -> None: + for deployment in self._list_release_deployments(): + deployment_name = deployment.metadata.name + current_replicas = deployment.spec.replicas or 0 + if current_replicas == 0: + continue + + self._set_deployment_annotation( + deployment_name=deployment_name, + key=PREVIOUS_REPLICAS_ANNOTATION, + value=str(current_replicas), + ) + self._set_deployment_replicas(deployment_name=deployment_name, replicas=0) + + def start_pods(self) -> None: + expected_scales = self._get_expected_deployment_scales() + for deployment in self._list_release_deployments(): + deployment_name = deployment.metadata.name + current_replicas = deployment.spec.replicas or 0 + + wanted_replicas = expected_scales.get(deployment_name) + if wanted_replicas is None: + annotations = deployment.metadata.annotations or {} + previous_replicas = annotations.get(PREVIOUS_REPLICAS_ANNOTATION) + if previous_replicas and previous_replicas.isdigit(): + wanted_replicas = int(previous_replicas) + + if wanted_replicas is None or wanted_replicas == 0 or current_replicas == wanted_replicas: + continue + + self._set_deployment_replicas(deployment_name=deployment_name, replicas=wanted_replicas) + + def is_deployment_stopped(self, deployment_name: str) -> bool: + self._ensure_clients_initialized() + assert self.client_appsv1_api is not None + deployment = self.client_appsv1_api.read_namespaced_deployment( + namespace=self.namespace, + name=deployment_name, + ) + wanted_replicas = deployment.spec.replicas or 0 + replicas = deployment.status.replicas or 0 + ready_replicas = deployment.status.ready_replicas or 0 + available_replicas = deployment.status.available_replicas or 0 + return wanted_replicas == 0 and replicas == 0 and ready_replicas == 0 and available_replicas == 0