From c1a60c05bb345b57e4da450da4d811d4e3a6a799 Mon Sep 17 00:00:00 2001 From: Jakub Stepien Date: Mon, 16 Feb 2026 13:13:17 +0100 Subject: [PATCH] [DCO01A-121] Make snapshot zip smaller by using symlinks to identical entities archives --- .../snapshots/snapshot_create.py | 102 +++++++++++++++++- .../snapshots/snapshot_restore.py | 20 +++- 2 files changed, 114 insertions(+), 8 deletions(-) diff --git a/mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py b/mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py index edef6e94ec..14c19662fd 100644 --- a/mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py +++ b/mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py @@ -3,9 +3,12 @@ import pathlib import queue import shutil +import hashlib +import tarfile import tempfile import zipfile from collections import defaultdict +from contextlib import contextmanager from pathlib import Path from typing import Any @@ -84,6 +87,11 @@ def __init__( self._auditlog_queue = queue.Queue() self._auditlog_listener = AuditLogListener(self._client, self._auditlog_queue) + self._written_archives: dict[str, dict[tuple[str, ...], str]] + self._written_archives = { # track created entities archives + 'plugins': {}, + 'blueprints': {}, # will do for both blueprints and blueprint_revisions + } def create(self, timeout: float | None = None): """Dumps manager's data and some metadata into a single zip file""" @@ -263,6 +271,7 @@ def _write_files( if _should_append_entity(dump_type, entity): self._auditlog_listener.append_entity( tenant_name, dump_type, entity) + self._update_written_archives(entity_id, dump_type, output_dir) # Dump the data as JSON files filenum = _get_max_filenum_in_dir(output_dir) or 0 for (source, source_id), items in data_buckets.items(): @@ -308,17 +317,24 @@ def _create_archive(self): ) as zf: base_dir = os.path.join(root_dir, os.curdir) base_dir = os.path.normpath(base_dir) - for dirpath, dirnames, filenames in os.walk(base_dir): + for dirpath, dirnames, filenames in os.walk(base_dir, followlinks=False): + root_path = Path(dirpath) arcdirpath = os.path.relpath(dirpath, root_dir) for name in sorted(dirnames): path = os.path.join(dirpath, name) arcname = os.path.join(arcdirpath, name) zf.write(path, arcname) for name in filenames: - path = os.path.join(dirpath, name) - path = os.path.normpath(path) - if os.path.isfile(path): - arcname = os.path.join(arcdirpath, name) + path = root_path / name + arcname = path.relative_to(root_dir) + if path.is_symlink(): + zip_info = zipfile.ZipInfo(str(arcname)) + zip_info.create_system = 3 # Unix + st = os.lstat(path) + zip_info.external_attr = st.st_mode << 16 + link_target = os.readlink(path) + zf.writestr(zip_info, link_target) + elif os.path.isfile(path): zf.write(path, arcname) def _upload_archive(self): @@ -392,6 +408,26 @@ def _update_snapshot_status(self, status, error=None): error=error ) + def _update_written_archives(self, entity_id, dump_type, output_dir): + dest_dir = (output_dir / f'{dump_type}').resolve() + suffix = { + 'plugins': '.zip', + 'blueprints': '.tar.gz', + }.get(dump_type) + if not suffix: + return + entity_archive = dest_dir / f'{entity_id}{suffix}' + content_hashes = _get_archive_content_hashes(entity_archive) + if existing_path := self._written_archives[dump_type].get(content_hashes): + entity_archive.unlink(missing_ok=False) + os.symlink( + os.path.relpath(existing_path, entity_archive).split("/", 1)[-1], + entity_archive, + ) + ctx.logger.debug("Created symlink: %s to %s", entity_archive, existing_path) + return + self._written_archives[dump_type][content_hashes] = entity_archive + def _prepare_temp_dir() -> Path: """Prepare temporary (working) directory structure""" @@ -516,3 +552,59 @@ def get_all(method, kwargs=None): kwargs['_offset'] = len(data) return data + + +def _hash_it(content) -> str: + if isinstance(content, str): + content = content.encode('utf-8') + elif not isinstance(content, bytes): + content = str(content).encode('utf-8') + return hashlib.md5(content).hexdigest() + + +@contextmanager +def _open_archive(path: Path): + if path.name.endswith(".zip"): + with zipfile.ZipFile(path, "r") as arc: + yield "zip", arc + elif path.name.endswith(".tar.gz"): + with tarfile.open(path, "r:gz") as arc: + yield "tar.gz", arc + else: + raise RuntimeWarning("not supported archive type '{}'".format(path)) + + +def _iter_archive_members(arc_type: str, archive: zipfile.ZipFile | tarfile.TarFile): + if arc_type == "zip": + for info in archive.infolist(): + yield info.filename, info.is_dir(), lambda i=info: archive.open(i) + else: + for member in archive.getmembers(): + yield member.name, member.isdir(), lambda m=member: archive.extractfile(m) + + +def _get_archive_content_hashes(path: Path) -> tuple[str, ...]: + hashes: set[str] = set() + all_dirs: set[str] = set() + not_empty_dirs: set[str] = set() + filenames: set[str] = set() + + with _open_archive(path) as (arc_type, arc): + for name, is_dir, open_file in _iter_archive_members(arc_type, arc): + if not is_dir: + filenames.add(name) + parts = name.split('/') + for i in range(1, len(parts)): + not_empty_dirs.add('/'.join(parts[:i])) + with open_file() as fileobj: + if fileobj: + content_hash = _hash_it(fileobj.read()) + hashes.add(content_hash) + else: + all_dirs.add(name.rstrip("/")) + + if filenames: + hashes.add(_hash_it(":".join(filenames))) + if empty_dirs := all_dirs - not_empty_dirs: + hashes.add(_hash_it(":".join(empty_dirs))) + return tuple(hashes) diff --git a/mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py b/mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py index f007033102..58f68d1ed1 100644 --- a/mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py +++ b/mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py @@ -6,12 +6,14 @@ import uuid import base64 import shutil +import stat import zipfile import tempfile import threading import subprocess from contextlib import contextmanager from functools import partial +from pathlib import Path from typing import Any from cloudify.workflows import ctx @@ -76,6 +78,11 @@ # Reproduced/modified from patch for https://bugs.python.org/issue15795 class ZipFile(zipfile.ZipFile): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._all_entries = {info.filename.rstrip('/'): info for info in self.infolist()} + def _extract_member(self, member, targetpath, pwd): """Extract the ZipInfo object 'member' to a physical file on the path targetpath. @@ -112,11 +119,18 @@ def _extract_member(self, member, targetpath, pwd): os.mkdir(targetpath) return targetpath - with self.open(member, pwd=pwd) as source, \ - open(targetpath, "wb") as target: + _mode = member.external_attr >> 16 + if stat.S_ISLNK(_mode): + link = self.read(member.filename).decode('utf-8') + source_path = Path(member.filename).parent / link + member_to_extract = self._all_entries[os.path.normpath(source_path)] + else: + member_to_extract = member + + with self.open(member_to_extract, pwd=pwd) as source, open(targetpath, "wb") as target: shutil.copyfileobj(source, target) - mode = member.external_attr >> 16 & 0xFFF + mode = member_to_extract.external_attr >> 16 & 0xFFF os.chmod(targetpath, mode) return targetpath