From 50a7afd7af185b27e002ec1497c98c675d8c5d99 Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 19 Feb 2026 14:57:32 +0100 Subject: [PATCH 01/11] Add token management functions for TOML config --- src/hermes/commands/harvest/util/token.py | 71 +++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 src/hermes/commands/harvest/util/token.py diff --git a/src/hermes/commands/harvest/util/token.py b/src/hermes/commands/harvest/util/token.py new file mode 100644 index 00000000..a9a86a96 --- /dev/null +++ b/src/hermes/commands/harvest/util/token.py @@ -0,0 +1,71 @@ +# SPDX-FileCopyrightText: 2026 OFFIS e.V. +# +# SPDX-License-Identifier: Apache-2.0 + +# SPDX-FileContributor: Stephan Ferenz +# SPDX-FileContributor: Aida Jafarbigloo + +from pathlib import Path +import base64 +import toml + + +def _load_config(config_path: str) -> dict: + path = Path(config_path) + if path.exists(): + try: + with path.open("r") as f: + return toml.load(f) + except toml.TomlDecodeError: + return {} + return {} + + +def _save_config(config: dict, config_path: str) -> None: + with Path(config_path).open("w") as f: + toml.dump(config, f) + + +def update_token_to_toml(token: str, config_path: str = "hermes.toml") -> None: + """ + Update the token in the TOML configuration file, encoding it with base64. + + Args: + token: The personal token key to be set. + config_path: Path to the TOML config file. + """ + encoded_token = base64.b64encode(token.encode()).decode() + config = _load_config(config_path) + + config.setdefault("harvest", {}) + config["harvest"]["token"] = encoded_token + + _save_config(config, config_path) + + +def load_token_from_toml(config_path: str = "hermes.toml") -> str | None: + """ + Load and decode the token from the TOML configuration file. + + Args: + config_path: Path to the TOML config file. + + Returns: + The decoded token if present, otherwise None. 
+ """ + config = _load_config(config_path) + encoded_token = config.get("harvest", {}).get("token") + return base64.b64decode(encoded_token.encode()).decode() if encoded_token else None + + +def remove_token_from_toml(config_path: str = "hermes.toml") -> None: + """ + Remove the 'token' field from the 'harvest' section of the TOML file. + + Args: + config_path: Path to the TOML config file. + """ + config = _load_config(config_path) + if "token" in config.get("harvest", {}): + del config["harvest"]["token"] + _save_config(config, config_path) From d27c6d5ff6fb6f654ca453e8440477fc69a531b3 Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 19 Feb 2026 15:55:56 +0100 Subject: [PATCH 02/11] Implement repository cloning Added clone utility functions for repository cloning with error handling and cleanup. --- src/hermes/commands/harvest/util/clone.py | 249 ++++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 src/hermes/commands/harvest/util/clone.py diff --git a/src/hermes/commands/harvest/util/clone.py b/src/hermes/commands/harvest/util/clone.py new file mode 100644 index 00000000..4c302923 --- /dev/null +++ b/src/hermes/commands/harvest/util/clone.py @@ -0,0 +1,249 @@ +# SPDX-FileCopyrightText: 2026 OFFIS e.V. 
+# +# SPDX-License-Identifier: Apache-2.0 + +# SPDX-FileContributor: Stephan Ferenz +# SPDX-FileContributor: Aida Jafarbigloo + +import os +import re +import shutil +import subprocess +import tempfile +import time +import stat +from pathlib import Path +from urllib.parse import urlparse +from typing import Sequence + +# ---------------- utilities ---------------- + +def _normalize_clone_url(url: str) -> str: + s = str(url).strip() + if re.match(r'^[\w.-]+@[\w.-]+:.*', s): + return s if s.endswith('.git') else s + '.git' + if s.startswith('file://'): + return s + if os.path.exists(s): + return s + p = urlparse(s) + if p.scheme in ('http', 'https'): + path = p.path if p.path.endswith('.git') else (p.path.rstrip('/') + '.git') + return f"{p.scheme}://{p.netloc}{path}" + if s.endswith('.git'): + return s + raise ValueError(f"Unsupported repository URL format: {url!r}") + +def _clear_readonly(func, path, excinfo): + """ + onerror handler for shutil.rmtree: try to remove read-only flag and retry. + """ + try: + os.chmod(path, stat.S_IWRITE) + except Exception: + pass + try: + if os.path.isdir(path): + shutil.rmtree(path) + else: + os.remove(path) + except Exception: + pass + +def rmtree_with_retries(path: Path, retries: int = 6, initial_wait: float = 0.1): + """ + Best-effort removal of path with retries and read-only handling. + - retries: number of attempts + - initial_wait: initial sleep (multiplies by 2 each retry) + Logs exceptions but never raises. 
+ """ + if not path.exists(): + return + + wait = initial_wait + for attempt in range(1, retries + 1): + try: + # Ensure files are writable where possible + for root, dirs, files in os.walk(path, topdown=False): + for name in files: + p = os.path.join(root, name) + try: + os.chmod(p, stat.S_IWRITE) + except Exception: + pass + for name in dirs: + p = os.path.join(root, name) + try: + os.chmod(p, stat.S_IWRITE) + except Exception: + pass + + shutil.rmtree(path, onerror=_clear_readonly) + + if not path.exists(): + return + except Exception as e: + print(f"warn: rmtree attempt {attempt} failed for {path!s}: {e!r}") + time.sleep(wait) + wait *= 2 + + try: + alt = path.with_name(path.name + "_TO_DELETE") + try: + os.replace(str(path), str(alt)) + shutil.rmtree(alt, onerror=_clear_readonly) + return + except Exception: + pass + except Exception: + pass + + if path.exists(): + print(f"error: failed to remove temp dir {path!s} after {retries} attempts. " + f"Please remove it manually. (Often caused by antivirus or open handles.)") + +def _move_or_copy(src: Path, dst: Path): + try: + os.replace(str(src), str(dst)) + except Exception: + shutil.copytree(str(src), str(dst)) + rmtree_with_retries(src) + +# ---------------- clone logic ---------------- + +def clone_repository( + url: str, + dest_dir: str, + recursive: bool = True, + depth: int | None = 1, + filter_blobs: bool = True, + sparse: bool = False, + branch: str | None = None, + insecure_ssl: bool = False, + *, + root_only: bool = False, + include_files: Sequence[str] | None = None, + verbose: bool = False, +) -> None: + """ + Robust clone that guarantees best-effort cleanup of temp dirs. + - Creates temp directories next to the target dest_dir. + - Always tries to remove temp dirs (even on success/failure). 
+ """ + dest_path = Path(dest_dir) + parent = dest_path.parent + parent.mkdir(parents=True, exist_ok=True) + + clone_url = _normalize_clone_url(url) + is_gitlab = "gitlab.com" in url.lower() + if is_gitlab: + if verbose: + print("⚠️ GitLab detected: disabling --depth and --filter=blob:none for safety.") + depth = None + filter_blobs = False + + env = os.environ.copy() + if insecure_ssl: + env["GIT_SSL_NO_VERIFY"] = "1" + + created_temp_dirs: list[Path] = [] + + def build_cmd_for(temp_path: Path, optimized: bool): + cmd = ["git", "clone"] + if optimized: + if branch: + cmd += ["--branch", branch] + if depth is not None: + cmd += ["--depth", str(depth)] + if filter_blobs: + cmd += ["--filter=blob:none"] + if sparse or root_only or (include_files and len(include_files) > 0): + cmd += ["--sparse"] + if recursive: + cmd += ["--recurse-submodules"] + else: + if branch: + cmd += ["--branch", branch] + cmd += [clone_url, str(temp_path)] + return cmd + + def attempt_clone(optimized: bool): + tmp = Path(tempfile.mkdtemp(prefix="clone_tmp_", dir=str(parent))) + created_temp_dirs.append(tmp) + cmd = build_cmd_for(tmp, optimized) + if verbose: + print("running:", " ".join(cmd)) + proc = subprocess.run(cmd, capture_output=True, text=True, env=env) + return proc.returncode, proc, tmp + + try: + # Try optimized + rc1, p1, tmp1 = attempt_clone(optimized=True) + if rc1 != 0: + if verbose: + print("warn: optimized clone failed. stderr:") + print(p1.stderr.strip() or "(no stderr)") + # Try fallback plain clone + rc2, p2, tmp2 = attempt_clone(optimized=False) + if rc2 != 0: + # both failed -> raise with both stderr + raise RuntimeError( + "Both optimized clone AND fallback clone failed.\n\n" + f"Optimized STDERR:\n{p1.stderr}\n\n" + f"Fallback STDERR:\n{p2.stderr}\n" + ) + + # fallback succeeded: move into place + if dest_path.exists(): + if any(dest_path.iterdir()): + raise RuntimeError(f"Destination '{dest_path}' already exists and is not empty. 
Won't overwrite.") + else: + rmtree_with_retries(dest_path) + + _move_or_copy(tmp2, dest_path) + if verbose: + print("✅ Repository cloned successfully (fallback/full clone).") + return + + # optimized succeeded (tmp1) + if dest_path.exists(): + if any(dest_path.iterdir()): + raise RuntimeError(f"Destination '{dest_path}' already exists and is not empty. Won't overwrite.") + else: + rmtree_with_retries(dest_path) + + _move_or_copy(tmp1, dest_path) + if verbose: + print("✅ Repository cloned successfully (optimized clone).") + + # if sparse/root_only/include_files were requested, apply sparse-checkout + if sparse or root_only or (include_files and len(include_files) > 0): + try: + subprocess.run( + ["git", "-C", str(dest_path), "sparse-checkout", "init", "--no-cone"], + check=True + ) + patterns: list[str] = [] + if root_only: + patterns += ["/*", "!/*/"] + if include_files: + for p in include_files: + p = p.strip() + if p: + patterns.append(p if p.startswith("/") else f"/{p}") + if patterns: + subprocess.run( + ["git", "-C", str(dest_path), "sparse-checkout", "set", "--no-cone", *patterns], + check=True + ) + if verbose: + print("📁 Sparse checkout applied:", patterns) + except subprocess.CalledProcessError as e: + print("warn: sparse-checkout setup failed:", e) + + finally: + for t in created_temp_dirs: + try: + rmtree_with_retries(t) + except Exception as e: + print(f"warn: final cleanup failed for {t}: {e!r}") From 17c8b807b77ae94a9a1ee70c1184a448d41794a7 Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 19 Feb 2026 16:02:56 +0100 Subject: [PATCH 03/11] Add URL and token arguments for harvesting Added new command-line arguments for URL and token in harvest command. 
--- src/hermes/commands/base.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/hermes/commands/base.py b/src/hermes/commands/base.py index f04399dd..63c6dd7d 100644 --- a/src/hermes/commands/base.py +++ b/src/hermes/commands/base.py @@ -104,6 +104,19 @@ def init_common_parser(self, parser: argparse.ArgumentParser) -> None: type=pathlib.Path, help="Configuration file in TOML format", ) + # Add a new argument to accept a URL for harvesting (in harvest command) + parser.add_argument( + "--url", + type=str, + help="URL from which to extract metadata (GitHub or GitLab)" + ) + # Add a new argument to accept a token (from GitHub or GitLab) for harvesting (in harvest command) + parser.add_argument( + "--token", + type=str, + required=False, + help="Access token for GitHub/GitLab (optional, only needed for private repos or GitHub/GitLab API plugin)" + ) plugin_args = parser.add_argument_group("Extra options") plugin_args.add_argument( From 459845f91db25fc8f872aa724fd2d3c2c647210d Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 19 Feb 2026 16:33:22 +0100 Subject: [PATCH 04/11] Implement repository cloning and token updates Add temporary directory handling for cloning repositories and update token management. 
--- src/hermes/commands/harvest/base.py | 30 ++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/hermes/commands/harvest/base.py b/src/hermes/commands/harvest/base.py index 460345ea..3e9f6337 100644 --- a/src/hermes/commands/harvest/base.py +++ b/src/hermes/commands/harvest/base.py @@ -7,13 +7,17 @@ import argparse import typing as t from datetime import datetime +import tempfile +import pathlib +from hermes import logger from pydantic import BaseModel from hermes.commands.base import HermesCommand, HermesPlugin from hermes.model.context import HermesContext, HermesHarvestContext from hermes.model.errors import HermesValidationError, MergeError - +from hermes.commands.harvest.util.token import update_token_to_toml, remove_token_from_toml +from hermes.commands.harvest.util.clone import clone_repository class HermesHarvestPlugin(HermesPlugin): """Base plugin that does harvesting. @@ -44,6 +48,30 @@ def __call__(self, args: argparse.Namespace) -> None: # Initialize the harvest cache directory here to indicate the step ran ctx.init_cache("harvest") + logger.init_logging() + log = logger.getLogger("hermes.cli") + + if args.url: + with tempfile.TemporaryDirectory(dir=".") as temp_dir: + temp_path = pathlib.Path(temp_dir) + log.info(f"Cloning repository {args.url} into {temp_path}") + + try: + clone_repository(args.url, temp_path, recursive=True, depth=1, filter_blobs=True, sparse=False, verbose=True) + except Exception as exc: + print("ERROR:", exc) + args.path = temp_path # Overwrite args.path to temp directory + + if args.token: + update_token_to_toml(args.token) + self._harvest(ctx) + if args.token: + remove_token_from_toml('hermes.toml') + else: + self._harvest(ctx) + + def _harvest(self, ctx: HermesContext) -> None: + """Harvest metadata from configured sources using plugins.""" for plugin_name in self.settings.sources: try: plugin_func = self.plugins[plugin_name]() From a6e45d142924ff8afd76343401f2ed822041856a Mon Sep 17 
00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 19 Feb 2026 16:43:32 +0100 Subject: [PATCH 05/11] Integrate logging shutdown in clean command Added an explicit logging shutdown step before clearing the HERMES caches. Without shutting down logging first, the `clean` command fails on Windows with: `An error occurred during execution of clean (Find details in './hermes.log')` "Original exception was: [WinError 32] The process cannot access the file because it is being used by another process: '.hermes\\audit.log'" --- src/hermes/commands/clean/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/hermes/commands/clean/base.py b/src/hermes/commands/clean/base.py index b588faf5..65477851 100644 --- a/src/hermes/commands/clean/base.py +++ b/src/hermes/commands/clean/base.py @@ -6,6 +6,7 @@ import argparse import shutil +import logging from pydantic import BaseModel @@ -25,6 +26,7 @@ class HermesCleanCommand(HermesCommand): def __call__(self, args: argparse.Namespace) -> None: self.log.info("Removing HERMES caches...") + logging.shutdown() # Naive implementation for now... check errors, validate directory, don't construct the path ourselves, etc. shutil.rmtree(args.path / '.hermes') From e0a527e5435fb7cb40fa4e289e99e9d5b22bf29f Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Mon, 23 Feb 2026 16:45:34 +0100 Subject: [PATCH 06/11] Update copyright holder --- src/hermes/commands/harvest/util/clone.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hermes/commands/harvest/util/clone.py b/src/hermes/commands/harvest/util/clone.py index 4c302923..7e62dc87 100644 --- a/src/hermes/commands/harvest/util/clone.py +++ b/src/hermes/commands/harvest/util/clone.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2026 OFFIS e.V. 
+# SPDX-FileCopyrightText: 2026 UOL # # SPDX-License-Identifier: Apache-2.0 From 1f3b31f8a5517aa225e77edff54f863579604732 Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Mon, 23 Feb 2026 16:46:03 +0100 Subject: [PATCH 07/11] Update copyright holder --- src/hermes/commands/harvest/util/token.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hermes/commands/harvest/util/token.py b/src/hermes/commands/harvest/util/token.py index a9a86a96..5b2559a8 100644 --- a/src/hermes/commands/harvest/util/token.py +++ b/src/hermes/commands/harvest/util/token.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2026 OFFIS e.V. +# SPDX-FileCopyrightText: 2026 UOL # # SPDX-License-Identifier: Apache-2.0 From e128e1510e41c58afc50b9b54a0b5098162778d7 Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Wed, 25 Feb 2026 10:58:15 +0100 Subject: [PATCH 08/11] Add detailed docstrings for better clarity on functions --- src/hermes/commands/harvest/util/clone.py | 121 +++++++++++++++++++--- 1 file changed, 109 insertions(+), 12 deletions(-) diff --git a/src/hermes/commands/harvest/util/clone.py b/src/hermes/commands/harvest/util/clone.py index 7e62dc87..2e0f5e3a 100644 --- a/src/hermes/commands/harvest/util/clone.py +++ b/src/hermes/commands/harvest/util/clone.py @@ -19,29 +19,62 @@ # ---------------- utilities ---------------- def _normalize_clone_url(url: str) -> str: + """ + Normalize a repository "clone target" into a format that `git clone` accepts. + + Supported inputs: + - SSH scp-like form: git@host:group/repo(.git) + - HTTPS URLs: https://host/group/repo(.git) + + Normalization rules: + - For SSH and HTTPS, append ".git" when missing (common, but not required by all hosts). + """ s = str(url).strip() + + # SSH scp-style: git@github.com:org/repo if re.match(r'^[\w.-]+@[\w.-]+:.*', s): return s if s.endswith('.git') else s + '.git' + + # file:// URLs should be passed as-is. 
if s.startswith('file://'): return s + + # If it's an existing local path if os.path.exists(s): return s + + # Parse normal URLs (http/https). p = urlparse(s) if p.scheme in ('http', 'https'): path = p.path if p.path.endswith('.git') else (p.path.rstrip('/') + '.git') return f"{p.scheme}://{p.netloc}{path}" + + # If the user already provided a .git suffix but it isn't http/https, accept it as-is. if s.endswith('.git'): return s raise ValueError(f"Unsupported repository URL format: {url!r}") def _clear_readonly(func, path, excinfo): """ - onerror handler for shutil.rmtree: try to remove read-only flag and retry. + Error handler for `shutil.rmtree(..., onerror=...)`. + + Purpose: + Some platforms/tools (notably Windows, antivirus scanners, or git itself) can leave files + marked read-only, causing deletion failures. This handler attempts to: + 1) Remove the read-only attribute, then + 2) Retry removal of the file/directory. + + Parameters: + func: The function that raised the exception (provided by shutil). + path: The filesystem path that couldn't be removed. + excinfo: Exception info tuple (type, value, traceback). """ + # make the path writable. try: os.chmod(path, stat.S_IWRITE) except Exception: pass + # retry deletion. try: if os.path.isdir(path): shutil.rmtree(path) @@ -52,10 +85,22 @@ def _clear_readonly(func, path, excinfo): def rmtree_with_retries(path: Path, retries: int = 6, initial_wait: float = 0.1): """ - Best-effort removal of path with retries and read-only handling. - - retries: number of attempts - - initial_wait: initial sleep (multiplies by 2 each retry) - Logs exceptions but never raises. + Recursive directory deletion with retries and read-only handling, for environments where temporary directories may be locked + or marked read-only (e.g., Windows, CI systems, antivirus interference). + + Behavior: + - If `path` doesn't exist: return immediately. + - Attempts deletion up to `retries` times. 
+ - Between attempts, sleeps with exponential backoff: + wait = initial_wait, then wait *= 2 each retry. + - Makes files/directories writable before trying `shutil.rmtree`. + - Uses `_clear_readonly` for additional resilience. + - Never raises: logs warnings/errors and returns. + + Parameters: + path: Directory to remove. + retries: Number of attempts before giving up. + initial_wait: Starting sleep duration (seconds) for exponential backoff. """ if not path.exists(): return @@ -80,6 +125,7 @@ def rmtree_with_retries(path: Path, retries: int = 6, initial_wait: float = 0.1) shutil.rmtree(path, onerror=_clear_readonly) + # If deletion succeeded, stop. if not path.exists(): return except Exception as e: @@ -98,14 +144,31 @@ def rmtree_with_retries(path: Path, retries: int = 6, initial_wait: float = 0.1) except Exception: pass + # If still present, report and exit without raising. if path.exists(): print(f"error: failed to remove temp dir {path!s} after {retries} attempts. " f"Please remove it manually. (Often caused by antivirus or open handles.)") def _move_or_copy(src: Path, dst: Path): + """ + Move a directory into place, falling back to copy+delete when a move isn't possible. + + Primary strategy: + - `os.replace(src, dst)` performs an atomic rename/move when possible. + + Fallback strategy: + - If atomic move fails (commonly due to cross-device boundaries or permission issues), + copy the directory tree to `dst`, then remove `src` using robust cleanup. + + Parameters: + src: Source directory path (typically a temp clone directory). + dst: Destination directory path. + """ try: + # Fast path: atomic rename (preferred when possible) os.replace(str(src), str(dst)) except Exception: + # Cross-device or permission failure — fall back to copy + cleanup shutil.copytree(str(src), str(dst)) rmtree_with_retries(src) @@ -126,15 +189,42 @@ def clone_repository( verbose: bool = False, ) -> None: """ - Robust clone that guarantees best-effort cleanup of temp dirs. 
- - Creates temp directories next to the target dest_dir. - - Always tries to remove temp dirs (even on success/failure). + Clone a Git repository into a destination directory with optional + optimization, fallback, and sparse checkout support. + + Workflow: + 1. Normalize the repository URL. + 2. Attempt an optimized clone (shallow, filtered, sparse-enabled). + 3. If optimized clone fails, retry with a plain clone. + 4. Clone into a temporary directory and atomically move into place. + 5. Optionally configure sparse checkout after cloning. + 6. Clean up temporary directories. + + Parameters: + url: Repository URL or local path. + dest_dir: Target directory for the clone. + recursive: Whether to clone submodules. + depth: Shallow clone depth (None disables shallow clone). + filter_blobs: Use partial clone filter (`--filter=blob:none`). + sparse: Enable sparse checkout mode. + branch: Specific branch to checkout. + insecure_ssl: Disable SSL verification for Git (not recommended). + root_only: Restrict checkout to root-level files only. + include_files: Specific file patterns to include in sparse checkout. + verbose: Print command execution details and warnings. + + Raises: + RuntimeError: If both optimized and fallback clones fail, + or if destination exists and is non-empty. + ValueError: If the repository URL format is invalid. 
""" dest_path = Path(dest_dir) parent = dest_path.parent parent.mkdir(parents=True, exist_ok=True) clone_url = _normalize_clone_url(url) + + # Some GitLab setups have compatibility issues with partial/shallow clones is_gitlab = "gitlab.com" in url.lower() if is_gitlab: if verbose: @@ -144,12 +234,15 @@ def clone_repository( env = os.environ.copy() if insecure_ssl: + # Disables SSL verification (security risk; use only when necessary) env["GIT_SSL_NO_VERIFY"] = "1" created_temp_dirs: list[Path] = [] def build_cmd_for(temp_path: Path, optimized: bool): + """Construct the git clone command for optimized or fallback mode.""" cmd = ["git", "clone"] + if optimized: if branch: cmd += ["--branch", branch] @@ -162,12 +255,14 @@ def build_cmd_for(temp_path: Path, optimized: bool): if recursive: cmd += ["--recurse-submodules"] else: + # Fallback clone uses minimal options for maximum compatibility if branch: cmd += ["--branch", branch] cmd += [clone_url, str(temp_path)] return cmd def attempt_clone(optimized: bool): + """Execute a clone attempt into a new temporary directory.""" tmp = Path(tempfile.mkdtemp(prefix="clone_tmp_", dir=str(parent))) created_temp_dirs.append(tmp) cmd = build_cmd_for(tmp, optimized) @@ -177,13 +272,13 @@ def attempt_clone(optimized: bool): return proc.returncode, proc, tmp try: - # Try optimized + # First attempt: optimized clone rc1, p1, tmp1 = attempt_clone(optimized=True) if rc1 != 0: if verbose: print("warn: optimized clone failed. stderr:") print(p1.stderr.strip() or "(no stderr)") - # Try fallback plain clone + # Second attempt: plain clone rc2, p2, tmp2 = attempt_clone(optimized=False) if rc2 != 0: # both failed -> raise with both stderr @@ -193,7 +288,7 @@ def attempt_clone(optimized: bool): f"Fallback STDERR:\n{p2.stderr}\n" ) - # fallback succeeded: move into place + # Ensure destination is safe to populate if dest_path.exists(): if any(dest_path.iterdir()): raise RuntimeError(f"Destination '{dest_path}' already exists and is not empty. 
Won't overwrite.") @@ -205,7 +300,7 @@ def attempt_clone(optimized: bool): print("✅ Repository cloned successfully (fallback/full clone).") return - # optimized succeeded (tmp1) + # Optimized clone succeeded if dest_path.exists(): if any(dest_path.iterdir()): raise RuntimeError(f"Destination '{dest_path}' already exists and is not empty. Won't overwrite.") @@ -225,6 +320,7 @@ def attempt_clone(optimized: bool): ) patterns: list[str] = [] if root_only: + # Include root-level files but exclude subdirectories patterns += ["/*", "!/*/"] if include_files: for p in include_files: @@ -242,6 +338,7 @@ def attempt_clone(optimized: bool): print("warn: sparse-checkout setup failed:", e) finally: + # Always attempt to clean up temporary directories for t in created_temp_dirs: try: rmtree_with_retries(t) From 52d995f939aaa5d74378c8a6ffe8d7b9cd5d4e3d Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Wed, 25 Feb 2026 11:10:57 +0100 Subject: [PATCH 09/11] Add detailed docstrings for better clarity on functions --- src/hermes/commands/harvest/util/token.py | 48 +++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/hermes/commands/harvest/util/token.py b/src/hermes/commands/harvest/util/token.py index 5b2559a8..4faac741 100644 --- a/src/hermes/commands/harvest/util/token.py +++ b/src/hermes/commands/harvest/util/token.py @@ -11,17 +11,47 @@ def _load_config(config_path: str) -> dict: + """ + Load a TOML configuration file. + + If the file exists and contains valid TOML, its content is returned as a + dictionary. If the file does not exist or contains invalid TOML, an empty + dictionary is returned. + + Args: + config_path: Path to the TOML configuration file. + + Returns: + A dictionary representing the parsed TOML content, + or an empty dictionary if loading fails. 
+ """ path = Path(config_path) + + # Check whether the configuration file exists if path.exists(): try: + # Open the file in read mode and parse TOML content with path.open("r") as f: return toml.load(f) except toml.TomlDecodeError: + # Return empty config if TOML is malformed return {} + + # Return empty config if file does not exist return {} def _save_config(config: dict, config_path: str) -> None: + """ + Save a dictionary to a TOML configuration file. + + This function overwrites the target file if it already exists. + + Args: + config: Dictionary containing configuration data. + config_path: Path to the TOML configuration file. + """ + # Open the file in write mode and dump TOML content with Path(config_path).open("w") as f: toml.dump(config, f) @@ -34,12 +64,19 @@ def update_token_to_toml(token: str, config_path: str = "hermes.toml") -> None: token: The personal token key to be set. config_path: Path to the TOML config file. """ + # Encode the token using base64 encoded_token = base64.b64encode(token.encode()).decode() + + # Load existing configuration (or empty dict if not present) config = _load_config(config_path) + # Ensure the "harvest" section exists config.setdefault("harvest", {}) + + # Store the encoded token in the "harvest" section config["harvest"]["token"] = encoded_token + # Persist updated configuration back to file _save_config(config, config_path) @@ -53,8 +90,13 @@ def load_token_from_toml(config_path: str = "hermes.toml") -> str | None: Returns: The decoded token if present, otherwise None. 
""" + # Load configuration from file config = _load_config(config_path) + + # Safely retrieve the encoded token from nested structure encoded_token = config.get("harvest", {}).get("token") + + # Decode and return the token if available, return None if no token is stored return base64.b64decode(encoded_token.encode()).decode() if encoded_token else None @@ -65,7 +107,13 @@ def remove_token_from_toml(config_path: str = "hermes.toml") -> None: Args: config_path: Path to the TOML config file. """ + # Load existing configuration config = _load_config(config_path) + + # Check whether the token exists in the "harvest" section if "token" in config.get("harvest", {}): + # Delete the token entry del config["harvest"]["token"] + + # Save updated configuration back to file _save_config(config, config_path) From abca6b3879366b3006b412c8dc3d933bd57d8a9e Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 26 Feb 2026 12:05:44 +0100 Subject: [PATCH 10/11] Add bulk repository test for HERMES metadata harvesting This script bulk-tests HERMES metadata harvesting across multiple repositories, checking for expected metadata files and generating a CSV report. --- src/hermes/tests/bulk_repository_test.py | 283 +++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 src/hermes/tests/bulk_repository_test.py diff --git a/src/hermes/tests/bulk_repository_test.py b/src/hermes/tests/bulk_repository_test.py new file mode 100644 index 00000000..3a66a8d4 --- /dev/null +++ b/src/hermes/tests/bulk_repository_test.py @@ -0,0 +1,283 @@ +# SPDX-FileCopyrightText: 2026 UOL +# +# SPDX-License-Identifier: Apache-2.0 + +# SPDX-FileContributor: Stephan Ferenz +# SPDX-FileContributor: Aida Jafarbigloo + +""" +Bulk-test HERMES metadata harvesting across multiple repositories. + +This script: +- Loads a list of repository URLs from `test_repositories.json` (located next to this script). +- For each repository, runs `hermes clean` followed by `hermes harvest`. 
+- Checks whether expected harvested metadata files exist under `.hermes/harvest/`. +- Compares current results to a persisted state file (`hermes_bulk_test_state.json`) to detect regressions + (previously "yes" turning into "no"). +- Writes a CSV report (`hermes_bulk_test_results.csv`). + +Notes: +- Tokens are currently set as empty strings, need to be set before use. +""" + +import subprocess +import pandas as pd +from pathlib import Path +import json + + +# ---------------- REPOSITORIES ---------------- +# Path to this script's directory +HERE = Path(__file__).resolve().parent + +# JSON file containing a list of repository URLs to test. +repositories_file = HERE / "test_repositories.json" + +# Load repositories list, fall back to empty list if the file doesn't exist. +if Path(repositories_file).exists(): + with open(repositories_file, "r", encoding="utf-8") as fh: + repositories = json.load(fh) +else: + repositories = [] + print(f"Warning: {repositories_file} not found. Creating empty list.") + +# ---------------- TOKENS ---------------- +# Access tokens for private repos / rate limits. +# Set these before running. +GITHUB_TOKEN = "" +GITLAB_TOKEN = "" + +# ---------------- RESULTS STORAGE ---------------- +# In-memory accumulation of per-repository run results. +results = [] + +# JSON file that stores previous yes/no answers per URL to detect regressions across runs. +state_file = "hermes_bulk_test_state.json" + +# Load previous state (if any). +if Path(state_file).exists(): + with open(state_file, "r", encoding="utf-8") as fh: + prev_state = json.load(fh) +else: + prev_state = {} + +# ---------------- HELPER FUNCTIONS---------------- +def get_token_for_repo(url: str) -> str: + """ + Return the correct access token for a given repository URL. + + Heuristic: + - If the URL contains "github" (case-insensitive), return `GITHUB_TOKEN`. + - Otherwise return `GITLAB_TOKEN`. + + Args: + url: Repository URL. + + Returns: + The token string to use. 
+ """ + if "github" in url.lower(): + return GITHUB_TOKEN + else: + return GITLAB_TOKEN + +def get_token_type_for_repo(url: str) -> str: + """ + Return a human-readable label for the token type selected for the URL. + + Args: + url: Repository URL. + + Returns: + "GitHub token" if URL appears to be GitHub, else "GitLab token". + """ + if "github" in url.lower(): + return "GitHub token" + else: + return "GitLab token" + +def get_repo_name(url: str) -> str: + """ + Extract the repository name from a URL. + + Example: + "https://github.com/org/repo/" -> "repo" + + Args: + url: Repository URL. + + Returns: + The last non-empty path segment. + """ + return url.rstrip("/").split("/")[-1] + +def check_harvested_metadata(repo_name: str) -> str: + """ + Check whether expected HERMES-harvested metadata files (under .hermes/harvest/) exist. + + Files checked: + - githublab.json -> "githublab_metadata" + - cff.json -> "cff_metadata" + - codemeta.json -> "codemeta_metadata" + + The overall "harvest_result" is: + - "success" if *any* of the files exists + - "failed" otherwise + + Args: + repo_name: Repository name (currently unused, kept for potential future per-repo paths/logging). + + Returns: + A dict with yes/no flags and an overall "harvest_result" field: + { + "githublab_metadata": "yes"|"no", + "cff_metadata": "yes"|"no", + "codemeta_metadata": "yes"|"no", + "harvest_result": "success"|"failed" + } + """ + hermes_dir = Path(".hermes") / "harvest" + + # Default: nothing found. + files_exist = { + "githublab_metadata": "no", + "cff_metadata": "no", + "codemeta_metadata": "no", + "harvest_result": "failed" + } + + # If HERMES didn't create the directory, there is nothing to check. + if not hermes_dir.exists(): + return files_exist + + # Check file existence for each expected output. 
+ if (hermes_dir / "githublab.json").exists(): + files_exist["githublab_metadata"] = "yes" + if (hermes_dir / "cff.json").exists(): + files_exist["cff_metadata"] = "yes" + if (hermes_dir / "codemeta.json").exists(): + files_exist["codemeta_metadata"] = "yes" + + # general harvest_result: success if any metadata exists + if "yes" in (files_exist["githublab_metadata"], files_exist["cff_metadata"], files_exist["codemeta_metadata"]): + files_exist["harvest_result"] = "success" + + # Debug output (kept verbose to help diagnose missing outputs). + print("Files exist status:") + print(files_exist) + + return files_exist + +# ---------------- BULK TEST LOOP ---------------- +for url in repositories: + # Select token and labels based on URL host. + token = get_token_for_repo(url) + token_type = get_token_type_for_repo(url) + repo_name = get_repo_name(url) + + print(f"\n=== Testing repository: {url} ===") + + # Default values for this repository run. + error_message = "" + metadata_info = {} + + try: + # Step 1: Clean previous metadata + subprocess.run(["hermes", "clean"], check=True) + print("✅ 'hermes clean' finished.") + + # Step 2: Run harvest for the repository. + # Capture stdout/stderr to include HERMES output as an error message when needed. + proc = subprocess.run( + ["hermes", "harvest", "--url", url, "--token", token], + capture_output=True, text=True, + cwd=Path(".").resolve() + ) + print("✅ 'hermes harvest' finished.") + + + # Step 3: Verify expected harvested metadata files are present. + metadata_info = check_harvested_metadata(repo_name) + + # If we failed to find any metadata, use HERMES CLI output to aid debugging. + if metadata_info["harvest_result"] == "failed": + error_message = proc.stderr.strip() or proc.stdout.strip() + + except Exception as e: + # Any exception is treated as a failed harvest with no metadata files. 
+ metadata_info = { + "githublab_metadata": "no", + "cff_metadata": "no", + "codemeta_metadata": "no", + "harvest_result": "failed" + } + error_message = str(e) + + # ---------------- REGRESSION CHECK vs PREVIOUS RUN ---------------- + # Default is "true" (= no regression detected). + # A regression is defined as: previously "yes" but now "no" for any check (use prev_state). + compare_result = "true" + prev_for_url = prev_state.get(url, {}) + + # map displayed columns to metadata_info keys + checks = [ + ("CFF", "cff_metadata"), + ("CodeMeta", "codemeta_metadata"), + ("GitHubLab", "githublab_metadata"), + ] + for prev_col, curr_key in checks: + prev_val = str(prev_for_url.get(prev_col, "")).strip().lower() + curr_val = str(metadata_info.get(curr_key, "no")).strip().lower() + # if previously yes and now no => incorrect + if prev_val == "yes" and curr_val == "no": + compare_result = "false" + break + + # Store Results + # save current answers into results and into state for next run + results.append({ + "url": url, + "token_used": token_type, + "error_message": error_message, + "harvest_result": compare_result, + "githublab_metadata": metadata_info["githublab_metadata"], + "cff_metadata": metadata_info["cff_metadata"], + "codemeta_metadata": metadata_info["codemeta_metadata"] + }) + + # update prev_state for this url to current answers (CFF/CodeMeta/GitHubLab) + prev_state[url] = { + "CFF": metadata_info["cff_metadata"], + "CodeMeta": metadata_info["codemeta_metadata"], + "GitHubLab": metadata_info["githublab_metadata"] + } + +# ---------------- PERSIST UPDATED STATE ---------------- +# Save updated per-URL state so the next run can detect regressions. 
+with open(state_file, "w", encoding="utf-8") as fh: + json.dump(prev_state, fh, indent=2, ensure_ascii=False) + +# ---------------- BUILD DATAFRAME AND PRINT TABLE ---------------- +# build DataFrame from accumulated results +df = pd.DataFrame(results) + +# normalize/rename columns for nicer CLI output +col_map = { + "url": "URL", + "token_used": "Token", + "result": "Compared Result", + "githublab_metadata": "GitHubLab", + "cff_metadata": "CFF", + "codemeta_metadata": "CodeMeta", + "error_message": "Error" +} +df = df.rename(columns=col_map) + +# ensure column order +cols = ["URL", "Token", "Compared Result", "GitHubLab", "CFF", "CodeMeta", "Error"] +df = df[[c for c in cols if c in df.columns]] + +# ---------------- SAVE CSV ---------------- +csv_file = "hermes_bulk_test_results.csv" +df.to_csv(csv_file, index=False) +print(f"\nResults saved to {csv_file}") From c0c71d178286e11eea8d59dbf4e56a0667a348ad Mon Sep 17 00:00:00 2001 From: Aida Jafarbigloo Date: Thu, 26 Feb 2026 12:07:07 +0100 Subject: [PATCH 11/11] Create test_repositories.json with sample URLs Add test repositories for bulk testing. --- src/hermes/tests/test_repositories.json | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/hermes/tests/test_repositories.json diff --git a/src/hermes/tests/test_repositories.json b/src/hermes/tests/test_repositories.json new file mode 100644 index 00000000..2d06b60b --- /dev/null +++ b/src/hermes/tests/test_repositories.json @@ -0,0 +1,6 @@ +[ + "https://github.com/NFDI4Energy/SMECS", + "https://github.com/softwarepub/hermes", + "https://github.com/KnowledgeCaptureAndDiscovery/somef", + "https://gitlab.com/zdin-zle/zle-platform/repository/meta_tool" +]