Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions features/feature-spec-discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ Add shared discovery infrastructure for Issues #125 and #126: centralized parser
**When** the content is trivial (10 chars or fewer)
**Then** it is skipped and not emitted as a discovery item.

### Story 8: Python test-function mining
**Scenario:** As a discovery pipeline, I need to extract executable Python test signals.
**Given** Python test files selected from `FileIndex`
**When** `PythonTestMiner` runs
**Then** it emits `DiscoveredItem(kind=TEST_FUNCTION)` entries for top-level `test_` functions and `test_` methods under `Test*` classes.

**Scenario:** As a miner maintainer, I need framework and metadata fidelity.
**Given** framework detection from `ctx.frameworks[SupportedLanguage.PYTHON]`
**When** test items are emitted
**Then** metadata validates against `TestFunctionMeta`, including decorator names, docstring flags, class context, and parametrization detection.

**Scenario:** As a pipeline operator, I need resilient parse handling.
**Given** one malformed Python test file and one valid file
**When** `PythonTestMiner` executes
**Then** it reports `MinerErrorKind.PARSE_ERROR` for parse failures and still returns items from valid files.

## Acceptance Criteria
- Language abstraction returns `SupportedLanguage` members for `.py`, `.ts`, `.tsx`, `.js`, `.jsx`, `.mjs` and `None` otherwise.
- `LanguageRegistry().parse(path_to_py_file)` returns `(node, SupportedLanguage.PYTHON)` for valid Python input.
Expand All @@ -104,3 +120,7 @@ Add shared discovery infrastructure for Issues #125 and #126: centralized parser
- TypeScript/JavaScript JSDoc comments immediately preceding declarations are emitted with the correct `SupportedLanguage`.
- Test files are excluded from docstring mining and configured `source_dirs` scope is respected.
- Trivial `__init__` docstrings (<=10 chars) are skipped.
- `PythonTestMiner` reads candidate files from `ctx.file_index.files_matching("test_*.py", "*_test.py")` and does not walk the filesystem directly.
- `PythonTestMiner` uses precomputed frameworks from `ctx.frameworks[SupportedLanguage.PYTHON]` rather than re-detecting frameworks.
- Python test metadata validates against `TestFunctionMeta`, including `is_parametrized` and `class_name` values.
- Parse failures in individual test files set `MinerResult.error_kind=PARSE_ERROR` without aborting extraction from remaining files.
2 changes: 1 addition & 1 deletion src/specleft/discovery/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def _resolve_toml_loader() -> Any | None:
return tomllib
except ModuleNotFoundError:
try:
import tomli # type: ignore[import-not-found]
import tomli

return tomli
except ModuleNotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion src/specleft/discovery/frameworks/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def resolve_toml_loader() -> Any | None:
return tomllib
except ModuleNotFoundError:
try:
import tomli # type: ignore[import-not-found]
import tomli

return tomli
except ModuleNotFoundError:
Expand Down
8 changes: 4 additions & 4 deletions src/specleft/discovery/language_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def _parser_for(self, language: SupportedLanguage) -> Any | None:
return None

try:
from tree_sitter import Parser # type: ignore[import-untyped]
from tree_sitter import Parser

parser = Parser()
parser.set_language(language_obj)
parser.language = language_obj
self._parser_cache[language] = parser
return parser
except Exception:
Expand All @@ -96,14 +96,14 @@ def _language_for(self, language: SupportedLanguage) -> Any | None:

if language == SupportedLanguage.PYTHON:
try:
import tree_sitter_python # type: ignore[import-not-found]
import tree_sitter_python

language_obj = tree_sitter_python.language()
except Exception:
return None
elif language in (SupportedLanguage.TYPESCRIPT, SupportedLanguage.JAVASCRIPT):
try:
import tree_sitter_typescript # type: ignore[import-not-found]
import tree_sitter_typescript

if language == SupportedLanguage.TYPESCRIPT:
language_obj = tree_sitter_typescript.language_typescript()
Expand Down
11 changes: 9 additions & 2 deletions src/specleft/discovery/miners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
"""Discovery miner implementations."""

from specleft.discovery.miners.defaults import default_miners
from specleft.discovery.miners.shared import DocstringMiner, ReadmeOverviewMiner
from specleft.discovery.miners.python.tests import PythonTestMiner
from specleft.discovery.miners.shared.docstrings import DocstringMiner
from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner

__all__ = ["DocstringMiner", "ReadmeOverviewMiner", "default_miners"]
__all__ = [
"DocstringMiner",
"PythonTestMiner",
"ReadmeOverviewMiner",
"default_miners",
]
6 changes: 4 additions & 2 deletions src/specleft/discovery/miners/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

