diff --git a/features/feature-spec-discovery.md b/features/feature-spec-discovery.md index 5f858a6..416ff2c 100644 --- a/features/feature-spec-discovery.md +++ b/features/feature-spec-discovery.md @@ -78,6 +78,22 @@ Add shared discovery infrastructure for Issues #125 and #126: centralized parser **When** the content is trivial (10 chars or fewer) **Then** it is skipped and not emitted as a discovery item. +### Story 8: Python test-function mining +**Scenario:** As a discovery pipeline, I need to extract executable Python test signals. +**Given** Python test files selected from `FileIndex` +**When** `PythonTestMiner` runs +**Then** it emits `DiscoveredItem(kind=TEST_FUNCTION)` entries for top-level `test_` functions and `test_` methods under `Test*` classes. + +**Scenario:** As a miner maintainer, I need framework and metadata fidelity. +**Given** framework detection from `ctx.frameworks[SupportedLanguage.PYTHON]` +**When** test items are emitted +**Then** metadata validates against `TestFunctionMeta`, including decorator names, docstring flags, class context, and parametrization detection. + +**Scenario:** As a pipeline operator, I need resilient parse handling. +**Given** one malformed Python test file and one valid file +**When** `PythonTestMiner` executes +**Then** it reports `MinerErrorKind.PARSE_ERROR` for parse failures and still returns items from valid files. + ## Acceptance Criteria - Language abstraction returns `SupportedLanguage` members for `.py`, `.ts`, `.tsx`, `.js`, `.jsx`, `.mjs` and `None` otherwise. - `LanguageRegistry().parse(path_to_py_file)` returns `(node, SupportedLanguage.PYTHON)` for valid Python input. @@ -104,3 +120,7 @@ Add shared discovery infrastructure for Issues #125 and #126: centralized parser - TypeScript/JavaScript JSDoc comments immediately preceding declarations are emitted with the correct `SupportedLanguage`. - Test files are excluded from docstring mining and configured `source_dirs` scope is respected. - Trivial `__init__` docstrings (<=10 chars) are skipped. +- `PythonTestMiner` reads candidate files from `ctx.file_index.files_matching("test_*.py", "*_test.py")` and does not walk the filesystem directly. +- `PythonTestMiner` uses precomputed frameworks from `ctx.frameworks[SupportedLanguage.PYTHON]` rather than re-detecting frameworks. +- Python test metadata validates against `TestFunctionMeta`, including `is_parametrized` and `class_name` values. +- Parse failures in individual test files set `MinerResult.error_kind=PARSE_ERROR` without aborting extraction from remaining files. diff --git a/src/specleft/discovery/config.py b/src/specleft/discovery/config.py index 371112b..82f0667 100644 --- a/src/specleft/discovery/config.py +++ b/src/specleft/discovery/config.py @@ -114,7 +114,7 @@ def _resolve_toml_loader() -> Any | None: return tomllib except ModuleNotFoundError: try: - import tomli # type: ignore[import-not-found] + import tomli return tomli except ModuleNotFoundError: diff --git a/src/specleft/discovery/frameworks/io.py b/src/specleft/discovery/frameworks/io.py index 748ceb3..ad41c53 100644 --- a/src/specleft/discovery/frameworks/io.py +++ b/src/specleft/discovery/frameworks/io.py @@ -146,7 +146,7 @@ def resolve_toml_loader() -> Any | None: return tomllib except ModuleNotFoundError: try: - import tomli # type: ignore[import-not-found] + import tomli return tomli except ModuleNotFoundError: diff --git a/src/specleft/discovery/language_registry.py b/src/specleft/discovery/language_registry.py index c3e60b4..2214f89 100644 --- a/src/specleft/discovery/language_registry.py +++ b/src/specleft/discovery/language_registry.py @@ -81,10 +81,10 @@ def _parser_for(self, language: SupportedLanguage) -> Any | None: return None try: - from tree_sitter import Parser # type: ignore[import-untyped] + from tree_sitter import Parser parser = Parser() - parser.set_language(language_obj) + parser.language = language_obj self._parser_cache[language] = parser return parser except Exception: @@ -96,14 +96,14 @@ def _language_for(self, language: SupportedLanguage) -> Any | None: if language == SupportedLanguage.PYTHON: try: - import tree_sitter_python # type: ignore[import-not-found] + import tree_sitter_python language_obj = tree_sitter_python.language() except Exception: return None elif language in (SupportedLanguage.TYPESCRIPT, SupportedLanguage.JAVASCRIPT): try: - import tree_sitter_typescript # type: ignore[import-not-found] + import tree_sitter_typescript if language == SupportedLanguage.TYPESCRIPT: language_obj = tree_sitter_typescript.language_typescript() diff --git a/src/specleft/discovery/miners/__init__.py b/src/specleft/discovery/miners/__init__.py index 0f23a79..359cba5 100644 --- a/src/specleft/discovery/miners/__init__.py +++ b/src/specleft/discovery/miners/__init__.py @@ -4,6 +4,13 @@ """Discovery miner implementations.""" from specleft.discovery.miners.defaults import default_miners -from specleft.discovery.miners.shared import DocstringMiner, ReadmeOverviewMiner +from specleft.discovery.miners.python.tests import PythonTestMiner +from specleft.discovery.miners.shared.docstrings import DocstringMiner +from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner -__all__ = ["DocstringMiner", "ReadmeOverviewMiner", "default_miners"] +__all__ = [ + "DocstringMiner", + "PythonTestMiner", + "ReadmeOverviewMiner", + "default_miners", +] diff --git a/src/specleft/discovery/miners/defaults.py b/src/specleft/discovery/miners/defaults.py index f6af3bf..51d8fd6 100644 --- a/src/specleft/discovery/miners/defaults.py +++ b/src/specleft/discovery/miners/defaults.py @@ -7,7 +7,9 @@ from typing import TYPE_CHECKING -from specleft.discovery.miners.shared import DocstringMiner, ReadmeOverviewMiner +from specleft.discovery.miners.python.tests import PythonTestMiner +from specleft.discovery.miners.shared.docstrings import DocstringMiner +from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner if TYPE_CHECKING: from specleft.discovery.pipeline import BaseMiner @@ -15,4 +17,4 @@ def default_miners() -> list[BaseMiner]: """Return default miners in deterministic execution order.""" - return [ReadmeOverviewMiner(), DocstringMiner()] + return [ReadmeOverviewMiner(), PythonTestMiner(), DocstringMiner()] diff --git a/src/specleft/discovery/miners/python/__init__.py b/src/specleft/discovery/miners/python/__init__.py index b933028..1088682 100644 --- a/src/specleft/discovery/miners/python/__init__.py +++ b/src/specleft/discovery/miners/python/__init__.py @@ -4,5 +4,6 @@ """Python-specific discovery miners.""" from specleft.discovery.miners.python.docstrings import extract_python_items +from specleft.discovery.miners.python.tests import PythonTestMiner -__all__ = ["extract_python_items"] +__all__ = ["PythonTestMiner", "extract_python_items"] diff --git a/src/specleft/discovery/miners/python/tests.py b/src/specleft/discovery/miners/python/tests.py new file mode 100644 index 0000000..6027132 --- /dev/null +++ b/src/specleft/discovery/miners/python/tests.py @@ -0,0 +1,258 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 SpecLeft Contributors + +"""Python test-function miner.""" + +from __future__ import annotations + +import ast +import time +import uuid +from pathlib import Path +from typing import Any + +from specleft.discovery.context import MinerContext +from specleft.discovery.miners.shared.common import line_number, node_text +from specleft.discovery.models import ( + DiscoveredItem, + ItemKind, + MinerErrorKind, + MinerResult, + SupportedLanguage, + TestFunctionMeta, +) + +_TEST_PATTERNS = ("test_*.py", "*_test.py") +_KNOWN_FRAMEWORKS = {"pytest", "unittest"} + + +class PythonTestMiner: + """Extract Python test functions from indexed test files.""" + + miner_id = uuid.UUID("a7b21db5-0d22-41be-9902-7c725e63892e") + name = "python_test_functions" + languages = frozenset({SupportedLanguage.PYTHON}) + + def mine(self, ctx: MinerContext) -> MinerResult: + started = time.perf_counter() + framework = _primary_framework(ctx) + items: list[DiscoveredItem] = [] + parse_failures: list[Path] = [] + + for rel_path in ctx.file_index.files_matching(*_TEST_PATTERNS): + abs_path = ctx.root / rel_path + parsed = ctx.registry.parse(abs_path) + if parsed is None: + parse_failures.append(rel_path) + continue + + root_node, language = parsed + if language is not SupportedLanguage.PYTHON: + continue + + try: + source_bytes = abs_path.read_bytes() + except OSError: + parse_failures.append(rel_path) + continue + + items.extend( + _extract_test_items( + root_node=root_node, + source_bytes=source_bytes, + file_path=rel_path, + framework=framework, + ) + ) + + error_kind: MinerErrorKind | None = None + error: str | None = None + if parse_failures: + error_kind = MinerErrorKind.PARSE_ERROR + files = ", ".join(path.as_posix() for path in parse_failures) + error = f"Failed to parse Python test files: {files}" + + return MinerResult( + miner_id=self.miner_id, + miner_name=self.name, + items=items, + error=error, + error_kind=error_kind, + duration_ms=max(0, int((time.perf_counter() - started) * 1000)), + ) + + +def _primary_framework(ctx: MinerContext) -> str: + frameworks = ctx.frameworks.get(SupportedLanguage.PYTHON, []) + return frameworks[0] if frameworks else "unknown" + + +def _extract_test_items( + *, + root_node: Any, + source_bytes: bytes, + file_path: Path, + framework: str, +) -> list[DiscoveredItem]: + items: list[DiscoveredItem] = [] + for node in getattr(root_node, "named_children", ()): + test_function = _extract_function(node, source_bytes) + if test_function is not None: + function_node, decorators = test_function + item = _to_discovered_item( + function_node=function_node, + decorators=decorators, + source_bytes=source_bytes, + file_path=file_path, + framework=framework, + class_name=None, + ) + if item is not None: + items.append(item) + continue + + if node.type != "class_definition": + continue + + class_name = _field_text(node, "name", source_bytes) + if not class_name or not class_name.startswith("Test"): + continue + + body = node.child_by_field_name("body") + if body is None: + continue + + for member in getattr(body, "named_children", ()): + method = _extract_function(member, source_bytes) + if method is None: + continue + function_node, decorators = method + item = _to_discovered_item( + function_node=function_node, + decorators=decorators, + source_bytes=source_bytes, + file_path=file_path, + framework=framework, + class_name=class_name, + ) + if item is not None: + items.append(item) + + return items + + +def _extract_function(node: Any, source_bytes: bytes) -> tuple[Any, list[str]] | None: + if node.type in {"function_definition", "async_function_definition"}: + return node, [] + + if node.type != "decorated_definition": + return None + + definition = node.child_by_field_name("definition") + if definition is None or definition.type not in { + "function_definition", + "async_function_definition", + }: + return None + + decorators = [ + _normalize_decorator(node_text(child, source_bytes)) + for child in getattr(node, "named_children", ()) + if child.type == "decorator" + ] + return definition, [value for value in decorators if value] + + +def _to_discovered_item( + *, + function_node: Any, + decorators: list[str], + source_bytes: bytes, + file_path: Path, + framework: str, + class_name: str | None, +) -> DiscoveredItem | None: + name = _field_text(function_node, "name", source_bytes) + if not name or not name.startswith("test_"): + return None + + docstring = _extract_docstring(function_node, source_bytes) + is_parametrized = any(decorator.endswith("parametrize") for decorator in decorators) + confidence = 0.9 if framework in _KNOWN_FRAMEWORKS else 0.7 + + metadata = TestFunctionMeta( + framework=framework, + class_name=class_name, + decorators=decorators, + has_docstring=docstring is not None, + docstring=docstring, + is_parametrized=is_parametrized, + ) + + return DiscoveredItem( + kind=ItemKind.TEST_FUNCTION, + name=name, + file_path=file_path, + line_number=line_number(function_node), + language=SupportedLanguage.PYTHON, + raw_text=docstring, + metadata=metadata.model_dump(), + confidence=confidence, + ) + + +def _normalize_decorator(raw: str) -> str: + normalized = raw.strip() + if normalized.startswith("@"): + normalized = normalized[1:] + return normalized.split("(", 1)[0].strip() + + +def _extract_docstring(function_node: Any, source_bytes: bytes) -> str | None: + body = function_node.child_by_field_name("body") + if body is None: + return None + + named_children = list(getattr(body, "named_children", ())) + if not named_children: + return None + + first = named_children[0] + if first.type != "expression_statement": + return None + + for child in getattr(first, "named_children", ()): + if child.type in {"string", "concatenated_string"}: + return _clean_python_string(node_text(child, source_bytes)) + return None + + +def _clean_python_string(value: str) -> str | None: + stripped = value.strip() + if not stripped: + return None + + try: + parsed = ast.literal_eval(stripped) + except (SyntaxError, ValueError): + parsed = _strip_wrapping_quotes(stripped) + if not isinstance(parsed, str): + return None + + cleaned = parsed.strip() + return cleaned or None + + +def _strip_wrapping_quotes(value: str) -> str: + for quote in ('"""', "'''", '"', "'"): + if value.startswith(quote) and value.endswith(quote) and len(value) >= 2: + return value[len(quote) : len(value) - len(quote)].strip() + return value + + +def _field_text(node: Any, field: str, source_bytes: bytes) -> str | None: + field_node = node.child_by_field_name(field) + if field_node is None: + return None + text = node_text(field_node, source_bytes).strip() + return text or None diff --git a/src/specleft/discovery/miners/shared/__init__.py b/src/specleft/discovery/miners/shared/__init__.py index 8e11f15..3910fa5 100644 --- a/src/specleft/discovery/miners/shared/__init__.py +++ b/src/specleft/discovery/miners/shared/__init__.py @@ -3,7 +3,18 @@ """Shared miners used by multiple discovery workflows.""" -from specleft.discovery.miners.shared.docstrings import DocstringMiner -from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner +from __future__ import annotations __all__ = ["DocstringMiner", "ReadmeOverviewMiner"] + + +def __getattr__(name: str) -> object: + if name == "DocstringMiner": + from specleft.discovery.miners.shared.docstrings import DocstringMiner + + return DocstringMiner + if name == "ReadmeOverviewMiner": + from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner + + return ReadmeOverviewMiner + raise AttributeError(name) diff --git a/tests/discovery/miners/test_python_tests.py b/tests/discovery/miners/test_python_tests.py new file mode 100644 index 0000000..5d60de7 --- /dev/null +++ b/tests/discovery/miners/test_python_tests.py @@ -0,0 +1,268 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 SpecLeft Contributors + +"""Tests for Python test-function discovery miner.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from specleft.discovery.config import DiscoveryConfig +from specleft.discovery.context import MinerContext +from specleft.discovery.file_index import FileIndex +from specleft.discovery.miners.python.tests import PythonTestMiner +from specleft.discovery.models import ( + MinerErrorKind, + SupportedLanguage, + TestFunctionMeta as _TestFunctionMeta, +) + + +@dataclass +class _FakeNode: + type: str + text_value: str = "" + children: list[_FakeNode] = field(default_factory=list) + named_children: list[_FakeNode] = field(default_factory=list) + fields: dict[str, _FakeNode] = field(default_factory=dict) + start_point: tuple[int, int] = (0, 0) + end_point: tuple[int, int] = (0, 0) + + @property + def text(self) -> bytes: + return self.text_value.encode("utf-8") + + def child_by_field_name(self, name: str) -> _FakeNode | None: + return self.fields.get(name) + + +class _RegistryStub: + def __init__( + self, mapping: dict[Path, tuple[Any, SupportedLanguage] | None] + ) -> None: + self._mapping = mapping + self.calls: list[Path] = [] + + def parse(self, file_path: Path) -> tuple[Any, SupportedLanguage] | None: + self.calls.append(file_path) + return self._mapping.get(file_path) + + +def _identifier(value: str, row: int) -> _FakeNode: + return _FakeNode( + type="identifier", + text_value=value, + start_point=(row, 0), + end_point=(row, len(value)), + ) + + +def _docstring_expr(value: str, row: int) -> _FakeNode: + string_node = _FakeNode( + type="string", + text_value=value, + start_point=(row, 0), + end_point=(row, len(value)), + ) + return _FakeNode( + type="expression_statement", + children=[string_node], + named_children=[string_node], + start_point=(row, 0), + end_point=(row, len(value)), + ) + + +def _python_function(name: str, row: int, docstring: str | None = None) -> _FakeNode: + name_node = _identifier(name, row) + body_children: list[_FakeNode] = [] + if docstring is not None: + body_children.append(_docstring_expr(docstring, row + 1)) + body = _FakeNode( + type="block", + children=list(body_children), + named_children=list(body_children), + start_point=(row + 1, 0), + end_point=(row + 2, 0), + ) + return _FakeNode( + type="function_definition", + children=[name_node, body], + named_children=[name_node, body], + fields={"name": name_node, "body": body}, + start_point=(row, 0), + end_point=(row + 3, 0), + ) + + +def _decorated_function( + *, + name: str, + row: int, + decorators: list[str], + docstring: str | None = None, +) -> _FakeNode: + decorator_nodes: list[_FakeNode] = [] + for index, decorator in enumerate(decorators): + decorator_nodes.append( + _FakeNode( + type="decorator", + text_value=decorator, + start_point=(row + index, 0), + end_point=(row + index, len(decorator)), + ) + ) + + function_node = _python_function( + name=name, + row=row + len(decorator_nodes), + docstring=docstring, + ) + children = [*decorator_nodes, function_node] + return _FakeNode( + type="decorated_definition", + children=children, + named_children=children, + fields={"definition": function_node}, + start_point=(row, 0), + end_point=function_node.end_point, + ) + + +def _python_class(name: str, row: int, methods: list[_FakeNode]) -> _FakeNode: + name_node = _identifier(name, row) + body = _FakeNode( + type="block", + children=list(methods), + named_children=list(methods), + start_point=(row + 1, 0), + end_point=(row + 2, 0), + ) + return _FakeNode( + type="class_definition", + children=[name_node, body], + named_children=[name_node, body], + fields={"name": name_node, "body": body}, + start_point=(row, 0), + end_point=(row + 3, 0), + ) + + +def _python_test_tree() -> _FakeNode: + plain = _python_function("test_add", row=0) + parametrized = _decorated_function( + name="test_parametrized", + row=4, + decorators=['@pytest.mark.parametrize("value", [1, 2])'], + docstring='"""Parametrized case."""', + ) + testcase = _python_class( + "TestMath", + row=10, + methods=[ + _python_function("test_subtract", row=11), + _python_function("helper", row=15), + ], + ) + helper = _python_function("helper_function", row=20) + + children = [plain, parametrized, testcase, helper] + return _FakeNode( + type="module", + children=children, + named_children=children, + start_point=(0, 0), + end_point=(24, 0), + ) + + +def _context( + root: Path, + registry: _RegistryStub, + *, + frameworks: dict[SupportedLanguage, list[str]] | None = None, +) -> MinerContext: + return MinerContext( + root=root, + registry=registry, # type: ignore[arg-type] + file_index=FileIndex(root), + frameworks=frameworks or {}, + config=DiscoveryConfig.default(), + ) + + +def _fixture_dir() -> Path: + return Path(__file__).resolve().parents[2] / "fixtures" / "discovery" + + +def test_python_test_miner_extracts_plain_parametrized_and_testcase( + tmp_path: Path, +) -> None: + fixture_file = _fixture_dir() / "sample_tests.py" + test_file = tmp_path / "tests" / "test_sample_tests.py" + test_file.parent.mkdir(parents=True) + test_file.write_text(fixture_file.read_text(encoding="utf-8"), encoding="utf-8") + non_test_file = tmp_path / "src" / "sample_tests.py" + non_test_file.parent.mkdir(parents=True) + non_test_file.write_text("def helper():\n return 1\n", encoding="utf-8") + + registry = _RegistryStub( + { + test_file: (_python_test_tree(), SupportedLanguage.PYTHON), + non_test_file: (_python_test_tree(), SupportedLanguage.PYTHON), + } + ) + + result = PythonTestMiner().mine( + _context(tmp_path, registry, frameworks={SupportedLanguage.PYTHON: ["pytest"]}) + ) + + assert result.error is None + assert result.error_kind is None + assert len(result.items) == 3 + assert registry.calls == [test_file] + assert all(item.language == SupportedLanguage.PYTHON for item in result.items) + assert all( + isinstance(item.typed_meta(), _TestFunctionMeta) for item in result.items + ) + + by_name = {item.name: item for item in result.items} + assert set(by_name) == {"test_add", "test_parametrized", "test_subtract"} + assert by_name["test_add"].metadata["class_name"] is None + assert by_name["test_subtract"].metadata["class_name"] == "TestMath" + assert by_name["test_parametrized"].metadata["is_parametrized"] is True + assert by_name["test_parametrized"].metadata["decorators"] == [ + "pytest.mark.parametrize" + ] + assert by_name["test_parametrized"].metadata["has_docstring"] is True + assert by_name["test_add"].metadata["framework"] == "pytest" + assert all(item.confidence == 0.9 for item in result.items) + + +def test_python_test_miner_reports_parse_errors_and_keeps_items( + tmp_path: Path, +) -> None: + fixture_file = _fixture_dir() / "sample_tests.py" + good_file = tmp_path / "tests" / "test_good.py" + bad_file = tmp_path / "tests" / "test_bad.py" + good_file.parent.mkdir(parents=True) + good_file.write_text(fixture_file.read_text(encoding="utf-8"), encoding="utf-8") + bad_file.write_text("def broken(\n", encoding="utf-8") + + registry = _RegistryStub( + { + good_file: (_python_test_tree(), SupportedLanguage.PYTHON), + bad_file: None, + } + ) + + result = PythonTestMiner().mine(_context(tmp_path, registry)) + + assert result.error_kind == MinerErrorKind.PARSE_ERROR + assert result.error is not None + assert "tests/test_bad.py" in result.error + assert len(result.items) == 3 + assert all(item.metadata["framework"] == "unknown" for item in result.items) + assert all(item.confidence == 0.7 for item in result.items)