diff --git a/CHANGES/7333.bugfix b/CHANGES/7333.bugfix new file mode 100644 index 0000000000..0724ba0df5 --- /dev/null +++ b/CHANGES/7333.bugfix @@ -0,0 +1 @@ +Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 73e2fc4585..1b338acf89 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -172,8 +172,11 @@ def distribution_extra_fields(self, repository, upstream_distribution): """ Return the fields that need to be updated/cleared on distributions for idempotence. """ + latest = repository.latest_version() + repo_version_href = get_url(repository) + "versions/{}/".format(latest.number) return { - "repository": get_url(repository), + "repository": None, + "repository_version": repo_version_href, "publication": None, "base_path": upstream_distribution["base_path"], } @@ -187,7 +190,15 @@ def create_or_update_distribution(self, repository, upstream_distribution): ) if not self._is_managed(distro): return None - needs_update = self.needs_update(distribution_data, distro) + # Don't update repository_version here — that happens atomically in + # finalize_replication after all syncs complete. + update_data = { + k: v + for k, v in distribution_data.items() + if k not in ("repository_version", "repository", "publication") + } + update_data["pulp_labels"] = distribution_data["pulp_labels"] + needs_update = self.needs_update(update_data, distro) if needs_update: # Update the distribution dispatch( @@ -197,7 +208,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ - "data": distribution_data, + "data": update_data, "partial": True, }, ) diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 137239a49f..ca63ccc759 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -3,11 +3,12 @@ import sys from tempfile import NamedTemporaryFile +from django.db import transaction from django.db.models import Min from pulpcore.constants import TASK_STATES from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig -from pulpcore.app.models import UpstreamPulp, Task, TaskGroup +from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup from pulpcore.app.replica import ReplicaContext from pulpcore.tasking.tasks import dispatch @@ -77,6 +78,7 @@ def replicate_distributions(server_pk): replicator = replicator_class(ctx, task_group, tls_settings, server) supported_replicators.append(replicator) + distro_repo_pairs = [] for replicator in supported_replicators: distros = replicator.upstream_distributions(q=server.q_select) distro_names = [] @@ -90,7 +92,7 @@ def replicate_distributions(server_pk): # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) if not repository: - # No update occured because server.policy==LABELED and there was + # No update occurred because server.policy==LABELED and there was # an already existing local repository with the same name continue @@ -103,6 +105,7 @@ def replicate_distributions(server_pk): # Add name to the list of known distribution names distro_names.append(distro["name"]) + distro_repo_pairs.append((distro["name"], str(repository.pk))) replicator.remove_missing(distro_names) @@ -110,17 +113,27 @@ def replicate_distributions(server_pk): finalize_replication, task_group=task_group, exclusive_resources=[server], - args=[server.pk], + args=[server.pk, distro_repo_pairs], ) -def finalize_replication(server_pk): +def finalize_replication(server_pk, distro_repo_pairs): task = Task.current() task_group = TaskGroup.current() server = UpstreamPulp.objects.get(pk=server_pk) if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists(): raise Exception("Replication failed.") + # Atomically update all managed distributions to point to their repo's latest version. + with transaction.atomic(): + for distro_name, repo_pk in distro_repo_pairs: + distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain) + repo = Repository.objects.get(pk=repo_pk) + latest_version = repo.latest_version() + if latest_version and distro.repository_version != latest_version: + distro.repository_version = latest_version + distro.save(update_fields=["repository_version"]) + # Record timestamp of last successful replication. started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] server.set_last_replication_timestamp(started_at) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index c269473885..385a6afcb4 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -117,6 +117,13 @@ def test_replication_idempotence( assert "UpstreamPulp" in obj.pulp_labels assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"] + # Verify the replica distribution uses repository_version (not repository) + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + # Now replicate backwards upstream_pulp_body = { @@ -147,7 +154,9 @@ def test_replication_idempotence( assert result.count == 1 new_distribution = result.results[0] assert new_distribution.pulp_href == distro.pulp_href - assert new_distribution.repository == new_repository.pulp_href + assert new_distribution.repository is None + assert new_distribution.repository_version is not None + assert new_distribution.repository_version.startswith(new_repository.pulp_href) assert new_distribution.publication is None assert "UpstreamPulp" in new_distribution.pulp_labels assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"] @@ -373,11 +382,9 @@ def _check_replication( assert upstream_pulp.last_replication > old_replication # check if the content was correctly replicated - local_version = file_bindings.RepositoriesFileApi.read( - distribution.repository - ).latest_version_href + assert distribution.repository_version is not None local_present = file_bindings.RepositoriesFileVersionsApi.read( - local_version + distribution.repository_version ).content_summary.present upstream_version = file_bindings.PublicationsFileApi.read( upstream_distribution.publication