from typing import TYPE_CHECKING

from specleft.discovery.miners.shared import DocstringMiner, ReadmeOverviewMiner
from specleft.discovery.miners.python.tests import PythonTestMiner
from specleft.discovery.miners.shared.docstrings import DocstringMiner
from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner

if TYPE_CHECKING:
from specleft.discovery.pipeline import BaseMiner


def default_miners() -> list[BaseMiner]:
"""Return default miners in deterministic execution order."""
return [ReadmeOverviewMiner(), DocstringMiner()]
return [ReadmeOverviewMiner(), PythonTestMiner(), DocstringMiner()]
3 changes: 2 additions & 1 deletion src/specleft/discovery/miners/python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"""Python-specific discovery miners."""

from specleft.discovery.miners.python.docstrings import extract_python_items
from specleft.discovery.miners.python.tests import PythonTestMiner

__all__ = ["extract_python_items"]
__all__ = ["PythonTestMiner", "extract_python_items"]
258 changes: 258 additions & 0 deletions src/specleft/discovery/miners/python/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2026 SpecLeft Contributors

"""Python test-function miner."""

from __future__ import annotations

import ast
import time
import uuid
from pathlib import Path
from typing import Any

from specleft.discovery.context import MinerContext
from specleft.discovery.miners.shared.common import line_number, node_text
from specleft.discovery.models import (
DiscoveredItem,
ItemKind,
MinerErrorKind,
MinerResult,
SupportedLanguage,
TestFunctionMeta,
)

_TEST_PATTERNS = ("test_*.py", "*_test.py")
_KNOWN_FRAMEWORKS = {"pytest", "unittest"}


class PythonTestMiner:
    """Mine `test_`-prefixed Python functions from files in the file index."""

    miner_id = uuid.UUID("a7b21db5-0d22-41be-9902-7c725e63892e")
    name = "python_test_functions"
    languages = frozenset({SupportedLanguage.PYTHON})

    def mine(self, ctx: MinerContext) -> MinerResult:
        """Extract test-function items from all indexed Python test files.

        Files that fail to parse, or that cannot be read from disk, are
        collected and reported once as a PARSE_ERROR; items extracted from
        the remaining valid files are still returned.
        """
        t0 = time.perf_counter()
        framework = _primary_framework(ctx)
        discovered: list[DiscoveredItem] = []
        failed: list[Path] = []

        for rel_path in ctx.file_index.files_matching(*_TEST_PATTERNS):
            abs_path = ctx.root / rel_path

            parsed = ctx.registry.parse(abs_path)
            if parsed is None:
                failed.append(rel_path)
                continue

            tree_root, language = parsed
            # Defensive: skip anything the registry resolved to a
            # non-Python grammar despite matching the test patterns.
            if language is not SupportedLanguage.PYTHON:
                continue

            try:
                raw = abs_path.read_bytes()
            except OSError:
                failed.append(rel_path)
                continue

            discovered += _extract_test_items(
                root_node=tree_root,
                source_bytes=raw,
                file_path=rel_path,
                framework=framework,
            )

        error: str | None = None
        error_kind: MinerErrorKind | None = None
        if failed:
            files = ", ".join(path.as_posix() for path in failed)
            error = f"Failed to parse Python test files: {files}"
            error_kind = MinerErrorKind.PARSE_ERROR

        elapsed_ms = max(0, int((time.perf_counter() - t0) * 1000))
        return MinerResult(
            miner_id=self.miner_id,
            miner_name=self.name,
            items=discovered,
            error=error,
            error_kind=error_kind,
            duration_ms=elapsed_ms,
        )


def _primary_framework(ctx: MinerContext) -> str:
    """Return the first precomputed Python framework, or "unknown" if none."""
    detected = ctx.frameworks.get(SupportedLanguage.PYTHON, [])
    if not detected:
        return "unknown"
    return detected[0]


def _extract_test_items(
*,
root_node: Any,
source_bytes: bytes,
file_path: Path,
framework: str,
) -> list[DiscoveredItem]:
items: list[DiscoveredItem] = []
for node in getattr(root_node, "named_children", ()):
test_function = _extract_function(node, source_bytes)
if test_function is not None:
function_node, decorators = test_function
item = _to_discovered_item(
function_node=function_node,
decorators=decorators,
source_bytes=source_bytes,
file_path=file_path,
framework=framework,
class_name=None,
)
if item is not None:
items.append(item)
continue

if node.type != "class_definition":
continue

class_name = _field_text(node, "name", source_bytes)
if not class_name or not class_name.startswith("Test"):
continue

body = node.child_by_field_name("body")
if body is None:
continue

