feat(ENG-373): add configurable table_scope to FunctionNode and OperatorNode #129

Merged
eywalker merged 6 commits into dev from eng-373/table-scope-pipeline-hash on Apr 7, 2026

Conversation


kurodo3 bot commented Apr 7, 2026

Summary

Implements ENG-373: add configurable table scoping based on pipeline_hash instead of node_hash.

  • table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash" added to both FunctionNode and OperatorNode
  • pipeline_hash (new default): pipeline tables are scoped at the pipeline-hash level — all runs with the same function/operator structure share one table (uri + schema:{pipeline_hash}). A new _node_content_hash row column is written to every record and used to filter reads per data run, then always dropped before returning results to callers
  • content_hash (legacy): preserves the previous two-level path (uri + schema:{pipeline_hash} + instance:{content_hash}), giving each run its own isolated table
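To make the two path shapes concrete, here is a minimal sketch of the scoping rule; the function name, path segments, and example values are illustrative, not OrcaPod's actual API:

```python
from typing import Literal

# Hypothetical sketch of the two table-scoping path shapes; not OrcaPod's API.
def node_identity_path(
    uri: tuple[str, ...],
    pipeline_hash: str,
    content_hash: str,
    table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash",
) -> tuple[str, ...]:
    if table_scope == "pipeline_hash":
        # Shared table: every run with the same structure maps to one path.
        return (*uri, f"schema:{pipeline_hash}")
    # Legacy: an extra instance segment gives each run its own table.
    return (*uri, f"schema:{pipeline_hash}", f"instance:{content_hash}")

shared = node_identity_path(("store", "node"), "p123", "c456")
legacy = node_identity_path(("store", "node"), "p123", "c456", "content_hash")
```

Under the default, per-run isolation then has to come from row-level filtering on `_node_content_hash` rather than from the path itself.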

Changes

Core:

  • system_constants.py: add NODE_CONTENT_HASH_COL (_node_content_hash) module constant and SystemConstant property
  • function_node.py: table_scope init param, _node_identity_path_cache (invalidated on attach_databases/clear_cache), _filter_by_content_hash() helper, NODE_CONTENT_HASH_COL written in add_pipeline_record and dropped at all consumer-facing read boundaries (get_cached_results, iter_packets, async_execute, get_all_records)
  • operator_node.py: same additions for _store_output_stream, _load_cached_stream_from_db, _replay_from_cache, get_all_records
  • graph.py: table_scope serialized into node descriptors; from_descriptor on both node types raises ValueError when field is absent

Tests:

  • 12 existing test files updated to reflect the new default path shape (single schema: segment, no instance:)
  • New tests/test_core/test_table_scope.py: 26 dedicated tests covering both scopes for FunctionNode and OperatorNode — path structure, shared-table per-run isolation, _node_content_hash column presence/absence, from_descriptor validation, and cache invalidation

Test plan

  • uv run pytest tests/ -q → 3148 passed, 56 skipped, 0 failures

🤖 Generated with Claude Code

…OperatorNode

Introduces `table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash"` on
both `FunctionNode` and `OperatorNode`.

With the new default (`pipeline_hash`), pipeline tables are scoped at the pipeline_hash
level — all runs sharing the same function/operator structure write to one shared table
path (`uri + schema:{pipeline_hash}`). Per-run isolation is provided by a new
`_node_content_hash` row-level column (always written, always dropped before returning
results to callers).

The legacy `content_hash` scope preserves the previous two-level path
(`uri + schema:{pipeline_hash} + instance:{content_hash}`), giving each data run its own
isolated table with no row-level filtering.

Key changes:
- `system_constants.py`: add `NODE_CONTENT_HASH_COL` / `SystemConstant.NODE_CONTENT_HASH_COL`
- `function_node.py`: `table_scope` param, `_node_identity_path_cache`, `_filter_by_content_hash()`,
  `NODE_CONTENT_HASH_COL` written to every pipeline record and dropped at all read boundaries
- `operator_node.py`: same additions for stream storage/replay/get_all_records paths
- `graph.py`: serialize `table_scope` into node descriptors; both `from_descriptor` impls
  raise `ValueError` when `table_scope` is absent
- Tests: update 12 existing test files for new default path shape; add
  `tests/test_core/test_table_scope.py` with 26 dedicated tests covering both scopes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

codecov bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 89.13043% with 10 lines in your changes missing coverage. Please review.

Files with missing lines                     Patch %   Missing lines
src/orcapod/core/nodes/function_node.py      87.80%    5 ⚠️
src/orcapod/core/nodes/operator_node.py      89.36%    5 ⚠️


Contributor

Copilot AI left a comment

Pull request overview

Adds a configurable table_scope to FunctionNode and OperatorNode to control how pipeline tables are scoped: defaulting to shared tables keyed by pipeline_hash with per-run isolation via a _node_content_hash row column, while preserving the legacy per-run table behavior keyed by content_hash. This updates core node identity/path behavior, serialization, and expands/adjusts test coverage accordingly.

Changes:

  • Introduce table_scope: Literal["pipeline_hash", "content_hash"] on FunctionNode/OperatorNode, including node identity path caching + cache invalidation.
  • Implement row-level _node_content_hash write + read-time filtering (and dropping) for the shared-table default (pipeline_hash scope).
  • Serialize table_scope in graph/node descriptors, enforce presence on load, and update/add tests for both scopes.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 5 comments.

File Description
src/orcapod/system_constants.py Adds NODE_CONTENT_HASH_COL constant/property used for row-level run discrimination.
src/orcapod/pipeline/graph.py Includes table_scope in serialized node descriptors.
src/orcapod/core/nodes/function_node.py Adds table_scope, identity-path caching, _node_content_hash write/filter/drop across read boundaries.
src/orcapod/core/nodes/operator_node.py Adds table_scope, identity-path caching, _node_content_hash write/filter/drop across replay/read boundaries.
tests/test_core/test_table_scope.py New dedicated tests covering both scopes, isolation behavior, descriptor validation, cache invalidation.
tests/test_pipeline/test_serialization.py Updates expectations for default schema-only identity path under pipeline_hash scope.
tests/test_pipeline/test_serialization_helpers.py Updates descriptors to include table_scope.
tests/test_pipeline/test_node_descriptors.py Ensures node descriptors round-trip table_scope.
tests/test_core/test_caching_integration.py Updates integration assertions for shared-table default + per-run filtering.
tests/test_core/operators/test_operator_node.py Updates operator-node identity path expectations (schema-only by default).
tests/test_core/function_pod/test_pipeline_hash_integration.py Updates behavior/expectations for shared-table default and per-run disambiguation.
tests/test_core/function_pod/test_function_pod_node.py Updates identity path tests for schema-only default.
tests/test_core/function_pod/test_function_node_caching.py Updates caching tests for shared table accumulation + per-run isolation.


f"FunctionNode descriptor is missing required 'table_scope' field: "
f"{descriptor.get('label', '<unlabeled>')}"
)
table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"]

Copilot AI Apr 7, 2026


table_scope is read from the descriptor without runtime validation. If the descriptor contains an unexpected value (typo/unknown), the node will silently fall back to the legacy path logic (the else branch) and may scope tables incorrectly. Consider validating table_scope against the allowed values ("pipeline_hash" | "content_hash") and raising a ValueError for anything else.

Suggested change
table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"]
raw_table_scope = descriptor["table_scope"]
if raw_table_scope not in ("pipeline_hash", "content_hash"):
    raise ValueError(
        f"FunctionNode descriptor has invalid 'table_scope' value "
        f"{raw_table_scope!r} for {descriptor.get('label', '<unlabeled>')}; "
        f"expected one of ('pipeline_hash', 'content_hash')"
    )
table_scope = cast(
    Literal["pipeline_hash", "content_hash"],
    raw_table_scope,
)

Author


Fixed — added validation after the missing-key check. If the stored value is not one of ('pipeline_hash', 'content_hash') a ValueError is raised with the node label for easy diagnosis. Used cast(Literal[...], raw_table_scope) to preserve the narrowed type for the rest of the method.

f"OperatorNode descriptor is missing required 'table_scope' field: "
f"{descriptor.get('label', '<unlabeled>')}"
)
table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"]

Copilot AI Apr 7, 2026


table_scope is taken directly from the descriptor without validating allowed values. An invalid string would be accepted and later treated as legacy scoping (the else branch in node_identity_path), which can lead to incorrect DB path computation. Suggest validating descriptor["table_scope"] against ("pipeline_hash", "content_hash") and raising ValueError on unknown values.

Suggested change
table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"]
raw_table_scope = descriptor["table_scope"]
if raw_table_scope not in ("pipeline_hash", "content_hash"):
    raise ValueError(
        f"OperatorNode descriptor has invalid 'table_scope' value "
        f"{raw_table_scope!r} for {descriptor.get('label', '<unlabeled>')}; "
        "expected one of ('pipeline_hash', 'content_hash')"
    )
table_scope: Literal["pipeline_hash", "content_hash"] = raw_table_scope

Author


Fixed — same validation added to OperatorNode.from_descriptor. A ValueError with the node label is raised for any value outside ('pipeline_hash', 'content_hash'), rather than silently falling through to the else branch.

Comment on lines +237 to +238
mask = [v == own_hash for v in table.column(col_name).to_pylist()]
return table.filter(pa.array(mask))

Copilot AI Apr 7, 2026


_filter_by_content_hash builds a Python mask via to_pylist() and then converts it back into an Arrow array. This is O(n) Python overhead and can be expensive for large cached tables. Prefer an Arrow-native filter (e.g., pyarrow.compute.equal(table[col_name], pa.scalar(own_hash))) to keep filtering vectorized and avoid materializing Python lists.

Suggested change
mask = [v == own_hash for v in table.column(col_name).to_pylist()]
return table.filter(pa.array(mask))
mask = pa.compute.equal(table.column(col_name), pa.scalar(own_hash))
return table.filter(mask)

Author


Fixed — switched to pc.equal(table.column(col_name), own_hash) (Arrow vectorized compute). Added pc = LazyModule('pyarrow.compute') alongside the existing pa lazy import so no eager dependency is introduced.

Comment on lines +398 to +412
def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table":
    """Filter *table* to rows whose ``_node_content_hash`` matches this node.

    Only applied when ``table_scope="pipeline_hash"`` because in that mode
    multiple runs share the same DB table and must be disambiguated at read
    time. In ``"content_hash"`` mode every run has its own table.
    """
    if self._table_scope != "pipeline_hash":
        return table
    col_name = constants.NODE_CONTENT_HASH_COL
    if col_name not in table.column_names:
        return table
    own_hash = self.content_hash().to_string()
    mask = [v == own_hash for v in table.column(col_name).to_pylist()]
    return table.filter(pa.array(mask))

Copilot AI Apr 7, 2026


_filter_by_content_hash converts the entire column to a Python list to build a boolean mask, which is slow and memory-heavy for large tables. Consider using pyarrow.compute to generate the boolean mask and filter in Arrow (e.g. pc.equal(col, scalar)), avoiding to_pylist() and Python loops.

Author


Fixed — same change applied here: pc.equal(table.column(col_name), own_hash) replaces the to_pylist() + Python loop. The pa.array(mask) wrapping is also gone since pc.equal returns an Arrow BooleanArray directly.

Comment on lines +457 to +466
# Add node content hash column for per-run disambiguation when using
# pipeline_hash table scope (multiple runs share the same table).
n_rows = output_table.num_rows
output_table = output_table.append_column(
    constants.NODE_CONTENT_HASH_COL,
    pa.array(
        [self.content_hash().to_string()] * n_rows,
        type=pa.large_string(),
    ),
)

Copilot AI Apr 7, 2026


This block always appends the node content-hash column, but the comment says it's only for table_scope="pipeline_hash". Either make the behavior conditional on self._table_scope (to avoid unnecessary storage and list allocation in legacy content_hash mode) or update the comment to reflect that the column is written in all scopes. Also, building [hash] * n_rows creates a potentially large Python list; consider an Arrow-native repeat to reduce memory overhead.

Author


Intentional design — NODE_CONTENT_HASH_COL is always written to every stored row regardless of table_scope. The column serves as a permanent run-identity tag on each record. Filtering on it (in _filter_by_content_hash) is only applied at read time when table_scope="pipeline_hash", where multiple runs share the same table. In content_hash scope each run already has its own isolated table so filtering is skipped, but the column is still present in storage.

Updated the comment in _store_output_stream to make this distinction explicit. Also took the opportunity to use pa.repeat() instead of [value]*n_rows for the array creation.

- Use pc.equal() (vectorized Arrow compute) instead of to_pylist() + Python
  loop in _filter_by_content_hash on both FunctionNode and OperatorNode
- Use pa.repeat() instead of [value]*n_rows in OperatorNode._store_output_stream
  to create the NODE_CONTENT_HASH_COL array without materializing a Python list
- Add pc = LazyModule("pyarrow.compute") alongside pa in both files
- Validate table_scope value in from_descriptor for both FunctionNode and
  OperatorNode: raise ValueError on unknown values (not just missing key)
- Clarify NODE_CONTENT_HASH_COL comment: column is always written; filtering
  on it is only applied when table_scope="pipeline_hash"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author

kurodo3 bot commented Apr 7, 2026

Review round 1 — addressed

All 5 comments resolved in commit 7b11582:

Vectorized filtering (_filter_by_content_hash)
Both FunctionNode and OperatorNode: replaced to_pylist() + Python boolean list + pa.array(mask) with pc.equal(table.column(col_name), own_hash). Added pc = LazyModule('pyarrow.compute') alongside the existing pa lazy import so the dependency stays optional.

table_scope value validation in from_descriptor
Both nodes now validate the stored value against ('pipeline_hash', 'content_hash') after confirming the key is present, raising a descriptive ValueError (with node label) for any unknown string instead of silently falling through to the legacy else branch.

NODE_CONTENT_HASH_COL always written — comment clarified
The column is intentionally written on every stored row regardless of table_scope. It serves as a permanent run-identity tag. Filtering on it is only applied at read time in pipeline_hash scope (shared table). In content_hash scope each run has its own isolated table so filtering is skipped, but the column is still present. Updated the comment in _store_output_stream to make this explicit. Also replaced [value]*n_rows with pa.repeat(value, n_rows).cast(pa.large_string()) for Arrow-native array creation.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (2)

src/orcapod/core/nodes/function_node.py:90

  • table_scope is a new public init parameter but isn’t runtime-validated. Passing an unexpected string (e.g., a typo) will silently fall into the legacy else branches and change DB scoping behavior. Consider validating table_scope in __init__ against ("pipeline_hash", "content_hash") and raising ValueError on invalid values.
    def __init__(
        self,
        function_pod: FunctionPodProtocol,
        input_stream: StreamProtocol,
        tracker_manager: TrackerManagerProtocol | None = None,
        label: str | None = None,
        config: Config | None = None,
        # Optional DB params for persistent mode:
        pipeline_database: ArrowDatabaseProtocol | None = None,
        result_database: ArrowDatabaseProtocol | None = None,
        table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash",
    ):
        if tracker_manager is None:
            tracker_manager = DEFAULT_TRACKER_MANAGER
        self.tracker_manager = tracker_manager

src/orcapod/pipeline/graph.py:797

  • This PR makes table_scope a required field in node descriptors (and from_descriptor now raises when it’s missing), which is a serialization schema change. However PIPELINE_FORMAT_VERSION is still "0.1.0" and is treated as supported, so older saved pipelines with version 0.1.0 but without table_scope will now fail at load-time despite passing the version gate. Consider bumping the pipeline format version and/or providing a backward-compatible default for missing table_scope based on the saved format version.
    def _build_function_descriptor(self, node: "FunctionNode") -> dict[str, Any]:
        """Build function-specific descriptor fields for a FunctionNode.

        Args:
            node: The FunctionNode to describe.

        Returns:
            Dict with function-specific fields (uses function_config key).
        """
        return {
            "function_config": node._function_pod.to_config(),
            "table_scope": node._table_scope,
        }

    def _build_operator_descriptor(self, node: OperatorNode, level: str = "standard") -> dict[str, Any]:
        """Build operator-specific descriptor fields for a OperatorNode.

        Args:
            node: The OperatorNode to describe.
            level: Save detail level; cache_mode only included at standard+.

        Returns:
            Dict with operator-specific fields.
        """
        result: dict[str, Any] = {
            "operator_config": node._operator.to_config(),
            "table_scope": node._table_scope,
        }
        # cache_mode at standard+ only
        if level in ("standard", "full"):
            result["cache_mode"] = node._cache_mode.value
        return result


Comment on lines +413 to +418
"""
if self._table_scope != "pipeline_hash":
    return table
col_name = constants.NODE_CONTENT_HASH_COL
if col_name not in table.column_names:
    return table

Copilot AI Apr 7, 2026


In table_scope="pipeline_hash" mode, _filter_by_content_hash() must reliably isolate a run. Currently if the stored table is missing NODE_CONTENT_HASH_COL, the method returns unfiltered rows, which breaks isolation and can expose other runs’ data. Consider failing closed (raise/empty) when the column is absent in pipeline_hash scope.

Suggested change
"""
if self._table_scope != "pipeline_hash":
    return table
col_name = constants.NODE_CONTENT_HASH_COL
if col_name not in table.column_names:
    return table
In shared-table mode, absence of the row-level content-hash column is
unsafe because the table cannot be reliably isolated to a single run.
"""
if self._table_scope != "pipeline_hash":
    return table
col_name = constants.NODE_CONTENT_HASH_COL
if col_name not in table.column_names:
    raise ValueError(
        "Cannot read records for table_scope='pipeline_hash': "
        f"missing required column {col_name!r} used for per-run isolation."
    )

Author


Fixed — in pipeline_hash scope, if NODE_CONTENT_HASH_COL is absent the method now raises ValueError (with a message noting that older records may be the cause) rather than returning an unfiltered table. Applied identically to both FunctionNode and OperatorNode.

Comment on lines +451 to +476
output_table = output_table.append_column(
    constants.NODE_CONTENT_HASH_COL,
    pa.repeat(self.content_hash().to_string(), n_rows).cast(pa.large_string()),
)

Copilot AI Apr 7, 2026


_store_output_stream() uses a per-row hash (HASH_COLUMN_NAME) as the record ID with skip_duplicates=True, but the hash is computed before NODE_CONTENT_HASH_COL is appended. In table_scope="pipeline_hash" (shared-table) mode, two different runs that produce identical output rows will collide on the same record ID, causing later runs’ rows (with their own content-hash discriminator) to be skipped; _filter_by_content_hash() will then hide the earlier run’s row and the later run will see missing cache/provenance. Make the record ID run-scoped when self._table_scope == "pipeline_hash" (e.g., include self.content_hash() in the hashed ID or use a composite key) so identical rows can exist per run in the shared table.

Author


Fixed. When table_scope='pipeline_hash', the node's content_hash() is now mixed into each per-row record hash: the row hash and content hash string are placed into a 1-row Arrow table and hashed together using the same arrow_hasher. This makes each run's rows get distinct HASH_COLUMN_NAME values in the shared table, so skip_duplicates=True no longer causes a later run's identical rows to be silently dropped.

Author


Simplified further (commit 3e6d851): instead of conditionally computing a secondary combined hash, NODE_CONTENT_HASH_COL is now appended to output_table before the per-row hash loop. Since every row already carries the run-scoped content hash as a column, hashing the row naturally produces a run-scoped record ID with no special-casing or two-table construction needed.

@@ -108,6 +111,8 @@ def __init__(
self._stored_node_uri: tuple[str, ...] = ()
self._stored_pipeline_path: tuple[str, ...] = ()
self._descriptor: dict = {}

Copilot AI Apr 7, 2026


table_scope is accepted in __init__ but not validated at runtime. A typo/unknown value will silently change path computation (falls into the legacy else branch), which can lead to writing/reading from unexpected DB paths. Validate table_scope in __init__ and raise ValueError for unknown values.

Suggested change
self._descriptor: dict = {}
self._descriptor: dict = {}
if table_scope not in ("pipeline_hash", "content_hash"):
    raise ValueError(
        f"Unknown table_scope {table_scope!r}. Expected one of: "
        "'pipeline_hash', 'content_hash'."
    )

Author


Fixed — added a guard at the top of the _table_scope assignment in __init__ for both FunctionNode and OperatorNode. Any value outside ('pipeline_hash', 'content_hash') now raises ValueError immediately, before the attribute is set.
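The __init__ guard can be sketched minimally (class name and attribute are illustrative; the real nodes take many more constructor parameters):

```python
from typing import Literal

class ScopedNode:
    # Minimal sketch of the eager table_scope validation added in __init__:
    # an unknown value raises immediately, before any state is set.
    def __init__(
        self,
        table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash",
    ):
        if table_scope not in ("pipeline_hash", "content_hash"):
            raise ValueError(
                f"Unknown table_scope {table_scope!r}. Expected one of: "
                "'pipeline_hash', 'content_hash'."
            )
        self._table_scope = table_scope
```

Validating at construction time catches typos at the call site instead of letting them silently select the legacy path-computation branch later.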

Comment on lines 922 to 930
constants.PACKET_RECORD_ID: pa.array(
    [packet_record_id], type=pa.large_string()
),
constants.NODE_CONTENT_HASH_COL: pa.array(
    [self.content_hash().to_string()], type=pa.large_string()
),
f"{constants.META_PREFIX}input_packet{constants.CONTEXT_KEY}": pa.array(
    [input_packet.data_context_key], type=pa.large_string()
),

Copilot AI Apr 7, 2026


NODE_CONTENT_HASH_COL is written per row, but the pipeline DB record ID (entry_id) is still computed from just (tag + system tags + input packet hash). In table_scope="pipeline_hash" mode the table is shared across runs, so overlapping inputs across runs will collide on the same entry_id; the second run will skip writing its row, and _filter_by_content_hash() will then hide the first run’s row—losing provenance for the second run. Consider making the pipeline record ID run-scoped when self._table_scope == "pipeline_hash" (e.g., include self.content_hash() in the entry ID or use a composite key) so identical packets can be recorded for multiple runs in the shared table.

Author


Fixed in FunctionNode.compute_pipeline_entry_id. When table_scope='pipeline_hash', the node's content_hash() is appended as an extra column to the table being hashed, making the resulting entry ID run-scoped. This means two runs processing the same (tag, input_packet) produce distinct pipeline records in the shared table instead of the second run's record being silently skipped by the duplicate check.

… scope

- Validate table_scope in __init__ for both FunctionNode and OperatorNode;
  raise ValueError immediately on unknown values rather than silently falling
  through to the else branch in node_identity_path
- FunctionNode.compute_pipeline_entry_id: when table_scope="pipeline_hash",
  include the node's content_hash in the hashed ID so that two runs processing
  identical inputs produce distinct entry IDs in the shared table; without this
  the second run's pipeline record was silently skipped by the duplicate check
  and _filter_by_content_hash would then hide the first run's row
- OperatorNode._store_output_stream: when table_scope="pipeline_hash", mix the
  node's content_hash into each per-row HASH_COLUMN_NAME by hashing a 1-row
  table containing both the row hash and the content hash; without this,
  identical rows from two runs collided on the same record ID and skip_duplicates
  caused the second run's rows to never be stored
- _filter_by_content_hash (both nodes): fail closed with ValueError when
  table_scope="pipeline_hash" and NODE_CONTENT_HASH_COL is absent, rather than
  returning an unfiltered table that would expose other runs' data

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author

kurodo3 bot commented Apr 7, 2026

Review round 2 — addressed

All 4 actionable comments resolved in commit 9899000:

table_scope validation in __init__ (both nodes)
Added an explicit guard before assigning self._table_scope in both FunctionNode.__init__ and OperatorNode.__init__. Any value outside ('pipeline_hash', 'content_hash') raises ValueError immediately, before any state is set.

Record ID collision — FunctionNode.compute_pipeline_entry_id
When table_scope='pipeline_hash', the node's content_hash() is now appended as an extra column to the table being hashed for the entry ID. Two runs processing the same (tag, input_packet) now produce distinct pipeline records; previously the second run's record was silently skipped by the duplicate check and _filter_by_content_hash would then hide the first run's row from the second run.

Record ID collision — OperatorNode._store_output_stream
When table_scope='pipeline_hash', the node's content_hash() is mixed into each per-row HASH_COLUMN_NAME by hashing a 1-row Arrow table containing both the row hash and the content hash string (using the same arrow_hasher for consistency). Identical rows from different runs now get distinct record IDs in the shared table, so skip_duplicates=True no longer causes a later run's rows to be silently dropped.

_filter_by_content_hash — fail closed (both nodes)
When table_scope='pipeline_hash' and NODE_CONTENT_HASH_COL is absent from the stored table, the method now raises ValueError with an explanatory message (noting possible older records), rather than returning an unfiltered table that could expose other runs' data.

…OL first

Instead of conditionally computing a secondary combined hash, append
NODE_CONTENT_HASH_COL to the output table before the per-row HASH_COLUMN_NAME
loop.  The content hash column is then naturally included in each row's hash,
making record IDs run-scoped without any special-casing.  The column order
change has no functional effect on storage or filtering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.



Comment on lines 752 to 754
joined = (
    pl.DataFrame(taginfo)
    .join(

Copilot AI Apr 7, 2026


After switching to shared-table storage (table_scope="pipeline_hash"), get_cached_results() can join against a much larger taginfo table. The current implementation later filters joined to entry_ids by calling to_pylist() and building a Python boolean mask, which is slow and memory-heavy at scale. Prefer filtering in Polars/Arrow before materializing to Arrow (e.g., pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)), keeping the filter vectorized.

Author


Fixed — moved the entry_id filter into the Polars chain as .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)) before .to_arrow(). This keeps the filter vectorized and avoids materializing the full joined table into Python. Also removed the now-unused entry_id_set variable; filtered_entry_ids is now obtained directly from filtered.column(PIPELINE_ENTRY_ID_COL).to_pylist() on the already-filtered result.

Comment on lines 488 to 494
@@ -422,7 +493,9 @@ def _store_output_stream(self, stream: StreamProtocol) -> None:
skip_duplicates=True,
)

Copilot AI Apr 7, 2026


This comment says "identical rows across runs naturally deduplicate", but _store_output_stream() now intentionally makes record IDs run-scoped by appending NODE_CONTENT_HASH_COL before hashing. That means identical output rows from different runs will not deduplicate (they produce different _record_hash values), which is important for per-run isolation in the shared table. Update the comment to reflect the current behavior to avoid confusion for future maintainers.

Author


Fixed — updated the comment to accurately state that record IDs are run-scoped (because NODE_CONTENT_HASH_COL is included in the hash), so rows from different runs with identical output produce distinct record IDs and both get stored. skip_duplicates=True now only deduplicates exact re-runs of the same node within a single run.
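The run-scoping effect can be illustrated with a self-contained sketch (hedged: `record_id` and the pipe-joined payload are illustrative stand-ins, not the pipeline's actual hashing scheme).

```python
import hashlib

NODE_CONTENT_HASH_COL = "_node_content_hash"  # stand-in for the shared constant


def record_id(row: dict, node_content_hash: str) -> str:
    # Include the run's content hash alongside the row values before hashing,
    # so the resulting record ID is scoped to a specific run.
    scoped = {**row, NODE_CONTENT_HASH_COL: node_content_hash}
    payload = "|".join(f"{key}={scoped[key]}" for key in sorted(scoped))
    return hashlib.sha256(payload.encode()).hexdigest()


row = {"x": 1, "y": "out"}
id_run_a = record_id(row, "run-a")
id_run_b = record_id(row, "run-b")
```

Identical output rows from different runs hash to distinct record IDs, so both are stored in the shared table; `skip_duplicates=True` only collapses exact re-runs within a single run, where the content hash is the same.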

…hed_results

- OperatorNode._store_output_stream: update comment to accurately describe that
  record IDs are now run-scoped (NODE_CONTENT_HASH_COL is included in the hash),
  so rows from different runs with identical output produce distinct record IDs
  and are both stored; skip_duplicates=True only deduplicates within a single run
- FunctionNode.get_cached_results: push the entry_id filter into the Polars chain
  as .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)) before .to_arrow(),
  eliminating the to_pylist() + Python boolean mask + pa.array() roundtrip;
  also removes the now-unused entry_id_set variable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

kurodo3 bot commented Apr 7, 2026

Review round 3 — addressed

Both comments resolved in commit 2ee2c90:

Stale dedup comment in _store_output_stream
Updated to accurately reflect that record IDs are now run-scoped — NODE_CONTENT_HASH_COL is included in the hash before the loop runs, so identical output rows from different runs produce distinct record IDs and both are stored. skip_duplicates=True only deduplicates exact re-runs of the same node within a single run.

Vectorized entry_id filter in get_cached_results
Moved the filter into the Polars chain as .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)) before .to_arrow(). This eliminates the to_pylist() + Python boolean list comprehension + pa.array(mask) roundtrip entirely, keeping the filter vectorized. Also removed the now-unused entry_id_set variable; filtered_entry_ids is extracted directly from the already-filtered Arrow result.

f"Unknown table_scope {table_scope!r}. "
"Expected one of: 'pipeline_hash', 'content_hash'."
)
self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope
Contributor


is the type specification here necessary?

Author


Removed — the type annotation on the assignment is redundant after the validation guard above already ensures the value is one of the two allowed literals. Simplified to self._table_scope = table_scope.
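A minimal sketch of the resulting guard, assuming a pared-down node class (the real `FunctionNode`/`OperatorNode` constructors take many more parameters):

```python
from typing import Literal

TableScope = Literal["pipeline_hash", "content_hash"]


class Node:
    def __init__(self, table_scope: TableScope = "pipeline_hash") -> None:
        # Validation guard: reject anything outside the two allowed literals
        if table_scope not in ("pipeline_hash", "content_hash"):
            raise ValueError(
                f"Unknown table_scope {table_scope!r}. "
                "Expected one of: 'pipeline_hash', 'content_hash'."
            )
        # No annotation on the assignment — the guard above already
        # guarantees the value is one of the two literals.
        self._table_scope = table_scope
```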

"or supply one via Pipeline.load(..., pipeline_database=<db>)."
)

def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table":
Contributor


remove quotes around type hints

Author


Fixed — removed the quotes from both the parameter and return type: def _filter_by_content_hash(self, table: pa.Table) -> pa.Table:.

)

def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table":
"""Filter *table* to rows whose ``_node_content_hash`` matches this node.
Contributor


the column name is supposed to be based on the value of NODE_CONTENT_HASH_COL -- do not hardcode a specific value into the docstring

Author


Fixed — replaced the hardcoded _node_content_hash string in the docstring with a reference to NODE_CONTENT_HASH_COL so it stays in sync with the constant's value.

f"schema:{self.pipeline_hash().to_string()}",
f"instance:{self.content_hash().to_string()}",
)
if self._table_scope == "pipeline_hash":
Contributor


let's clean this up by doing:

path = pf.uri + (f"schema:{self.pipeline_hash().to_string()}",)
if self._table_scope != 'pipeline_hash':
    path += (f"instance:{self.content_hash().to_string()}",)
self._node_identity_path_cache = path

Author


Applied — replaced the if/else block with the suggested pattern: build the base schema: path unconditionally, then path += (f"instance:...",) only when table_scope != "pipeline_hash". Same change applied to OperatorNode.node_identity_path as well.
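The resulting path shapes for the two scopes can be shown with a small free-function sketch (hedged: hashes are plain strings here rather than the real hash objects):

```python
def node_identity_path(
    uri: tuple, pipeline_hash: str, content_hash: str, table_scope: str
) -> tuple:
    # Base path is always scoped by the pipeline hash
    path = uri + (f"schema:{pipeline_hash}",)
    if table_scope != "pipeline_hash":
        # Legacy content_hash scope appends a per-run instance segment
        path += (f"instance:{content_hash}",)
    return path
```

The new default yields a single shared location per pipeline structure, while the legacy scope keeps one isolated table per run:

```python
node_identity_path(("tbl",), "P", "C", "pipeline_hash")
# ("tbl", "schema:P")
node_identity_path(("tbl",), "P", "C", "content_hash")
# ("tbl", "schema:P", "instance:C")
```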

constants.INPUT_PACKET_HASH_COL,
pa.array([input_packet.content_hash().to_string()], type=pa.large_string()),
)
if self._table_scope == "pipeline_hash":
Contributor


let's just always include NODE_CONTENT_HASH_COL as part of the entry id computation for consistency

Author


Done — removed the if self._table_scope == 'pipeline_hash': guard and always append NODE_CONTENT_HASH_COL to the hashed table. Updated the docstring to match. The two append_column calls are now chained together unconditionally for consistency.

return self._node_identity_path_cache
if self._table_scope == "pipeline_hash":
path = self._operator.uri + (
f"schema:{self.pipeline_hash().to_string()}",
Contributor


consider applying cleaner way to writing as suggested in my comment on function node side.

Author


Applied — OperatorNode.node_identity_path now uses the same pattern: base schema: path first, instance: segment appended only when table_scope != 'pipeline_hash'.

function_node.py:
- Remove redundant Literal type annotation on self._table_scope assignment
- Remove quotes around pa.Table type hints in _filter_by_content_hash signature
- Replace hardcoded '_node_content_hash' with NODE_CONTENT_HASH_COL reference
  in _filter_by_content_hash docstring
- Simplify node_identity_path: build base path unconditionally, append
  instance: segment only when table_scope != "pipeline_hash"
- Always include NODE_CONTENT_HASH_COL in compute_pipeline_entry_id regardless
  of table_scope (remove the conditional); update docstring accordingly

operator_node.py:
- Remove redundant Literal type annotation on self._table_scope assignment
- Remove quotes around pa.Table type hints in _filter_by_content_hash signature
- Replace hardcoded docstring value with NODE_CONTENT_HASH_COL reference
- Apply same simplified node_identity_path pattern as function_node.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

kurodo3 bot commented Apr 7, 2026

Review round 4 — addressed

All 6 comments resolved in commit 1066537:

Redundant type annotation on _table_scope (both nodes)
Removed Literal["pipeline_hash", "content_hash"] from the assignment — the validation guard immediately above already ensures the value is one of the two allowed literals, so the annotation adds no information.

Quoted type hints in _filter_by_content_hash (both nodes)
Removed string quotes: table: pa.Table and -> pa.Table.

Hardcoded column name in _filter_by_content_hash docstring (both nodes)
Replaced the hardcoded _node_content_hash string with NODE_CONTENT_HASH_COL so the docstring stays in sync with the constant.

Simplified node_identity_path (both nodes)
Replaced the if/else block with the suggested pattern — build the base schema: path unconditionally, then append the instance: segment only when table_scope != "pipeline_hash".

Always include NODE_CONTENT_HASH_COL in compute_pipeline_entry_id
Removed the if self._table_scope == "pipeline_hash": conditional — NODE_CONTENT_HASH_COL is now always appended as part of the chained append_column calls. Docstring updated to reflect the unconditional behavior.

@eywalker eywalker merged commit 3594ba3 into dev Apr 7, 2026
9 checks passed
@eywalker eywalker deleted the eng-373/table-scope-pipeline-hash branch April 7, 2026 05:14