From 95d534bbeadca3e893401c0b1775797ddc1db013 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Wed, 18 Mar 2026 14:30:03 -0400 Subject: [PATCH] Replicas should atomically update their distributions It's not ideal for distributions to pick up replica changes at random time intervals as various tasks complete; ideally the entire replica is presented as updated at once (or with the smallest possible window). closes #7333 Assisted-By: claude-opus-4.6 --- CHANGES/7333.bugfix | 1 + pulpcore/app/replica.py | 17 ++++++++++++--- pulpcore/app/tasks/replica.py | 21 +++++++++++++++---- .../tests/functional/api/test_replication.py | 17 ++++++++++----- 4 files changed, 44 insertions(+), 12 deletions(-) create mode 100644 CHANGES/7333.bugfix diff --git a/CHANGES/7333.bugfix b/CHANGES/7333.bugfix new file mode 100644 index 00000000000..0724ba0df5b --- /dev/null +++ b/CHANGES/7333.bugfix @@ -0,0 +1 @@ +Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 73e2fc45855..1b338acf899 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -172,8 +172,11 @@ def distribution_extra_fields(self, repository, upstream_distribution): """ Return the fields that need to be updated/cleared on distributions for idempotence. 
""" + latest = repository.latest_version() + repo_version_href = get_url(repository) + "versions/{}/".format(latest.number) return { - "repository": get_url(repository), + "repository": None, + "repository_version": repo_version_href, "publication": None, "base_path": upstream_distribution["base_path"], } @@ -187,7 +190,15 @@ def create_or_update_distribution(self, repository, upstream_distribution): ) if not self._is_managed(distro): return None - needs_update = self.needs_update(distribution_data, distro) + # Don't update repository_version here — that happens atomically in + # finalize_replication after all syncs complete. + update_data = { + k: v + for k, v in distribution_data.items() + if k not in ("repository_version", "repository", "publication") + } + update_data["pulp_labels"] = distribution_data["pulp_labels"] + needs_update = self.needs_update(update_data, distro) if needs_update: # Update the distribution dispatch( @@ -197,7 +208,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ - "data": distribution_data, + "data": update_data, "partial": True, }, ) diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 137239a49f1..ca63ccc759c 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -3,11 +3,12 @@ import sys from tempfile import NamedTemporaryFile +from django.db import transaction from django.db.models import Min from pulpcore.constants import TASK_STATES from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig -from pulpcore.app.models import UpstreamPulp, Task, TaskGroup +from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup from pulpcore.app.replica import ReplicaContext from pulpcore.tasking.tasks import dispatch @@ -77,6 +78,7 @@ def replicate_distributions(server_pk): replicator = replicator_class(ctx, 
task_group, tls_settings, server) supported_replicators.append(replicator) + distro_repo_pairs = [] for replicator in supported_replicators: distros = replicator.upstream_distributions(q=server.q_select) distro_names = [] @@ -90,7 +92,7 @@ def replicate_distributions(server_pk): # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) if not repository: - # No update occured because server.policy==LABELED and there was + # No update occurred because server.policy==LABELED and there was # an already existing local repository with the same name continue @@ -103,6 +105,7 @@ def replicate_distributions(server_pk): # Add name to the list of known distribution names distro_names.append(distro["name"]) + distro_repo_pairs.append((distro["name"], str(repository.pk))) replicator.remove_missing(distro_names) @@ -110,17 +113,27 @@ def replicate_distributions(server_pk): finalize_replication, task_group=task_group, exclusive_resources=[server], - args=[server.pk], + args=[server.pk, distro_repo_pairs], ) -def finalize_replication(server_pk): +def finalize_replication(server_pk, distro_repo_pairs): task = Task.current() task_group = TaskGroup.current() server = UpstreamPulp.objects.get(pk=server_pk) if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists(): raise Exception("Replication failed.") + # Atomically update all managed distributions to point to their repo's latest version. + with transaction.atomic(): + for distro_name, repo_pk in distro_repo_pairs: + distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain) + repo = Repository.objects.get(pk=repo_pk) + latest_version = repo.latest_version() + if latest_version and distro.repository_version != latest_version: + distro.repository_version = latest_version + distro.save(update_fields=["repository_version"]) + # Record timestamp of last successful replication. 
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] server.set_last_replication_timestamp(started_at) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index c2694738852..385a6afcb4a 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -117,6 +117,13 @@ def test_replication_idempotence( assert "UpstreamPulp" in obj.pulp_labels assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"] + # Verify the replica distribution uses repository_version (not repository) + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + # Now replicate backwards upstream_pulp_body = { @@ -147,7 +154,9 @@ def test_replication_idempotence( assert result.count == 1 new_distribution = result.results[0] assert new_distribution.pulp_href == distro.pulp_href - assert new_distribution.repository == new_repository.pulp_href + assert new_distribution.repository is None + assert new_distribution.repository_version is not None + assert new_distribution.repository_version.startswith(new_repository.pulp_href) assert new_distribution.publication is None assert "UpstreamPulp" in new_distribution.pulp_labels assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"] @@ -373,11 +382,9 @@ def _check_replication( assert upstream_pulp.last_replication > old_replication # check if the content was correctly replicated - local_version = file_bindings.RepositoriesFileApi.read( - distribution.repository - ).latest_version_href + assert distribution.repository_version is not None local_present = file_bindings.RepositoriesFileVersionsApi.read( - local_version + distribution.repository_version ).content_summary.present upstream_version = 
file_bindings.PublicationsFileApi.read( upstream_distribution.publication