for member in getattr(body, "named_children", ()):
method = _extract_function(member, source_bytes)
if method is None:
continue
function_node, decorators = method
item = _to_discovered_item(
function_node=function_node,
decorators=decorators,
source_bytes=source_bytes,
file_path=file_path,
framework=framework,
class_name=class_name,
)
if item is not None:
items.append(item)

return items


def _extract_function(node: Any, source_bytes: bytes) -> tuple[Any, list[str]] | None:
if node.type in {"function_definition", "async_function_definition"}:
return node, []

if node.type != "decorated_definition":
return None

definition = node.child_by_field_name("definition")
if definition is None or definition.type not in {
"function_definition",
"async_function_definition",
}:
return None

decorators = [
_normalize_decorator(node_text(child, source_bytes))
for child in getattr(node, "named_children", ())
if child.type == "decorator"
]
return definition, [value for value in decorators if value]


def _to_discovered_item(
    *,
    function_node: Any,
    decorators: list[str],
    source_bytes: bytes,
    file_path: Path,
    framework: str,
    class_name: str | None,
) -> DiscoveredItem | None:
    """Build a TEST_FUNCTION item from a function node, or None.

    Functions whose names do not start with ``test_`` are rejected.
    """
    func_name = _field_text(function_node, "name", source_bytes)
    if not func_name or not func_name.startswith("test_"):
        return None

    doc = _extract_docstring(function_node, source_bytes)
    # e.g. "@pytest.mark.parametrize(...)" normalizes to
    # "pytest.mark.parametrize", hence the suffix match.
    parametrized = any(d.endswith("parametrize") for d in decorators)

    meta = TestFunctionMeta(
        framework=framework,
        class_name=class_name,
        decorators=decorators,
        has_docstring=doc is not None,
        docstring=doc,
        is_parametrized=parametrized,
    )
    # Recognized frameworks get a higher confidence score.
    score = 0.9 if framework in _KNOWN_FRAMEWORKS else 0.7

    return DiscoveredItem(
        kind=ItemKind.TEST_FUNCTION,
        name=func_name,
        file_path=file_path,
        line_number=line_number(function_node),
        language=SupportedLanguage.PYTHON,
        raw_text=doc,
        metadata=meta.model_dump(),
        confidence=score,
    )


def _normalize_decorator(raw: str) -> str:
normalized = raw.strip()
if normalized.startswith("@"):
normalized = normalized[1:]
return normalized.split("(", 1)[0].strip()


def _extract_docstring(function_node: Any, source_bytes: bytes) -> str | None:
body = function_node.child_by_field_name("body")
if body is None:
return None

named_children = list(getattr(body, "named_children", ()))
if not named_children:
return None

first = named_children[0]
if first.type != "expression_statement":
return None

for child in getattr(first, "named_children", ()):
if child.type in {"string", "concatenated_string"}:
return _clean_python_string(node_text(child, source_bytes))
return None


def _clean_python_string(value: str) -> str | None:
stripped = value.strip()
if not stripped:
return None

try:
parsed = ast.literal_eval(stripped)
except (SyntaxError, ValueError):
parsed = _strip_wrapping_quotes(stripped)
if not isinstance(parsed, str):
return None

cleaned = parsed.strip()
return cleaned or None


def _strip_wrapping_quotes(value: str) -> str:
for quote in ('"""', "'''", '"', "'"):
if value.startswith(quote) and value.endswith(quote) and len(value) >= 2:
return value[len(quote) : len(value) - len(quote)].strip()
return value


def _field_text(node: Any, field: str, source_bytes: bytes) -> str | None:
field_node = node.child_by_field_name(field)
if field_node is None:
return None
text = node_text(field_node, source_bytes).strip()
return text or None
15 changes: 13 additions & 2 deletions src/specleft/discovery/miners/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@

"""Shared miners used by multiple discovery workflows."""

from specleft.discovery.miners.shared.docstrings import DocstringMiner
from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner
from __future__ import annotations

__all__ = ["DocstringMiner", "ReadmeOverviewMiner"]


def __getattr__(name: str) -> object:
    """Lazily resolve the package's public miner classes (PEP 562).

    Deferring these submodule imports until first attribute access means
    importing this package does not eagerly import the miner modules.

    Raises:
        AttributeError: for any name other than the two exported miners.
    """
    if name == "DocstringMiner":
        from specleft.discovery.miners.shared.docstrings import DocstringMiner

        return DocstringMiner
    if name == "ReadmeOverviewMiner":
        from specleft.discovery.miners.shared.readme import ReadmeOverviewMiner

        return ReadmeOverviewMiner
    raise AttributeError(name)
Loading