Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7333.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed.
17 changes: 14 additions & 3 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ def distribution_extra_fields(self, repository, upstream_distribution):
"""
Return the fields that need to be updated/cleared on distributions for idempotence.
"""
latest = repository.latest_version()
repo_version_href = get_url(repository) + "versions/{}/".format(latest.number)
return {
"repository": get_url(repository),
"repository": None,
"repository_version": repo_version_href,
"publication": None,
"base_path": upstream_distribution["base_path"],
}
Expand All @@ -187,7 +190,15 @@ def create_or_update_distribution(self, repository, upstream_distribution):
)
if not self._is_managed(distro):
return None
needs_update = self.needs_update(distribution_data, distro)
# Don't update repository_version here — that happens atomically in
# finalize_replication after all syncs complete.
update_data = {
k: v
for k, v in distribution_data.items()
if k not in ("repository_version", "repository", "publication")
}
update_data["pulp_labels"] = distribution_data["pulp_labels"]
needs_update = self.needs_update(update_data, distro)
if needs_update:
# Update the distribution
dispatch(
Expand All @@ -197,7 +208,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
exclusive_resources=self.distros_uris,
args=(distro.pk, self.app_label, self.distribution_serializer_name),
kwargs={
"data": distribution_data,
"data": update_data,
"partial": True,
},
)
Expand Down
21 changes: 17 additions & 4 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import sys
from tempfile import NamedTemporaryFile

from django.db import transaction
from django.db.models import Min

from pulpcore.constants import TASK_STATES
from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig
from pulpcore.app.models import UpstreamPulp, Task, TaskGroup
from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup
from pulpcore.app.replica import ReplicaContext
from pulpcore.tasking.tasks import dispatch

Expand Down Expand Up @@ -77,6 +78,7 @@ def replicate_distributions(server_pk):
replicator = replicator_class(ctx, task_group, tls_settings, server)
supported_replicators.append(replicator)

distro_repo_pairs = []
for replicator in supported_replicators:
distros = replicator.upstream_distributions(q=server.q_select)
distro_names = []
Expand All @@ -90,7 +92,7 @@ def replicate_distributions(server_pk):
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)
if not repository:
# No update occured because server.policy==LABELED and there was
# No update occurred because server.policy==LABELED and there was
# an already existing local repository with the same name
continue

Expand All @@ -103,24 +105,35 @@ def replicate_distributions(server_pk):

# Add name to the list of known distribution names
distro_names.append(distro["name"])
distro_repo_pairs.append((distro["name"], str(repository.pk)))

replicator.remove_missing(distro_names)

dispatch(
finalize_replication,
task_group=task_group,
exclusive_resources=[server],
args=[server.pk],
args=[server.pk, distro_repo_pairs],
)


def finalize_replication(server_pk):
def finalize_replication(server_pk, distro_repo_pairs):
task = Task.current()
task_group = TaskGroup.current()
server = UpstreamPulp.objects.get(pk=server_pk)
if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists():
raise Exception("Replication failed.")

# Atomically update all managed distributions to point to their repo's latest version.
with transaction.atomic():
for distro_name, repo_pk in distro_repo_pairs:
distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain)
repo = Repository.objects.get(pk=repo_pk)
latest_version = repo.latest_version()
if latest_version and distro.repository_version != latest_version:
distro.repository_version = latest_version
distro.save(update_fields=["repository_version"])

# Record timestamp of last successful replication.
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"]
server.set_last_replication_timestamp(started_at)
17 changes: 12 additions & 5 deletions pulpcore/tests/functional/api/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def test_replication_idempotence(
assert "UpstreamPulp" in obj.pulp_labels
assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"]

# Verify the replica distribution uses repository_version (not repository)
replica_distro = file_bindings.DistributionsFileApi.list(
pulp_domain=replica_domain.name
).results[0]
assert replica_distro.repository is None
assert replica_distro.repository_version is not None

# Now replicate backwards

upstream_pulp_body = {
Expand Down Expand Up @@ -147,7 +154,9 @@ def test_replication_idempotence(
assert result.count == 1
new_distribution = result.results[0]
assert new_distribution.pulp_href == distro.pulp_href
assert new_distribution.repository == new_repository.pulp_href
assert new_distribution.repository is None
assert new_distribution.repository_version is not None
assert new_distribution.repository_version.startswith(new_repository.pulp_href)
assert new_distribution.publication is None
assert "UpstreamPulp" in new_distribution.pulp_labels
assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"]
Expand Down Expand Up @@ -373,11 +382,9 @@ def _check_replication(
assert upstream_pulp.last_replication > old_replication

# check if the content was correctly replicated
local_version = file_bindings.RepositoriesFileApi.read(
distribution.repository
).latest_version_href
assert distribution.repository_version is not None
local_present = file_bindings.RepositoriesFileVersionsApi.read(
local_version
distribution.repository_version
).content_summary.present
upstream_version = file_bindings.PublicationsFileApi.read(
upstream_distribution.publication
Expand Down
Loading