From 6d066b7240a98e765383ef828031ecff06abe0a9 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 02:43:31 +0000 Subject: [PATCH 1/6] feat(ENG-373): add configurable table_scope flag to FunctionNode and OperatorNode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces `table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash"` on both `FunctionNode` and `OperatorNode`. With the new default (`pipeline_hash`), pipeline tables are scoped at the pipeline_hash level — all runs sharing the same function/operator structure write to one shared table path (`uri + schema:{pipeline_hash}`). Per-run isolation is provided by a new `_node_content_hash` row-level column (always written, always dropped before returning results to callers). The legacy `content_hash` scope preserves the previous two-level path (`uri + schema:{pipeline_hash} + instance:{content_hash}`), giving each data run its own isolated table with no row-level filtering. Key changes: - `system_constants.py`: add `NODE_CONTENT_HASH_COL` / `SystemConstant.NODE_CONTENT_HASH_COL` - `function_node.py`: `table_scope` param, `_node_identity_path_cache`, `_filter_by_content_hash()`, `NODE_CONTENT_HASH_COL` written to every pipeline record and dropped at all read boundaries - `operator_node.py`: same additions for stream storage/replay/get_all_records paths - `graph.py`: serialize `table_scope` into node descriptors; both `from_descriptor` impls raise `ValueError` when `table_scope` is absent - Tests: update 12 existing test files for new default path shape; add `tests/test_core/test_table_scope.py` with 26 dedicated tests covering both scopes Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/function_node.py | 85 ++- src/orcapod/core/nodes/operator_node.py | 102 ++- src/orcapod/pipeline/graph.py | 2 + src/orcapod/system_constants.py | 5 + .../test_function_node_caching.py | 23 +- .../function_pod/test_function_pod_node.py | 26 +- .../test_pipeline_hash_integration.py | 72 +-- .../test_core/operators/test_operator_node.py | 7 +- tests/test_core/test_caching_integration.py | 34 +- tests/test_core/test_table_scope.py | 593 ++++++++++++++++++ tests/test_pipeline/test_node_descriptors.py | 3 + tests/test_pipeline/test_serialization.py | 36 +- .../test_serialization_helpers.py | 1 + 13 files changed, 870 insertions(+), 119 deletions(-) create mode 100644 tests/test_core/test_table_scope.py diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index e5799862..143db4e4 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -81,6 +81,7 @@ def __init__( # Optional DB params for persistent mode: pipeline_database: ArrowDatabaseProtocol | None = None, result_database: ArrowDatabaseProtocol | None = None, + table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash", ): if tracker_manager is None: tracker_manager = DEFAULT_TRACKER_MANAGER @@ -142,6 +143,8 @@ def __init__( self._stored_pipeline_path: tuple[str, ...] = () self._stored_result_record_path: tuple[str, ...] = () self._descriptor: dict = {} + self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope + self._node_identity_path_cache: tuple[str, ...] | None = None if pipeline_database is not None: self.attach_databases( @@ -187,6 +190,7 @@ def attach_databases( self._pipeline_database = pipeline_database # Clear all caches + self._node_identity_path_cache = None self.clear_cache() self._content_hash_cache.clear() self._pipeline_hash_cache.clear() @@ -216,6 +220,23 @@ def _require_pipeline_database(self) -> None: "or supply one via Pipeline.load(..., pipeline_database=)." ) + def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": + """Filter *table* to rows whose ``_node_content_hash`` matches this node. + + Only applied when ``table_scope="pipeline_hash"`` because in that mode + multiple runs share the same DB table and must be disambiguated at read + time. In ``"content_hash"`` mode every run has its own table so no + filtering is needed. + """ + if self._table_scope != "pipeline_hash": + return table + col_name = constants.NODE_CONTENT_HASH_COL + if col_name not in table.column_names: + return table + own_hash = self.content_hash().to_string() + mask = [v == own_hash for v in table.column(col_name).to_pylist()] + return table.filter(pa.array(mask)) + # ------------------------------------------------------------------ # from_descriptor — reconstruct from a serialized pipeline descriptor # ------------------------------------------------------------------ @@ -253,6 +274,13 @@ def from_descriptor( pipeline_db = databases.get("pipeline") result_db = databases.get("result") # pre-scoped; None if not provided + if "table_scope" not in descriptor: + raise ValueError( + f"FunctionNode descriptor is missing required 'table_scope' field: " + f"{descriptor.get('label', '')}" + ) + table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"] + if function_pod is not None and input_stream is not None: # Full / READ_ONLY / CACHE_ONLY mode: construct normally via __init__. node = cls( @@ -261,6 +289,7 @@ def from_descriptor( pipeline_database=pipeline_db, result_database=result_db, label=descriptor.get("label"), + table_scope=table_scope, ) node._descriptor = descriptor @@ -329,6 +358,8 @@ def from_descriptor( node._stored_result_record_path = tuple( descriptor.get("result_record_path", ()) ) + node._table_scope = table_scope + node._node_identity_path_cache = None # Determine load status based on DB availability node._load_status = LoadStatus.UNAVAILABLE @@ -447,20 +478,31 @@ def keys( def node_identity_path(self) -> tuple[str, ...]: """Return the node identity path for observer contextualization. - The identity path is ``pod.uri + (schema_hash, instance_hash)`` and - is computable independently of whether a pipeline database is attached. + When ``table_scope="pipeline_hash"`` (default) the path is + ``pod.uri + (schema:{pipeline_hash},)`` — all runs that share the same + pipeline structure are routed to one shared table, with per-run + disambiguation via the ``_node_content_hash`` row-level column. + + When ``table_scope="content_hash"`` the legacy path is returned: + ``pod.uri + (schema:{pipeline_hash}, instance:{content_hash})``. - In live mode (pod present) the path is computed from the pod. In read-only/UNAVAILABLE mode (no pod) the path stored from the deserialized descriptor is returned (empty tuple when absent). """ if self._packet_function is None: return self._stored_pipeline_path + if self._node_identity_path_cache is not None: + return self._node_identity_path_cache pf = self._function_pod - return pf.uri + ( - f"schema:{self.pipeline_hash().to_string()}", - f"instance:{self.content_hash().to_string()}", - ) + if self._table_scope == "pipeline_hash": + path = pf.uri + (f"schema:{self.pipeline_hash().to_string()}",) + else: + path = pf.uri + ( + f"schema:{self.pipeline_hash().to_string()}", + f"instance:{self.content_hash().to_string()}", + ) + self._node_identity_path_cache = path + return path @property def node_uri(self) -> tuple[str, ...]: @@ -490,6 +532,7 @@ def clear_cache(self) -> None: self._cached_output_packets.clear() self._cached_output_table = None self._cached_content_hash_column = None + self._node_identity_path_cache = None self._update_modified_time() # ------------------------------------------------------------------ @@ -686,6 +729,8 @@ def get_cached_results( if taginfo is None or results is None: return {} + taginfo = self._filter_by_content_hash(taginfo) + joined = ( pl.DataFrame(taginfo) .join( @@ -711,7 +756,9 @@ def get_cached_results( drop_cols = [ c for c in filtered.column_names - if c.startswith(constants.META_PREFIX) or c == PIPELINE_ENTRY_ID_COL + if c.startswith(constants.META_PREFIX) + or c == PIPELINE_ENTRY_ID_COL + or c == constants.NODE_CONTENT_HASH_COL ] data_table = filtered.drop([c for c in drop_cols if c in filtered.column_names]) @@ -866,6 +913,9 @@ def add_pipeline_record( constants.PACKET_RECORD_ID: pa.array( [packet_record_id], type=pa.large_string() ), + constants.NODE_CONTENT_HASH_COL: pa.array( + [self.content_hash().to_string()], type=pa.large_string() + ), f"{constants.META_PREFIX}input_packet{constants.CONTEXT_KEY}": pa.array( [input_packet.data_context_key], type=pa.large_string() ), @@ -920,6 +970,8 @@ def get_all_records( if results is None or taginfo is None: return None + taginfo = self._filter_by_content_hash(taginfo) + joined = ( pl.DataFrame(taginfo) .join(pl.DataFrame(results), on=constants.PACKET_RECORD_ID, how="inner") @@ -929,6 +981,9 @@ def get_all_records( column_config = ColumnConfig.handle_config(columns, all_info=all_info) drop_columns = [] + # Always drop the node content hash column — it is an internal + # row-level discriminator, not a user-facing column. + drop_columns.append(constants.NODE_CONTENT_HASH_COL) if not column_config.meta and not column_config.all_info: drop_columns.extend( c for c in joined.column_names if c.startswith(constants.META_PREFIX) @@ -1000,6 +1055,8 @@ def _load_all_cached_records( if taginfo is None or results is None: return None + taginfo = self._filter_by_content_hash(taginfo) + joined = ( pl.DataFrame(taginfo) .join( @@ -1015,7 +1072,8 @@ def _load_all_cached_records( # Tag keys are the user-facing tag columns from the pipeline DB table. # Exclude: meta columns (__*), source columns (_source_*), - # system-tag columns (e.g. __tag_*), and the entry-ID column. + # system-tag columns (e.g. __tag_*), the entry-ID column, and the + # node content hash column. tag_keys = tuple( c for c in taginfo.column_names @@ -1023,12 +1081,15 @@ def _load_all_cached_records( and not c.startswith(constants.SOURCE_PREFIX) and not c.startswith(constants.SYSTEM_TAG_PREFIX) and c != PIPELINE_ENTRY_ID_COL + and c != constants.NODE_CONTENT_HASH_COL ) drop_cols = [ c for c in joined.column_names - if c.startswith(constants.META_PREFIX) or c == PIPELINE_ENTRY_ID_COL + if c.startswith(constants.META_PREFIX) + or c == PIPELINE_ENTRY_ID_COL + or c == constants.NODE_CONTENT_HASH_COL ] data_table = joined.drop([c for c in drop_cols if c in joined.column_names]) return tag_keys, data_table @@ -1124,6 +1185,7 @@ def iter_packets(self) -> Iterator[tuple[TagProtocol, PacketProtocol]]: ) if taginfo is not None and results is not None: + taginfo = self._filter_by_content_hash(taginfo) joined = ( pl.DataFrame(taginfo) .join( @@ -1148,6 +1210,7 @@ def iter_packets(self) -> Iterator[tuple[TagProtocol, PacketProtocol]]: for c in joined.column_names if c.startswith(constants.META_PREFIX) or c == PIPELINE_ENTRY_ID_COL + or c == constants.NODE_CONTENT_HASH_COL ] data_table = joined.drop( [c for c in drop_cols if c in joined.column_names] @@ -1444,6 +1507,7 @@ async def async_execute( ) if taginfo is not None and results is not None: + taginfo = self._filter_by_content_hash(taginfo) joined = ( pl.DataFrame(taginfo) .join( @@ -1463,6 +1527,7 @@ async def async_execute( for c in joined.column_names if c.startswith(constants.META_PREFIX) or c == PIPELINE_ENTRY_ID_COL + or c == constants.NODE_CONTENT_HASH_COL ] data_table = joined.drop( [c for c in drop_cols if c in joined.column_names] diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index 2f22f43b..26b022b8 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -5,7 +5,7 @@ import asyncio import logging from collections.abc import Iterator, Sequence -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal from orcapod import contexts from orcapod.channels import Channel, ReadableChannel, WritableChannel @@ -75,6 +75,7 @@ def __init__( # Optional DB params for persistent mode: pipeline_database: ArrowDatabaseProtocol | None = None, cache_mode: CacheMode = CacheMode.OFF, + table_scope: Literal["pipeline_hash", "content_hash"] = "pipeline_hash", ): if tracker_manager is None: tracker_manager = DEFAULT_TRACKER_MANAGER @@ -108,6 +109,8 @@ def __init__( self._stored_node_uri: tuple[str, ...] = () self._stored_pipeline_path: tuple[str, ...] = () self._descriptor: dict = {} + self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope + self._node_identity_path_cache: tuple[str, ...] | None = None if pipeline_database is not None: self.attach_databases( @@ -134,6 +137,7 @@ def attach_databases( self._cache_mode = cache_mode # Clear caches + self._node_identity_path_cache = None self.clear_cache() self._content_hash_cache.clear() self._pipeline_hash_cache.clear() @@ -172,6 +176,13 @@ def from_descriptor( """ from orcapod.pipeline.serialization import LoadStatus + if "table_scope" not in descriptor: + raise ValueError( + f"OperatorNode descriptor is missing required 'table_scope' field: " + f"{descriptor.get('label', '')}" + ) + table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"] + pipeline_db = databases.get("pipeline") cache_mode_str = descriptor.get("cache_mode", "off") try: @@ -185,6 +196,7 @@ def from_descriptor( operator=operator, input_streams=input_streams, label=descriptor.get("label"), + table_scope=table_scope, ) if pipeline_db is not None: node.attach_databases( @@ -237,6 +249,8 @@ def from_descriptor( node._stored_pipeline_hash = descriptor.get("pipeline_hash") node._stored_pipeline_path = tuple(descriptor.get("pipeline_path", ())) node._stored_node_uri = tuple(descriptor.get("node_uri") or []) + node._table_scope = table_scope + node._node_identity_path_cache = None # Determine load status based on DB availability and cache mode. # An uncached operator (cache_mode=OFF) never writes records to the @@ -354,23 +368,48 @@ def output_schema( def node_identity_path(self) -> tuple[str, ...]: """Return the node identity path for observer contextualization. - The identity path is ``operator.uri + (schema_hash, instance_hash)`` - and is computable independently of whether a pipeline database is - attached. + When ``table_scope="pipeline_hash"`` (default) the path is + ``operator.uri + (schema:{pipeline_hash},)`` — all runs that share + the same pipeline structure use one shared table, with per-run + disambiguation via the ``_node_content_hash`` row-level column. + + When ``table_scope="content_hash"`` the legacy path is returned: + ``operator.uri + (schema:{pipeline_hash}, instance:{content_hash})``. - In live mode (operator present) the path is computed from the operator. In read-only/UNAVAILABLE mode (no operator) the path stored from the deserialized descriptor is returned (empty tuple when absent). """ if self._operator is None: return self._stored_pipeline_path - return ( - self._operator.uri - + ( + if self._node_identity_path_cache is not None: + return self._node_identity_path_cache + if self._table_scope == "pipeline_hash": + path = self._operator.uri + ( + f"schema:{self.pipeline_hash().to_string()}", + ) + else: + path = self._operator.uri + ( f"schema:{self.pipeline_hash().to_string()}", f"instance:{self.content_hash().to_string()}", ) - ) + self._node_identity_path_cache = path + return path + + def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": + """Filter *table* to rows whose ``_node_content_hash`` matches this node. + + Only applied when ``table_scope="pipeline_hash"`` because in that mode + multiple runs share the same DB table and must be disambiguated at read + time. In ``"content_hash"`` mode every run has its own table. + """ + if self._table_scope != "pipeline_hash": + return table + col_name = constants.NODE_CONTENT_HASH_COL + if col_name not in table.column_names: + return table + own_hash = self.content_hash().to_string() + mask = [v == own_hash for v in table.column(col_name).to_pylist()] + return table.filter(pa.array(mask)) @property def node_uri(self) -> tuple[str, ...]: @@ -391,6 +430,7 @@ def clear_cache(self) -> None: """Discard all in-memory cached state.""" self._cached_output_stream = None self._cached_output_table = None + self._node_identity_path_cache = None self._update_modified_time() def _store_output_stream(self, stream: StreamProtocol) -> None: @@ -414,6 +454,17 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: pa.array(record_hashes, type=pa.large_string()), ) + # Add node content hash column for per-run disambiguation when using + # pipeline_hash table scope (multiple runs share the same table). + n_rows = output_table.num_rows + output_table = output_table.append_column( + constants.NODE_CONTENT_HASH_COL, + pa.array( + [self.content_hash().to_string()] * n_rows, + type=pa.large_string(), + ), + ) + # Store (identical rows across runs naturally deduplicate) self._pipeline_database.add_records( self.node_identity_path, @@ -422,7 +473,9 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: skip_duplicates=True, ) - self._cached_output_table = output_table.drop(self.HASH_COLUMN_NAME) + self._cached_output_table = output_table.drop( + [self.HASH_COLUMN_NAME, constants.NODE_CONTENT_HASH_COL] + ) def _make_empty_table(self) -> "pa.Table": """Build a zero-row PyArrow table matching this node's full output schema. @@ -470,7 +523,15 @@ def _load_cached_stream_from_db(self) -> "ArrowTableStream | None": return None records_table = self._make_empty_table() else: - records_table = records + records_table = self._filter_by_content_hash(records) + # Drop internal columns that must not surface to stream consumers. + cols_to_drop = [ + c + for c in [constants.NODE_CONTENT_HASH_COL, self.HASH_COLUMN_NAME] + if c in records_table.column_names + ] + if cols_to_drop: + records_table = records_table.drop(cols_to_drop) tag_keys = self.keys()[0] return ArrowTableStream(records_table, tag_columns=tag_keys) @@ -568,6 +629,16 @@ def _replay_from_cache(self) -> None: records = self._pipeline_database.get_all_records(self.node_identity_path) if records is None: records = self._make_empty_table() + else: + records = self._filter_by_content_hash(records) + # Drop internal columns that must not surface to stream consumers. + cols_to_drop = [ + c + for c in [constants.NODE_CONTENT_HASH_COL, self.HASH_COLUMN_NAME] + if c in records.column_names + ] + if cols_to_drop: + records = records.drop(cols_to_drop) tag_keys = self.keys()[0] self._cached_output_stream = ArrowTableStream(records, tag_columns=tag_keys) @@ -664,9 +735,16 @@ def get_all_records( if results is None: return None + results = self._filter_by_content_hash(results) + column_config = ColumnConfig.handle_config(columns, all_info=all_info) - drop_columns = [] + # Always drop internal row-level columns. + drop_columns = [ + c + for c in [constants.NODE_CONTENT_HASH_COL, self.HASH_COLUMN_NAME] + if c in results.column_names + ] if not column_config.meta and not column_config.all_info: drop_columns.extend( c for c in results.column_names if c.startswith(constants.META_PREFIX) diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index 339bab00..6d2f5751 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -774,6 +774,7 @@ def _build_function_descriptor(self, node: "FunctionNode") -> dict[str, Any]: """ return { "function_config": node._function_pod.to_config(), + "table_scope": node._table_scope, } def _build_operator_descriptor(self, node: OperatorNode, level: str = "standard") -> dict[str, Any]: @@ -788,6 +789,7 @@ def _build_operator_descriptor(self, node: OperatorNode, level: str = "standard" """ result: dict[str, Any] = { "operator_config": node._operator.to_config(), + "table_scope": node._table_scope, } # cache_mode at standard+ only if level in ("standard", "full"): diff --git a/src/orcapod/system_constants.py b/src/orcapod/system_constants.py index 65d1d83e..dd469886 100644 --- a/src/orcapod/system_constants.py +++ b/src/orcapod/system_constants.py @@ -8,6 +8,7 @@ DATA_CONTEXT_KEY = "context_key" INPUT_PACKET_HASH_COL = "input_packet_hash" PACKET_RECORD_ID = "packet_id" +NODE_CONTENT_HASH_COL = "node_content_hash" SYSTEM_TAG_PREFIX_NAME = "tag" SYSTEM_TAG_SOURCE_ID_FIELD = "source_id" SYSTEM_TAG_RECORD_ID_FIELD = "record_id" @@ -67,6 +68,10 @@ def INPUT_PACKET_HASH_COL(self) -> str: def PACKET_RECORD_ID(self) -> str: return f"{self._global_prefix}{SYSTEM_COLUMN_PREFIX}{PACKET_RECORD_ID}" + @property + def NODE_CONTENT_HASH_COL(self) -> str: + return f"{self._global_prefix}{DATAGRAM_PREFIX}{NODE_CONTENT_HASH_COL}" + @property def SYSTEM_TAG_PREFIX(self) -> str: return f"{self._global_prefix}{DATAGRAM_PREFIX}{SYSTEM_TAG_PREFIX_NAME}_" diff --git a/tests/test_core/function_pod/test_function_node_caching.py b/tests/test_core/function_pod/test_function_node_caching.py index a6cacaea..2e9ddab8 100644 --- a/tests/test_core/function_pod/test_function_node_caching.py +++ b/tests/test_core/function_pod/test_function_node_caching.py @@ -258,9 +258,9 @@ def test_phase2_processes_novel_entry_ids_only(self): assert result_values == [20, 40, 60] def test_same_packet_new_tag_triggers_phase2(self): - """With the two-level pipeline_path, nodes with different input data have - different pipeline_paths. node2 (id=1, x=10) has a different content_hash - than node1 (id=0, x=10), so each has its own pipeline DB path.""" + """With pipeline_hash scope (default), nodes with same schema share one DB table. + node2 (id=1, x=10) has a different content_hash than node1 (id=0, x=10), + so Phase 1 finds no records for node2 and Phase 2 executes the packet.""" db = InMemoryArrowDatabase() # First run: tag=0, x=10 @@ -275,20 +275,19 @@ def test_same_packet_new_tag_triggers_phase2(self): stream2 = _make_stream([{"id": 1, "x": 10}]) node2, _ = _make_node(stream2, db=db) - # Different data → different pipeline_path (different instance: component) - assert node1.node_identity_path != node2.node_identity_path - assert node1.node_identity_path[-2] == node2.node_identity_path[-2] # same schema: + # Same schema → same pipeline_hash → SAME node_identity_path (shared table) + assert node1.node_identity_path == node2.node_identity_path + assert node1.node_identity_path[-1].startswith("schema:") results = list(node2.iter_packets()) - # node2 has a different pipeline_path: Phase 1 finds no records for node2's path. - # Phase 2 processes the single row → 1 result + # Phase 1 finds no records for node2's content_hash → Phase 2 processes the row assert len(results) == 1 - # node2's pipeline DB should have 1 record - pipeline_records = db.get_all_records(node2.node_identity_path) - assert pipeline_records is not None - assert pipeline_records.num_rows == 1 + # Shared table now has 2 records (one per node/content_hash) + all_pipeline_records = db.get_all_records(node2.node_identity_path) + assert all_pipeline_records is not None + assert all_pipeline_records.num_rows == 2 # Result DB should still have only 1 record (same packet hash, cache hit) result_records = node2._cached_function_pod._result_database.get_all_records( diff --git a/tests/test_core/function_pod/test_function_pod_node.py b/tests/test_core/function_pod/test_function_pod_node.py index 3076cadd..352425b1 100644 --- a/tests/test_core/function_pod/test_function_pod_node.py +++ b/tests/test_core/function_pod/test_function_pod_node.py @@ -116,10 +116,11 @@ def test_node_hash_is_equal_to_wrapped_function_pod_output_stream( stream_hash = output_stream.content_hash().to_string() assert node_hash == stream_hash - def test_pipeline_path_ends_with_node_hash(self, node): + def test_pipeline_path_ends_with_schema_hash(self, node): path = node.node_identity_path - assert path[-2].startswith("schema:") - assert path[-1].startswith("instance:") + # With pipeline_hash scope (default): path ends with schema:{hash} only + assert path[-1].startswith("schema:") + assert not any(seg.startswith("instance:") for seg in path) def test_pipeline_path_contains_packet_function_uri(self, node): pf_uri = node._packet_function.uri @@ -379,8 +380,9 @@ def test_pipeline_hash_is_consistent(self, double_pf): assert node.pipeline_hash() == node.pipeline_hash() def test_pipeline_node_hash_in_uri_is_schema_based(self, double_pf): - """node_identity_path uses schema:{pipeline_hash} and instance:{content_hash}. - Two nodes with same schema share the schema: component but differ in instance:.""" + """node_identity_path uses only schema:{pipeline_hash} (pipeline_hash scope default). + Two nodes with same schema share the same full path; per-run isolation + is achieved via the _node_content_hash row column.""" db = InMemoryArrowDatabase() node1 = FunctionNode( function_pod=FunctionPod(packet_function=double_pf), @@ -392,12 +394,9 @@ def test_pipeline_node_hash_in_uri_is_schema_based(self, double_pf): input_stream=make_int_stream(n=99), # different data pipeline_database=db, ) - # Same schema → same schema: component - assert node1.node_identity_path[-2] == node2.node_identity_path[-2] - assert node1.node_identity_path[-2].startswith("schema:") - # Different data → different instance: component - assert node1.node_identity_path[-1] != node2.node_identity_path[-1] - assert node1.node_identity_path[-1].startswith("instance:") + # Same schema → same pipeline_hash → SAME full path + assert node1.node_identity_path == node2.node_identity_path + assert node1.node_identity_path[-1].startswith("schema:") # --------------------------------------------------------------------------- @@ -659,8 +658,9 @@ def test_node_identity_path_starts_with_pf_uri(self, double_pf): ) pf_uri = node._packet_function.uri assert node.node_identity_path[: len(pf_uri)] == pf_uri - assert node.node_identity_path[-2].startswith("schema:") - assert node.node_identity_path[-1].startswith("instance:") + # With pipeline_hash scope (default): ends with schema:{hash} only + assert node.node_identity_path[-1].startswith("schema:") + assert not any(seg.startswith("instance:") for seg in node.node_identity_path) # --------------------------------------------------------------------------- diff --git a/tests/test_core/function_pod/test_pipeline_hash_integration.py b/tests/test_core/function_pod/test_pipeline_hash_integration.py index 5dbbcf34..e907a15b 100644 --- a/tests/test_core/function_pod/test_pipeline_hash_integration.py +++ b/tests/test_core/function_pod/test_pipeline_hash_integration.py @@ -277,12 +277,14 @@ def test_table_stream_pipeline_hash_equals_source_pipeline_hash(self): class TestFunctionNodePipelineHashFix: """ - The pipeline_path now uses a two-level formula: - prefix + pf.uri + (f"schema:{pipeline_hash}", f"instance:{content_hash}") + With ``table_scope="pipeline_hash"`` (the default), nodes sharing the same + pipeline structure (same function + same input schema) route to a single + shared DB table. The path formula is: - Nodes with the same schema share the schema: component. - Nodes with different data have different instance: components, isolating - data-distinct runs in separate DB locations. + prefix + pf.uri + (f"schema:{pipeline_hash}",) + + Per-run isolation is achieved via the ``_node_content_hash`` row-level + column stored in every pipeline record. """ def test_different_data_same_schema_share_pipeline_path(self, double_pf): @@ -297,15 +299,14 @@ def test_different_data_same_schema_share_pipeline_path(self, double_pf): input_stream=make_int_stream(n=5), pipeline_database=db, ) - # Same schema → same schema: component - assert node1.node_identity_path[-2] == node2.node_identity_path[-2] - assert node1.node_identity_path[-2].startswith("schema:") - # Different data → different instance: component → different full path - assert node1.node_identity_path[-1] != node2.node_identity_path[-1] - assert node1.node_identity_path != node2.node_identity_path + # Same schema → same pipeline_hash → SAME node_identity_path + assert node1.node_identity_path == node2.node_identity_path + assert node1.node_identity_path[-1].startswith("schema:") + # Per-run disambiguation via content_hash (not in the path) + assert node1.content_hash() != node2.content_hash() def test_different_data_same_schema_share_uri(self, double_pf): - """URI is schema-based; two nodes with same schema share URI prefix.""" + """With pipeline_hash scope, two nodes with same schema share the full path.""" db = InMemoryArrowDatabase() node1 = FunctionNode( function_pod=FunctionPod(packet_function=double_pf), @@ -331,11 +332,8 @@ def test_different_data_same_schema_share_uri(self, double_pf): ), pipeline_database=db, ) - # URI (pf.uri) portion is identical for same function - pf_uri_len = len(node1._packet_function.uri) - assert node1.node_identity_path[:pf_uri_len] == node2.node_identity_path[:pf_uri_len] - # But full path differs (different instance: suffix) - assert node1.node_identity_path != node2.node_identity_path + # With pipeline_hash scope, same function + schema → identical full path + assert node1.node_identity_path == node2.node_identity_path def test_different_data_yields_different_content_hash(self, double_pf): """Same schema, different actual data → content_hash must differ.""" @@ -389,8 +387,11 @@ def test_node_identity_path_starts_with_pf_uri(self, double_pf): ) pf_uri = node._packet_function.uri assert node.node_identity_path[: len(pf_uri)] == pf_uri - assert node.node_identity_path[-2].startswith("schema:") - assert node.node_identity_path[-1].startswith("instance:") + # With pipeline_hash scope (default): path ends with schema:{hash} only + assert node.node_identity_path[-1].startswith("schema:") + assert not any( + seg.startswith("instance:") for seg in node.node_identity_path + ) # --------------------------------------------------------------------------- @@ -408,11 +409,13 @@ class TestPipelineDbScoping: def test_shared_db_overlapping_inputs_avoids_recomputation(self, double_pf): """ node1 processes {0,1,2}. node2 processes {0,1,2,3,4}. - With the two-level formula, node1 and node2 have different pipeline_paths - (different content hashes), so pipeline DB records are isolated per node. - However, the result DB (CachedFunctionPod) is shared by packet hash, so - node2 still avoids recomputing packets that node1 already computed. - Total function calls: 3 (node1) + 2 (node2, only x=3 and x=4 are new). + With pipeline_hash scope (default), node1 and node2 share the SAME + node_identity_path (same function + schema). Per-run isolation happens + via the ``_node_content_hash`` row column. node2 Phase 1 finds no + matching records (different content_hash), so Phase 2 executes all 5; + however the result DB (CachedFunctionPod) is shared, so only x=3 and + x=4 require actual function invocations. + Total function calls: 3 (node1) + 2 (node2). """ call_count = 0 @@ -435,17 +438,16 @@ def counting_double(x: int) -> int: pipeline_database=db, ) - # Different data → different instance: component → different full path - assert node1.node_identity_path != node2.node_identity_path - assert node1.node_identity_path[-2] == node2.node_identity_path[-2] # same schema: + # Same schema → same pipeline_hash → same node_identity_path (shared table) + assert node1.node_identity_path == node2.node_identity_path + assert node1.node_identity_path[-1].startswith("schema:") node1.run() assert call_count == 3 node2.run() - # node2 has a different pipeline_path but shares the result DB cache. - # Phase 2 calls execute_packet for all 5 entries; CachedFunctionPod - # only invokes the function for novel packets (x=3, x=4 → 2 calls). + # node2 Phase 1 finds no records for its content_hash → Phase 2 runs all 5. + # CachedFunctionPod only invokes the function for novel packets (x=3, x=4). assert call_count == 5 def test_shared_db_all_inputs_pre_computed_zero_recomputation(self, double_pf): @@ -565,7 +567,8 @@ def test_pipeline_hash_chain_root_to_function_node(self, double_pf): def test_chained_nodes_share_pipeline_path(self, double_pf): """ Two independent two-node pipelines with same schema but different data - have the same schema: component but different instance: components. + share the same node_identity_path (pipeline_hash scope default). + Per-run isolation is via _node_content_hash at row level. """ db = InMemoryArrowDatabase() @@ -589,10 +592,9 @@ def test_chained_nodes_share_pipeline_path(self, double_pf): node1_b.run() src_b = node1_b.as_source() - # Same schema → same schema: component; different data → different full path - assert node1_a.node_identity_path[-2] == node1_b.node_identity_path[-2] - assert node1_a.node_identity_path[-2].startswith("schema:") - assert node1_a.node_identity_path != node1_b.node_identity_path + # Same schema → same pipeline_hash → SAME full path (shared table) + assert node1_a.node_identity_path == node1_b.node_identity_path + assert node1_a.node_identity_path[-1].startswith("schema:") # At the DerivedSource level, pipeline_hash is schema-only assert src_a.pipeline_hash() == src_b.pipeline_hash() diff --git a/tests/test_core/operators/test_operator_node.py b/tests/test_core/operators/test_operator_node.py index 28b9bb65..6b64f307 100644 --- a/tests/test_core/operators/test_operator_node.py +++ b/tests/test_core/operators/test_operator_node.py @@ -166,12 +166,13 @@ def test_pipeline_path_contains_operator_uri(self, simple_stream): # operator.uri is a tuple starting with the class name assert any("MapPackets" in segment for segment in path) - def test_pipeline_path_ends_with_node_hash(self, simple_stream): + def test_pipeline_path_ends_with_schema_hash(self, simple_stream): op = MapPackets({"x": "renamed_x"}) node = _make_node(op, (simple_stream,)) path = node.node_identity_path - assert path[-2].startswith("schema:") - assert path[-1].startswith("instance:") + # With pipeline_hash scope (default): ends with schema:{hash} only + assert path[-1].startswith("schema:") + assert not any(seg.startswith("instance:") for seg in path) def test_no_tag_schema_hash_in_path(self, simple_stream): op = MapPackets({"x": "renamed_x"}) diff --git a/tests/test_core/test_caching_integration.py b/tests/test_core/test_caching_integration.py index 8b9b5f9f..be57cf63 100644 --- a/tests/test_core/test_caching_integration.py +++ b/tests/test_core/test_caching_integration.py @@ -309,8 +309,9 @@ def test_function_node_stores_records( def test_cross_source_sharing_same_pipeline_path( self, clinic_a, clinic_b, source_db, pipeline_db, result_db, pod ): - """Different source identities, same schema → same schema: component but - different instance: components (different data → different content_hash).""" + """Different source identities, same schema → same pipeline_hash → same path. + With pipeline_hash scope (default), both nodes share a single DB table. + Per-run isolation is via the _node_content_hash row column.""" patients_a, labs_a = clinic_a patients_b, labs_b = clinic_b @@ -346,17 +347,17 @@ def test_cross_source_sharing_same_pipeline_path( result_database=result_db, ) - # Same schema → same schema: component - assert fn_a.node_identity_path[-2] == fn_b.node_identity_path[-2] - assert fn_a.node_identity_path[-2].startswith("schema:") - # Different data → different full path - assert fn_a.node_identity_path != fn_b.node_identity_path + # Same schema → same pipeline_hash → SAME full path (shared table) + assert fn_a.node_identity_path == fn_b.node_identity_path + assert fn_a.node_identity_path[-1].startswith("schema:") + # Content hashes differ (different source data) + assert fn_a.content_hash() != fn_b.content_hash() def test_cross_source_records_accumulate_in_shared_table( self, clinic_a, clinic_b, source_db, pipeline_db, result_db, pod ): - """With the two-level formula, each pipeline writes to its own DB path. - Records from pipeline A and B are stored separately.""" + """With pipeline_hash scope, A and B share one DB table. + get_all_records() filters by content_hash so each node sees only its own rows.""" patients_a, labs_a = clinic_a patients_b, labs_b = clinic_b @@ -485,7 +486,8 @@ def test_replay_empty_cache_returns_empty_stream(self, clinic_a, source_db): def test_content_hash_scoping_isolates_source_combinations( self, clinic_a, clinic_b, source_db, operator_db ): - """Different source combinations → different pipeline_paths.""" + """Different source combinations, same schema → same pipeline_hash → shared path. + With pipeline_hash scope (default), per-run isolation is via _node_content_hash.""" patients_a, labs_a = clinic_a patients_b, labs_b = clinic_b @@ -518,7 +520,10 @@ def test_content_hash_scoping_isolates_source_combinations( pipeline_database=operator_db, cache_mode=CacheMode.LOG, ) - assert node_a.node_identity_path != node_b.node_identity_path + # Same schema → same pipeline_hash → SAME path (shared table) + assert node_a.node_identity_path == node_b.node_identity_path + # Per-run isolation is via content_hash + assert node_a.content_hash() != node_b.content_hash() # --------------------------------------------------------------------------- @@ -595,11 +600,10 @@ def test_full_pipeline_source_to_function_to_operator( pipeline_database=pipeline_db, result_database=result_db, ) - # Same schema → same schema: component; different data → different full path - assert fn_node.node_identity_path[-2] == fn_node_b.node_identity_path[-2] - assert fn_node.node_identity_path != fn_node_b.node_identity_path + # Same schema → same pipeline_hash → same path (shared table) + assert fn_node.node_identity_path == fn_node_b.node_identity_path fn_node_b.run() - # fn_node_b has its own path: 2 records from clinic B only + # get_all_records() filters by content_hash: fn_node_b sees only its 2 records assert fn_node_b.get_all_records().num_rows == 2 # Verify source caches are populated diff --git a/tests/test_core/test_table_scope.py b/tests/test_core/test_table_scope.py new file mode 100644 index 00000000..ff323604 --- /dev/null +++ b/tests/test_core/test_table_scope.py @@ -0,0 +1,593 @@ +"""Tests for the table_scope flag on FunctionNode and OperatorNode. + +Covers the two modes explicitly: + - "pipeline_hash" (default): nodes with the same structure share one DB table, + distinguished at read-time by the ``_node_content_hash`` row column. + - "content_hash" (legacy): every node gets its own DB table path + (``instance:{content_hash}`` segment appended). +""" + +from __future__ import annotations + +import pyarrow as pa +import pytest + +from orcapod.core.function_pod import FunctionPod +from orcapod.core.nodes import FunctionNode, OperatorNode +from orcapod.core.operators import Join +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.sources import ArrowTableSource, DictSource +from orcapod.databases import InMemoryArrowDatabase +from orcapod.system_constants import SystemConstant +from orcapod.types import CacheMode + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_pod() -> FunctionPod: + """Pod that doubles the 'y' packet column (tag column is 'x').""" + + def double(y: int) -> int: + return y * 2 + + pf = PythonPacketFunction(function=double, output_keys=["result"]) + return FunctionPod(packet_function=pf) + + +def _make_source(data: list[dict], source_id: str = "src") -> DictSource: + """Source with tag column 'x' and packet column 'y'.""" + return DictSource(data=data, tag_columns=["x"], source_id=source_id) + + +def _make_join_streams( + vals: list[int], source_id_suffix: str = "1" +) -> tuple[ArrowTableSource, ArrowTableSource]: + """Two ArrowTableSources with key/val and key/score columns. + + Distinct ``source_id_suffix`` values guarantee different content_hash when + the ArrowTableSource identity includes its source_id. + """ + table_a = pa.table( + { + "key": pa.array([str(v) for v in vals], type=pa.large_string()), + "val": pa.array(vals, type=pa.int64()), + } + ) + table_b = pa.table( + { + "key": pa.array([str(v) for v in vals], type=pa.large_string()), + "score": pa.array(vals, type=pa.int64()), + } + ) + src_a = ArrowTableSource( + table_a, tag_columns=["key"], source_id=f"src_a_{source_id_suffix}", infer_nullable=True + ) + src_b = ArrowTableSource( + table_b, tag_columns=["key"], source_id=f"src_b_{source_id_suffix}", infer_nullable=True + ) + return src_a, src_b + + +# --------------------------------------------------------------------------- +# FunctionNode — table_scope="pipeline_hash" (default) +# --------------------------------------------------------------------------- + + +class TestFunctionNodePipelineHashScope: + """Default table_scope="pipeline_hash": shared table, per-run isolation via content hash.""" + + def test_default_scope_is_pipeline_hash(self): + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + assert node._table_scope == "pipeline_hash" + + def test_node_identity_path_ends_with_schema_only(self): + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + path = node.node_identity_path + assert path[-1].startswith("schema:"), f"Expected schema:... got {path[-1]!r}" + assert not any(seg.startswith("instance:") for seg in path) + + def test_two_nodes_same_function_same_schema_share_path(self): + """Same function + same source schema → same pipeline_hash → same node_identity_path.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + # Both use same source_id → same schema structure → same pipeline_hash + src_a = _make_source([{"x": 1, "y": 10}], source_id="src") + src_b = _make_source([{"x": 2, "y": 20}], source_id="src") + node_a = FunctionNode(function_pod=pod, input_stream=src_a, pipeline_database=db) + node_b = FunctionNode(function_pod=pod, input_stream=src_b, pipeline_database=db) + + assert node_a.node_identity_path == node_b.node_identity_path + assert node_a.pipeline_hash() == node_b.pipeline_hash() + + def test_node_content_hash_col_in_pipeline_records(self): + """After run(), pipeline records contain _node_content_hash column.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + node.run() + + col_name = SystemConstant().NODE_CONTENT_HASH_COL + pipeline_table = node._pipeline_database.get_all_records(node.node_identity_path) + assert pipeline_table is not None + assert col_name in pipeline_table.column_names + + def test_get_all_records_drops_node_content_hash_col(self): + """Consumer-facing records must NOT expose _node_content_hash.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + node.run() + + records = node.get_all_records() + col_name = SystemConstant().NODE_CONTENT_HASH_COL + assert col_name not in records.column_names + + def test_isolation_two_nodes_share_table_see_only_own_records(self): + """Two nodes sharing a DB path each see only their own records via content_hash filter.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + # Different source_ids → different content_hash → different _node_content_hash rows + src_a = _make_source([{"x": 1, "y": 10}], source_id="source_a") + src_b = _make_source([{"x": 2, "y": 20}], source_id="source_b") + node_a = FunctionNode(function_pod=pod, input_stream=src_a, pipeline_database=db) + node_b = FunctionNode(function_pod=pod, input_stream=src_b, pipeline_database=db) + + node_a.run() + node_b.run() + + records_a = node_a.get_all_records() + records_b = node_b.get_all_records() + + # Each node returns only its own 1 record + assert records_a.num_rows == 1 + assert records_b.num_rows == 1 + + # Results are correct per-node + assert records_a.column("result")[0].as_py() == 20 # 10 * 2 + assert records_b.column("result")[0].as_py() == 40 # 20 * 2 + + +# --------------------------------------------------------------------------- +# FunctionNode — table_scope="content_hash" (legacy) +# --------------------------------------------------------------------------- + + +class TestFunctionNodeContentHashScope: + """Legacy table_scope="content_hash": per-run isolated tables.""" + + def test_scope_set_to_content_hash(self): + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode( + function_pod=pod, input_stream=src, pipeline_database=db, table_scope="content_hash" + ) + assert node._table_scope == "content_hash" + + def test_node_identity_path_ends_with_schema_and_instance(self): + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode( + function_pod=pod, input_stream=src, pipeline_database=db, table_scope="content_hash" + ) + path = node.node_identity_path + assert path[-2].startswith("schema:"), f"Expected schema:... got {path[-2]!r}" + assert path[-1].startswith("instance:"), f"Expected instance:... got {path[-1]!r}" + assert node.pipeline_hash().to_string() in path[-2] + assert node.content_hash().to_string() in path[-1] + + def test_two_nodes_different_source_id_have_different_paths(self): + """Different source_id → different content_hash → different node_identity_path.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + src_a = _make_source([{"x": 1, "y": 10}], source_id="source_a") + src_b = _make_source([{"x": 2, "y": 20}], source_id="source_b") + node_a = FunctionNode( + function_pod=pod, input_stream=src_a, pipeline_database=db, table_scope="content_hash" + ) + node_b = FunctionNode( + function_pod=pod, input_stream=src_b, pipeline_database=db, table_scope="content_hash" + ) + assert node_a.content_hash() != node_b.content_hash() + assert node_a.node_identity_path != node_b.node_identity_path + + def test_pipeline_hash_still_equal_across_content_hash_nodes(self): + """Even in content_hash scope, pipeline_hash is the same for same function structure.""" + db = InMemoryArrowDatabase() + pod = _make_pod() + # Same source_id → same pipeline_hash + src_a = _make_source([{"x": 1, "y": 10}], source_id="src") + src_b = _make_source([{"x": 2, "y": 20}], source_id="src") + node_a = FunctionNode( + function_pod=pod, input_stream=src_a, pipeline_database=db, table_scope="content_hash" + ) + node_b = FunctionNode( + function_pod=pod, input_stream=src_b, pipeline_database=db, table_scope="content_hash" + ) + # pipeline_hash same → same schema: segment + assert node_a.pipeline_hash() == node_b.pipeline_hash() + assert node_a.node_identity_path[-2] == node_b.node_identity_path[-2] + + +# --------------------------------------------------------------------------- +# FunctionNode — from_descriptor validation +# --------------------------------------------------------------------------- + + +class TestFunctionNodeDescriptorTableScope: + def test_from_descriptor_missing_table_scope_raises(self): + """Descriptor without table_scope must raise ValueError.""" + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + db = InMemoryArrowDatabase() + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + tag_schema, packet_schema = node.output_schema() + descriptor = { + "node_type": "function", + "label": None, + "content_hash": node.content_hash().to_string(), + "pipeline_hash": node.pipeline_hash().to_string(), + "data_context_key": node.data_context_key, + # "table_scope" intentionally omitted + "output_schema": { + "tag": {k: str(v) for k, v in tag_schema.items()}, + "packet": {k: str(v) for k, v in packet_schema.items()}, + }, + } + with pytest.raises(ValueError, match="table_scope"): + FunctionNode.from_descriptor( + descriptor=descriptor, + function_pod=None, + input_stream=None, + databases={}, + ) + + def test_from_descriptor_preserves_pipeline_hash_scope(self): + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + db = InMemoryArrowDatabase() + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + tag_schema, packet_schema = node.output_schema() + descriptor = { + "node_type": "function", + "label": None, + "content_hash": node.content_hash().to_string(), + "pipeline_hash": node.pipeline_hash().to_string(), + "data_context_key": node.data_context_key, + "table_scope": "pipeline_hash", + "output_schema": { + "tag": {k: str(v) for k, v in tag_schema.items()}, + "packet": {k: str(v) for k, v in packet_schema.items()}, + }, + } + loaded = FunctionNode.from_descriptor( + descriptor=descriptor, + function_pod=None, + input_stream=None, + databases={}, + ) + assert loaded._table_scope == "pipeline_hash" + + def test_from_descriptor_preserves_content_hash_scope(self): + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + db = InMemoryArrowDatabase() + node = FunctionNode( + function_pod=pod, input_stream=src, pipeline_database=db, table_scope="content_hash" + ) + tag_schema, packet_schema = node.output_schema() + descriptor = { + "node_type": "function", + "label": None, + "content_hash": node.content_hash().to_string(), + "pipeline_hash": node.pipeline_hash().to_string(), + "data_context_key": node.data_context_key, + "table_scope": "content_hash", + "output_schema": { + "tag": {k: str(v) for k, v in tag_schema.items()}, + "packet": {k: str(v) for k, v in packet_schema.items()}, + }, + } + loaded = FunctionNode.from_descriptor( + descriptor=descriptor, + function_pod=None, + input_stream=None, + databases={}, + ) + assert loaded._table_scope == "content_hash" + + +# --------------------------------------------------------------------------- +# OperatorNode — table_scope="pipeline_hash" (default) +# --------------------------------------------------------------------------- + + +class TestOperatorNodePipelineHashScope: + def test_default_scope_is_pipeline_hash(self): + db = InMemoryArrowDatabase() + src_a, src_b = _make_join_streams([1, 2], "x") + node = OperatorNode( + operator=Join(), + input_streams=(src_a, src_b), + pipeline_database=db, + ) + assert node._table_scope == "pipeline_hash" + + def test_node_identity_path_ends_with_schema_only(self): + db = InMemoryArrowDatabase() + src_a, src_b = _make_join_streams([1, 2], "x") + node = OperatorNode( + operator=Join(), + input_streams=(src_a, src_b), + pipeline_database=db, + ) + path = node.node_identity_path + assert path[-1].startswith("schema:"), f"Expected schema:... got {path[-1]!r}" + assert not any(seg.startswith("instance:") for seg in path) + + def test_two_nodes_same_operator_same_schema_share_path(self): + """Same operator + same schema (same source_id prefix) → same pipeline_hash → same path.""" + db = InMemoryArrowDatabase() + # Same source_id_suffix → same source_id → same pipeline_hash + src_a1, src_b1 = _make_join_streams([1, 2], "x") + src_a2, src_b2 = _make_join_streams([3, 4], "x") + node1 = OperatorNode( + operator=Join(), + input_streams=(src_a1, src_b1), + pipeline_database=db, + ) + node2 = OperatorNode( + operator=Join(), + input_streams=(src_a2, src_b2), + pipeline_database=db, + ) + assert node1.node_identity_path == node2.node_identity_path + assert node1.pipeline_hash() == node2.pipeline_hash() + + def test_two_nodes_different_source_ids_have_different_content_hash(self): + """Different source_ids → different content_hash for OperatorNode.""" + db = InMemoryArrowDatabase() + src_a1, src_b1 = _make_join_streams([1, 2], "run1") + src_a2, src_b2 = _make_join_streams([1, 2], "run2") + node1 = OperatorNode( + operator=Join(), + input_streams=(src_a1, src_b1), + pipeline_database=db, + ) + node2 = OperatorNode( + operator=Join(), + input_streams=(src_a2, src_b2), + pipeline_database=db, + ) + assert node1.content_hash() != node2.content_hash() + + def test_isolation_two_nodes_share_table_see_only_own_records(self): + """Two OperatorNodes sharing a DB path each see only their own records.""" + db = InMemoryArrowDatabase() + src_a1, src_b1 = _make_join_streams([1, 2], "run1") + src_a2, src_b2 = _make_join_streams([3, 4], "run2") + node1 = OperatorNode( + operator=Join(), + input_streams=(src_a1, src_b1), + pipeline_database=db, + cache_mode=CacheMode.LOG, + ) + node2 = OperatorNode( + operator=Join(), + input_streams=(src_a2, src_b2), + pipeline_database=db, + cache_mode=CacheMode.LOG, + ) + + node1.run() + node2.run() + + records1 = node1.get_all_records() + records2 = node2.get_all_records() + + assert records1 is not None + assert records2 is not None + + # Each has 2 join results (matching keys) + assert records1.num_rows == 2 + assert records2.num_rows == 2 + + # Values are correct per node + keys1 = set(records1.column("key").to_pylist()) + keys2 = set(records2.column("key").to_pylist()) + assert keys1 == {"1", "2"} + assert keys2 == {"3", "4"} + + +# --------------------------------------------------------------------------- +# OperatorNode — table_scope="content_hash" (legacy) +# --------------------------------------------------------------------------- + + +class TestOperatorNodeContentHashScope: + def test_node_identity_path_ends_with_schema_and_instance(self): + db = InMemoryArrowDatabase() + src_a, src_b = _make_join_streams([1, 2], "x") + node = OperatorNode( + operator=Join(), + input_streams=(src_a, src_b), + pipeline_database=db, + table_scope="content_hash", + ) + path = node.node_identity_path + assert path[-2].startswith("schema:"), f"Expected schema:... got {path[-2]!r}" + assert path[-1].startswith("instance:"), f"Expected instance:... got {path[-1]!r}" + assert node.pipeline_hash().to_string() in path[-2] + assert node.content_hash().to_string() in path[-1] + + def test_two_nodes_different_source_ids_have_different_paths(self): + db = InMemoryArrowDatabase() + src_a1, src_b1 = _make_join_streams([1, 2], "run1") + src_a2, src_b2 = _make_join_streams([1, 2], "run2") + node1 = OperatorNode( + operator=Join(), + input_streams=(src_a1, src_b1), + pipeline_database=db, + table_scope="content_hash", + ) + node2 = OperatorNode( + operator=Join(), + input_streams=(src_a2, src_b2), + pipeline_database=db, + table_scope="content_hash", + ) + assert node1.content_hash() != node2.content_hash() + assert node1.node_identity_path != node2.node_identity_path + + def test_pipeline_hash_same_across_content_hash_nodes_with_same_schema(self): + """Even in content_hash scope, pipeline_hash is the same for same operator+schema.""" + db = InMemoryArrowDatabase() + # Same source_id_suffix → same source_ids → same pipeline_hash AND same content_hash + # (ArrowTableSource hashes by schema/source_id, not raw data values) + src_a1, src_b1 = _make_join_streams([1, 2], "x") + src_a2, src_b2 = _make_join_streams([3, 4], "x") + node1 = OperatorNode( + operator=Join(), + input_streams=(src_a1, src_b1), + pipeline_database=db, + table_scope="content_hash", + ) + node2 = OperatorNode( + operator=Join(), + input_streams=(src_a2, src_b2), + pipeline_database=db, + table_scope="content_hash", + ) + assert node1.pipeline_hash() == node2.pipeline_hash() + # schema: segments match + assert node1.node_identity_path[-2] == node2.node_identity_path[-2] + + +# --------------------------------------------------------------------------- +# OperatorNode — from_descriptor validation +# --------------------------------------------------------------------------- + + +class TestOperatorNodeDescriptorTableScope: + def test_from_descriptor_missing_table_scope_raises(self): + from orcapod.core.nodes.operator_node import OperatorNode as ON + + db = InMemoryArrowDatabase() + descriptor = { + "node_type": "operator", + "label": None, + "content_hash": "fake_hash", + "pipeline_hash": "fake_pipeline_hash", + "data_context_key": "std:v0.1:default", + # "table_scope" intentionally omitted + "output_schema": {"tag": {"key": "large_string"}, "packet": {"val": "int64"}}, + "operator": { + "class_name": "Join", + "module_path": "orcapod.core.operators.join", + "config": {}, + }, + "cache_mode": "OFF", + } + with pytest.raises(ValueError, match="table_scope"): + ON.from_descriptor( + descriptor=descriptor, + operator=None, + input_streams=(), + databases={"pipeline": db}, + ) + + def test_from_descriptor_preserves_pipeline_hash_scope(self): + from orcapod.core.nodes.operator_node import OperatorNode as ON + + db = InMemoryArrowDatabase() + descriptor = { + "node_type": "operator", + "label": None, + "content_hash": "fake_hash", + "pipeline_hash": "fake_pipeline_hash", + "data_context_key": "std:v0.1:default", + "table_scope": "pipeline_hash", + "output_schema": {"tag": {"key": "large_string"}, "packet": {"val": "int64"}}, + "operator": { + "class_name": "Join", + "module_path": "orcapod.core.operators.join", + "config": {}, + }, + "cache_mode": "OFF", + } + loaded = ON.from_descriptor( + descriptor=descriptor, + operator=None, + input_streams=(), + databases={"pipeline": db}, + ) + assert loaded._table_scope == "pipeline_hash" + + def test_from_descriptor_preserves_content_hash_scope(self): + from orcapod.core.nodes.operator_node import OperatorNode as ON + + db = InMemoryArrowDatabase() + descriptor = { + "node_type": "operator", + "label": None, + "content_hash": "fake_hash", + "pipeline_hash": "fake_pipeline_hash", + "data_context_key": "std:v0.1:default", + "table_scope": "content_hash", + "output_schema": {"tag": {"key": "large_string"}, "packet": {"val": "int64"}}, + "operator": { + "class_name": "Join", + "module_path": "orcapod.core.operators.join", + "config": {}, + }, + "cache_mode": "OFF", + } + loaded = ON.from_descriptor( + descriptor=descriptor, + operator=None, + input_streams=(), + databases={"pipeline": db}, + ) + assert loaded._table_scope == "content_hash" + + +# --------------------------------------------------------------------------- +# Cache invalidation — node_identity_path_cache cleared on clear_cache +# --------------------------------------------------------------------------- + + +class TestNodeIdentityPathCacheInvalidation: + def test_function_node_cache_cleared_on_clear_cache(self): + db = InMemoryArrowDatabase() + pod = _make_pod() + src = _make_source([{"x": 1, "y": 2}]) + node = FunctionNode(function_pod=pod, input_stream=src, pipeline_database=db) + _ = node.node_identity_path # populate cache + assert node._node_identity_path_cache is not None + node.clear_cache() + assert node._node_identity_path_cache is None + + def test_operator_node_cache_cleared_on_clear_cache(self): + db = InMemoryArrowDatabase() + src_a, src_b = _make_join_streams([1], "x") + node = OperatorNode( + operator=Join(), + input_streams=(src_a, src_b), + pipeline_database=db, + ) + _ = node.node_identity_path # populate cache + assert node._node_identity_path_cache is not None + node.clear_cache() + assert node._node_identity_path_cache is None diff --git a/tests/test_pipeline/test_node_descriptors.py b/tests/test_pipeline/test_node_descriptors.py index 863f161c..b9809322 100644 --- a/tests/test_pipeline/test_node_descriptors.py +++ b/tests/test_pipeline/test_node_descriptors.py @@ -147,6 +147,7 @@ def _make_function_node_descriptor(self): "content_hash": node.content_hash().to_string(), "pipeline_hash": node.pipeline_hash().to_string(), "data_context_key": node.data_context_key, + "table_scope": node._table_scope, "output_schema": { "tag": {k: str(v) for k, v in tag_schema.items()}, "packet": {k: str(v) for k, v in packet_schema.items()}, @@ -198,6 +199,7 @@ def test_from_descriptor_read_only(self): "content_hash": "fake_hash", "pipeline_hash": "fake_pipeline_hash", "data_context_key": "std:v0.1:default", + "table_scope": "pipeline_hash", "output_schema": { "tag": {"a": "int64"}, "packet": {"b": "int64", "c": "int64"}, @@ -236,6 +238,7 @@ def test_from_descriptor_full_mode(self): "content_hash": node.content_hash().to_string(), "pipeline_hash": node.pipeline_hash().to_string(), "data_context_key": node.data_context_key, + "table_scope": node._table_scope, "output_schema": { "tag": {"a": "int64"}, "packet": {"b": "int64", "c": "int64"}, diff --git a/tests/test_pipeline/test_serialization.py b/tests/test_pipeline/test_serialization.py index c2fa77b3..68c38eb2 100644 --- a/tests/test_pipeline/test_serialization.py +++ b/tests/test_pipeline/test_serialization.py @@ -1634,7 +1634,7 @@ def test_read_only_mode_uncached_operator_is_also_unavailable(self, tmp_path): def test_function_node_pipeline_path_two_level(tmp_path): - """node_identity_path must end with schema:... and instance:... components.""" + """node_identity_path must end with schema:... component (pipeline_hash scope).""" db = InMemoryArrowDatabase() def add_one(y: int) -> int: @@ -1660,14 +1660,13 @@ def add_one(y: int) -> int: fn_node = pipeline._nodes["fn"] path = fn_node.node_identity_path - assert path[-2].startswith("schema:"), f"Expected schema:... got {path[-2]!r}" - assert path[-1].startswith("instance:"), f"Expected instance:... got {path[-1]!r}" - assert fn_node.pipeline_hash().to_string() in path[-2] - assert fn_node.content_hash().to_string() in path[-1] + assert path[-1].startswith("schema:"), f"Expected schema:... got {path[-1]!r}" + assert fn_node.pipeline_hash().to_string() in path[-1] + assert not any(seg.startswith("instance:") for seg in path) def test_operator_node_pipeline_path_two_level(tmp_path): - """OperatorNode node_identity_path must also use two-level schema/instance formula.""" + """OperatorNode node_identity_path must end with schema:... component (pipeline_hash scope).""" db = InMemoryArrowDatabase() table_a = pa.table( @@ -1694,10 +1693,9 @@ def test_operator_node_pipeline_path_two_level(tmp_path): joined_node = pipeline._nodes["joined"] path = joined_node.node_identity_path - assert path[-2].startswith("schema:"), f"Expected schema:... got {path[-2]!r}" - assert path[-1].startswith("instance:"), f"Expected instance:... got {path[-1]!r}" - assert joined_node.pipeline_hash().to_string() in path[-2] - assert joined_node.content_hash().to_string() in path[-1] + assert path[-1].startswith("schema:"), f"Expected schema:... got {path[-1]!r}" + assert joined_node.pipeline_hash().to_string() in path[-1] + assert not any(seg.startswith("instance:") for seg in path) # --------------------------------------------------------------------------- @@ -1850,7 +1848,7 @@ def test_load_operator_node_identity_path_has_schema_instance_components(tmp_pat In the new format, pipeline_path is never stored in node descriptors. The pipeline name prefix lives in the database path, not in node_identity_path. - The node_identity_path must end with schema:... and instance:... components. + With pipeline_hash scope (default), the node_identity_path ends with schema:... only. """ from orcapod.core.operators import SelectPacketColumns @@ -1879,11 +1877,11 @@ def test_load_operator_node_identity_path_has_schema_instance_components(tmp_pat sel_node = loaded.compiled_nodes["sel"] pp = sel_node.node_identity_path - # The node_identity_path must be non-empty with schema:/instance: components. + # The node_identity_path must be non-empty ending with schema:... (pipeline_hash scope). # Pipeline name prefix is now encoded in the database path, not the identity path. - assert len(pp) >= 2, f"Expected non-empty node_identity_path, got {pp!r}" - assert pp[-2].startswith("schema:"), f"Expected schema:... got {pp[-2]!r}" - assert pp[-1].startswith("instance:"), f"Expected instance:... got {pp[-1]!r}" + assert len(pp) >= 1, f"Expected non-empty node_identity_path, got {pp!r}" + assert pp[-1].startswith("schema:"), f"Expected schema:... got {pp[-1]!r}" + assert not any(seg.startswith("instance:") for seg in pp) def test_load_raises_on_missing_result_database_registry_key(tmp_path): @@ -1966,10 +1964,10 @@ def test_load_function_node_identity_path_has_schema_instance_components(tmp_pat loaded = Pipeline.load(str(path), mode="full") fn_node = loaded.compiled_nodes["fn"] pp = fn_node.node_identity_path - # In full mode, node_identity_path is computed from the pod (schema/instance) - assert len(pp) >= 2, f"Expected non-empty node_identity_path, got {pp!r}" - assert pp[-2].startswith("schema:"), f"Expected schema:... got {pp[-2]!r}" - assert pp[-1].startswith("instance:"), f"Expected instance:... got {pp[-1]!r}" + # In full mode, node_identity_path is computed from the pod (schema only, pipeline_hash scope) + assert len(pp) >= 1, f"Expected non-empty node_identity_path, got {pp!r}" + assert pp[-1].startswith("schema:"), f"Expected schema:... got {pp[-1]!r}" + assert not any(seg.startswith("instance:") for seg in pp) # --------------------------------------------------------------------------- diff --git a/tests/test_pipeline/test_serialization_helpers.py b/tests/test_pipeline/test_serialization_helpers.py index 4623cb71..9cfb1afa 100644 --- a/tests/test_pipeline/test_serialization_helpers.py +++ b/tests/test_pipeline/test_serialization_helpers.py @@ -763,6 +763,7 @@ def test_function_node_stored_node_uri_from_descriptor(): "label": "fn", "content_hash": "semantic_v0.1:abc", "pipeline_hash": "semantic_v0.1:def", + "table_scope": "pipeline_hash", "node_uri": ["add_one", "v0", "python.function.v0", "schema_repr"], "output_schema": {"tag": {"x": "int64"}, "packet": {"result": "int64"}}, "data_context_key": "std:v0.1:default", From 7b11582731682f9b64feebfd13e1cdee6f61da41 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 02:54:32 +0000 Subject: [PATCH 2/6] refactor: address PR review comments on table_scope implementation - Use pc.equal() (vectorized Arrow compute) instead of to_pylist() + Python loop in _filter_by_content_hash on both FunctionNode and OperatorNode - Use pa.repeat() instead of [value]*n_rows in OperatorNode._store_output_stream to create the NODE_CONTENT_HASH_COL array without materializing a Python list - Add pc = LazyModule("pyarrow.compute") alongside pa in both files - Validate table_scope value in from_descriptor for both FunctionNode and OperatorNode: raise ValueError on unknown values (not just missing key) - Clarify NODE_CONTENT_HASH_COL comment: column is always written; filtering on it is only applied when table_scope="pipeline_hash" Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/function_node.py | 15 ++++++++++--- src/orcapod/core/nodes/operator_node.py | 28 +++++++++++++++++-------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index 143db4e4..6d8e6672 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -46,8 +46,10 @@ if TYPE_CHECKING: import polars as pl import pyarrow as pa + import pyarrow.compute as pc else: pa = LazyModule("pyarrow") + pc = LazyModule("pyarrow.compute") pl = LazyModule("polars") @@ -234,8 +236,8 @@ def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": if col_name not in table.column_names: return table own_hash = self.content_hash().to_string() - mask = [v == own_hash for v in table.column(col_name).to_pylist()] - return table.filter(pa.array(mask)) + mask = pc.equal(table.column(col_name), own_hash) + return table.filter(mask) # ------------------------------------------------------------------ # from_descriptor — reconstruct from a serialized pipeline descriptor @@ -279,7 +281,14 @@ def from_descriptor( f"FunctionNode descriptor is missing required 'table_scope' field: " f"{descriptor.get('label', '')}" ) - table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"] + raw_table_scope = descriptor["table_scope"] + if raw_table_scope not in ("pipeline_hash", "content_hash"): + raise ValueError( + f"FunctionNode descriptor has invalid 'table_scope' value " + f"{raw_table_scope!r} for {descriptor.get('label', '')}; " + f"expected one of ('pipeline_hash', 'content_hash')" + ) + table_scope = cast(Literal["pipeline_hash", "content_hash"], raw_table_scope) if function_pod is not None and input_stream is not None: # Full / READ_ONLY / CACHE_ONLY mode: construct normally via __init__. diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index 26b022b8..a4f6e718 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -30,10 +30,12 @@ if TYPE_CHECKING: import pyarrow as pa + import pyarrow.compute as pc from orcapod.protocols.observability_protocols import ExecutionObserverProtocol else: pa = LazyModule("pyarrow") + pc = LazyModule("pyarrow.compute") class OperatorNode(StreamBase): @@ -181,7 +183,14 @@ def from_descriptor( f"OperatorNode descriptor is missing required 'table_scope' field: " f"{descriptor.get('label', '')}" ) - table_scope: Literal["pipeline_hash", "content_hash"] = descriptor["table_scope"] + raw_table_scope = descriptor["table_scope"] + if raw_table_scope not in ("pipeline_hash", "content_hash"): + raise ValueError( + f"OperatorNode descriptor has invalid 'table_scope' value " + f"{raw_table_scope!r} for {descriptor.get('label', '')}; " + "expected one of ('pipeline_hash', 'content_hash')" + ) + table_scope: Literal["pipeline_hash", "content_hash"] = raw_table_scope pipeline_db = databases.get("pipeline") cache_mode_str = descriptor.get("cache_mode", "off") @@ -408,8 +417,8 @@ def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": if col_name not in table.column_names: return table own_hash = self.content_hash().to_string() - mask = [v == own_hash for v in table.column(col_name).to_pylist()] - return table.filter(pa.array(mask)) + mask = pc.equal(table.column(col_name), own_hash) + return table.filter(mask) @property def node_uri(self) -> tuple[str, ...]: @@ -454,15 +463,16 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: pa.array(record_hashes, type=pa.large_string()), ) - # Add node content hash column for per-run disambiguation when using - # pipeline_hash table scope (multiple runs share the same table). + # Always write the node content hash column so every stored row carries a + # run identity. When table_scope="pipeline_hash" this column is used at + # read time to filter rows belonging to the current run (multiple runs + # share the same table). In content_hash scope each run has its own + # isolated table so the column is present in storage but filtering is + # never applied on reads. n_rows = output_table.num_rows output_table = output_table.append_column( constants.NODE_CONTENT_HASH_COL, - pa.array( - [self.content_hash().to_string()] * n_rows, - type=pa.large_string(), - ), + pa.repeat(self.content_hash().to_string(), n_rows).cast(pa.large_string()), ) # Store (identical rows across runs naturally deduplicate) From 98990006278b4d8f100c647c4e74c991287404b9 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 03:07:41 +0000 Subject: [PATCH 3/6] fix: address record ID collision and hardening in pipeline_hash table scope - Validate table_scope in __init__ for both FunctionNode and OperatorNode; raise ValueError immediately on unknown values rather than silently falling through to the else branch in node_identity_path - FunctionNode.compute_pipeline_entry_id: when table_scope="pipeline_hash", include the node's content_hash in the hashed ID so that two runs processing identical inputs produce distinct entry IDs in the shared table; without this the second run's pipeline record was silently skipped by the duplicate check and _filter_by_content_hash would then hide the first run's row - OperatorNode._store_output_stream: when table_scope="pipeline_hash", mix the node's content_hash into each per-row HASH_COLUMN_NAME by hashing a 1-row table containing both the row hash and the content hash; without this, identical rows from two runs collided on the same record ID and skip_duplicates caused the second run's rows to never be stored - _filter_by_content_hash (both nodes): fail closed with ValueError when table_scope="pipeline_hash" and NODE_CONTENT_HASH_COL is absent, rather than returning an unfiltered table that would expose other runs' data Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/function_node.py | 33 ++++++++++++++++++++--- src/orcapod/core/nodes/operator_node.py | 36 +++++++++++++++++++++---- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index 6d8e6672..fb6c3909 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -145,6 +145,11 @@ def __init__( self._stored_pipeline_path: tuple[str, ...] = () self._stored_result_record_path: tuple[str, ...] = () self._descriptor: dict = {} + if table_scope not in ("pipeline_hash", "content_hash"): + raise ValueError( + f"Unknown table_scope {table_scope!r}. " + "Expected one of: 'pipeline_hash', 'content_hash'." + ) self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope self._node_identity_path_cache: tuple[str, ...] | None = None @@ -234,7 +239,11 @@ def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": return table col_name = constants.NODE_CONTENT_HASH_COL if col_name not in table.column_names: - return table + raise ValueError( + f"Cannot isolate records for table_scope='pipeline_hash': " + f"required column {col_name!r} is missing from the stored table. " + "This may indicate records written by an older version of the code." + ) own_hash = self.content_hash().to_string() mask = pc.equal(table.column(col_name), own_hash) return table.filter(mask) @@ -857,20 +866,36 @@ def compute_pipeline_entry_id( ) -> str: """Compute a unique pipeline entry ID from tag + system tags + input packet hash. - This ID uniquely identifies a (tag, system_tags, input_packet) combination - and is used as the record ID in the pipeline database. + In ``table_scope="pipeline_hash"`` mode the node's own ``content_hash`` + is also included so that two runs processing identical inputs each get a + distinct entry ID in the shared table. Without this, the second run's + pipeline record would be silently skipped (duplicate entry_id check) and + ``_filter_by_content_hash`` would subsequently hide the first run's row + from the second run's view. + + In ``table_scope="content_hash"`` mode each run has its own isolated + table, so the entry ID is scoped to (tag, system_tags, input_packet) + alone as before. Args: tag: The tag (including system tags). input_packet: The input packet. Returns: - A hash string uniquely identifying this combination. + A hash string uniquely identifying this combination (and run, when + in pipeline_hash scope). """ tag_with_hash = tag.as_table(columns={"system_tags": True}).append_column( constants.INPUT_PACKET_HASH_COL, pa.array([input_packet.content_hash().to_string()], type=pa.large_string()), ) + if self._table_scope == "pipeline_hash": + # Scope the entry ID to this run so that identical inputs across + # different runs produce distinct pipeline records in the shared table. + tag_with_hash = tag_with_hash.append_column( + constants.NODE_CONTENT_HASH_COL, + pa.array([self.content_hash().to_string()], type=pa.large_string()), + ) return self.data_context.arrow_hasher.hash_table(tag_with_hash).to_string() def add_pipeline_record( diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index a4f6e718..b24609c5 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -111,6 +111,11 @@ def __init__( self._stored_node_uri: tuple[str, ...] = () self._stored_pipeline_path: tuple[str, ...] = () self._descriptor: dict = {} + if table_scope not in ("pipeline_hash", "content_hash"): + raise ValueError( + f"Unknown table_scope {table_scope!r}. " + "Expected one of: 'pipeline_hash', 'content_hash'." + ) self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope self._node_identity_path_cache: tuple[str, ...] | None = None @@ -415,7 +420,11 @@ def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": return table col_name = constants.NODE_CONTENT_HASH_COL if col_name not in table.column_names: - return table + raise ValueError( + f"Cannot isolate records for table_scope='pipeline_hash': " + f"required column {col_name!r} is missing from the stored table. " + "This may indicate records written by an older version of the code." + ) own_hash = self.content_hash().to_string() mask = pc.equal(table.column(col_name), own_hash) return table.filter(mask) @@ -448,14 +457,31 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: columns={"source": True, "system_tags": True}, ) - # Per-row record hashes for dedup: hash(tag + packet + system_tag) + # Per-row record hashes for dedup: hash(tag + packet + system_tag). + # In pipeline_hash scope the table is shared across runs, so the content + # hash of this node is mixed into each row hash. Without it, identical + # rows from different runs would share the same record ID and the second + # run's rows would be silently skipped by skip_duplicates=True, leaving + # _filter_by_content_hash with nothing to return for the later run. arrow_hasher = self.data_context.arrow_hasher + content_hash_suffix = ( + self.content_hash().to_string() if self._table_scope == "pipeline_hash" else None + ) record_hashes = [] for batch in output_table.to_batches(): for i in range(len(batch)): - record_hashes.append( - arrow_hasher.hash_table(batch.slice(i, 1)).to_hex() - ) + row_hash = arrow_hasher.hash_table(batch.slice(i, 1)).to_hex() + if content_hash_suffix is not None: + # Combine row hash + node content hash into a single run-scoped ID + # by hashing a tiny 1-row table containing both values. + combined = pa.table( + { + "_row": pa.array([row_hash], type=pa.large_string()), + "_run": pa.array([content_hash_suffix], type=pa.large_string()), + } + ) + row_hash = arrow_hasher.hash_table(combined).to_hex() + record_hashes.append(row_hash) output_table = output_table.add_column( 0, From 3e6d8512bc80a3e8fc02bce0f79e91d533176627 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 03:16:09 +0000 Subject: [PATCH 4/6] refactor: simplify operator row hash by appending NODE_CONTENT_HASH_COL first Instead of conditionally computing a secondary combined hash, append NODE_CONTENT_HASH_COL to the output table before the per-row HASH_COLUMN_NAME loop. The content hash column is then naturally included in each row's hash, making record IDs run-scoped without any special-casing. The column order change has no functional effect on storage or filtering. Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/operator_node.py | 50 +++++++++---------------- 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index b24609c5..3a962089 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -457,31 +457,27 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: columns={"source": True, "system_tags": True}, ) - # Per-row record hashes for dedup: hash(tag + packet + system_tag). - # In pipeline_hash scope the table is shared across runs, so the content - # hash of this node is mixed into each row hash. Without it, identical - # rows from different runs would share the same record ID and the second - # run's rows would be silently skipped by skip_duplicates=True, leaving - # _filter_by_content_hash with nothing to return for the later run. - arrow_hasher = self.data_context.arrow_hasher - content_hash_suffix = ( - self.content_hash().to_string() if self._table_scope == "pipeline_hash" else None + # Always append the node content hash column first so that it is included + # in the per-row HASH_COLUMN_NAME computation below. This makes record IDs + # run-scoped: identical rows from different runs carry a different + # NODE_CONTENT_HASH_COL value and therefore hash to different record IDs, + # preventing skip_duplicates=True from silently dropping a later run's rows. + # When table_scope="pipeline_hash" this column is also used at read time by + # _filter_by_content_hash to isolate rows belonging to the current run. + # In content_hash scope each run has its own isolated table so the column + # is present in storage but filtering is never applied on reads. + n_rows = output_table.num_rows + output_table = output_table.append_column( + constants.NODE_CONTENT_HASH_COL, + pa.repeat(self.content_hash().to_string(), n_rows).cast(pa.large_string()), ) + + # Per-row record hashes for dedup: hash(tag + packet + system_tags + node_content_hash). + arrow_hasher = self.data_context.arrow_hasher record_hashes = [] for batch in output_table.to_batches(): for i in range(len(batch)): - row_hash = arrow_hasher.hash_table(batch.slice(i, 1)).to_hex() - if content_hash_suffix is not None: - # Combine row hash + node content hash into a single run-scoped ID - # by hashing a tiny 1-row table containing both values. - combined = pa.table( - { - "_row": pa.array([row_hash], type=pa.large_string()), - "_run": pa.array([content_hash_suffix], type=pa.large_string()), - } - ) - row_hash = arrow_hasher.hash_table(combined).to_hex() - record_hashes.append(row_hash) + record_hashes.append(arrow_hasher.hash_table(batch.slice(i, 1)).to_hex()) output_table = output_table.add_column( 0, @@ -489,18 +485,6 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: pa.array(record_hashes, type=pa.large_string()), ) - # Always write the node content hash column so every stored row carries a - # run identity. When table_scope="pipeline_hash" this column is used at - # read time to filter rows belonging to the current run (multiple runs - # share the same table). In content_hash scope each run has its own - # isolated table so the column is present in storage but filtering is - # never applied on reads. - n_rows = output_table.num_rows - output_table = output_table.append_column( - constants.NODE_CONTENT_HASH_COL, - pa.repeat(self.content_hash().to_string(), n_rows).cast(pa.large_string()), - ) - # Store (identical rows across runs naturally deduplicate) self._pipeline_database.add_records( self.node_identity_path, From 2ee2c9096f8a477459102237b0b4ed9ffd99e604 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 04:25:49 +0000 Subject: [PATCH 5/6] fix: update stale dedup comment; vectorize entry_id filter in get_cached_results - OperatorNode._store_output_stream: update comment to accurately describe that record IDs are now run-scoped (NODE_CONTENT_HASH_COL is included in the hash), so rows from different runs with identical output produce distinct record IDs and are both stored; skip_duplicates=True only deduplicates within a single run - FunctionNode.get_cached_results: push the entry_id filter into the Polars chain as .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)) before .to_arrow(), eliminating the to_pylist() + Python boolean mask + pa.array() roundtrip; also removes the now-unused entry_id_set variable Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/function_node.py | 14 +++----------- src/orcapod/core/nodes/operator_node.py | 5 ++++- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index fb6c3909..d6b8db00 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -733,7 +733,6 @@ def get_cached_results( self._require_pipeline_database() PIPELINE_ENTRY_ID_COL = "__pipeline_entry_id" - entry_id_set = set(entry_ids) taginfo = self._pipeline_database.get_all_records( self.node_identity_path, @@ -749,24 +748,17 @@ def get_cached_results( taginfo = self._filter_by_content_hash(taginfo) - joined = ( + filtered = ( pl.DataFrame(taginfo) .join( pl.DataFrame(results), on=constants.PACKET_RECORD_ID, how="inner", ) + .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids)) .to_arrow() ) - if joined.num_rows == 0: - return {} - - # Filter to requested entry IDs - all_entry_ids = joined.column(PIPELINE_ENTRY_ID_COL).to_pylist() - mask = [eid in entry_id_set for eid in all_entry_ids] - filtered = joined.filter(pa.array(mask)) - if filtered.num_rows == 0: return {} @@ -781,7 +773,7 @@ def get_cached_results( data_table = filtered.drop([c for c in drop_cols if c in filtered.column_names]) stream = ArrowTableStream(data_table, tag_columns=tag_keys) - filtered_entry_ids = [eid for eid, m in zip(all_entry_ids, mask) if m] + filtered_entry_ids = filtered.column(PIPELINE_ENTRY_ID_COL).to_pylist() result_dict: dict[str, tuple[TagProtocol, PacketProtocol]] = {} for entry_id, (tag, packet) in zip(filtered_entry_ids, stream.iter_packets()): diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index 3a962089..cb64559a 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -485,7 +485,10 @@ def _store_output_stream(self, stream: StreamProtocol) -> None: pa.array(record_hashes, type=pa.large_string()), ) - # Store (identical rows across runs naturally deduplicate) + # Store — record IDs are run-scoped (NODE_CONTENT_HASH_COL is included in + # the hash), so rows from different runs with identical output will have + # distinct record IDs and both be stored. skip_duplicates=True still + # deduplicates exact re-runs of the same node within a single run. self._pipeline_database.add_records( self.node_identity_path, output_table, From 1066537a09fed27a9a0bea6db90b7d973e357d1d Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 05:11:02 +0000 Subject: [PATCH 6/6] =?UTF-8?q?style:=20address=20eywalker=20review=20comm?= =?UTF-8?q?ents=20=E2=80=94=20cleanup=20and=20consistency=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit function_node.py: - Remove redundant Literal type annotation on self._table_scope assignment - Remove quotes around pa.Table type hints in _filter_by_content_hash signature - Replace hardcoded '_node_content_hash' with NODE_CONTENT_HASH_COL reference in _filter_by_content_hash docstring - Simplify node_identity_path: build base path unconditionally, append instance: segment only when table_scope != "pipeline_hash" - Always include NODE_CONTENT_HASH_COL in compute_pipeline_entry_id regardless of table_scope (remove the conditional); update docstring accordingly operator_node.py: - Remove redundant Literal type annotation on self._table_scope assignment - Remove quotes around pa.Table type hints in _filter_by_content_hash signature - Replace hardcoded docstring value with NODE_CONTENT_HASH_COL reference - Apply same simplified node_identity_path pattern as function_node.py Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/function_node.py | 50 ++++++++++--------------- src/orcapod/core/nodes/operator_node.py | 18 +++------ 2 files changed, 26 insertions(+), 42 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index d6b8db00..f4bf4b05 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -150,7 +150,7 @@ def __init__( f"Unknown table_scope {table_scope!r}. " "Expected one of: 'pipeline_hash', 'content_hash'." ) - self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope + self._table_scope = table_scope self._node_identity_path_cache: tuple[str, ...] | None = None if pipeline_database is not None: @@ -227,8 +227,8 @@ def _require_pipeline_database(self) -> None: "or supply one via Pipeline.load(..., pipeline_database=)." ) - def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": - """Filter *table* to rows whose ``_node_content_hash`` matches this node. + def _filter_by_content_hash(self, table: pa.Table) -> pa.Table: + """Filter *table* to rows whose ``NODE_CONTENT_HASH_COL`` matches this node. Only applied when ``table_scope="pipeline_hash"`` because in that mode multiple runs share the same DB table and must be disambiguated at read @@ -512,13 +512,9 @@ def node_identity_path(self) -> tuple[str, ...]: if self._node_identity_path_cache is not None: return self._node_identity_path_cache pf = self._function_pod - if self._table_scope == "pipeline_hash": - path = pf.uri + (f"schema:{self.pipeline_hash().to_string()}",) - else: - path = pf.uri + ( - f"schema:{self.pipeline_hash().to_string()}", - f"instance:{self.content_hash().to_string()}", - ) + path = pf.uri + (f"schema:{self.pipeline_hash().to_string()}",) + if self._table_scope != "pipeline_hash": + path += (f"instance:{self.content_hash().to_string()}",) self._node_identity_path_cache = path return path @@ -858,36 +854,30 @@ def compute_pipeline_entry_id( ) -> str: """Compute a unique pipeline entry ID from tag + system tags + input packet hash. - In ``table_scope="pipeline_hash"`` mode the node's own ``content_hash`` - is also included so that two runs processing identical inputs each get a - distinct entry ID in the shared table. Without this, the second run's - pipeline record would be silently skipped (duplicate entry_id check) and - ``_filter_by_content_hash`` would subsequently hide the first run's row - from the second run's view. - - In ``table_scope="content_hash"`` mode each run has its own isolated - table, so the entry ID is scoped to (tag, system_tags, input_packet) - alone as before. + ``NODE_CONTENT_HASH_COL`` is always included so that two runs processing + identical inputs each get a distinct entry ID, regardless of table scope. + This prevents the second run's pipeline record from being silently skipped + by the duplicate entry_id check. Args: tag: The tag (including system tags). input_packet: The input packet. Returns: - A hash string uniquely identifying this combination (and run, when - in pipeline_hash scope). + A hash string uniquely identifying this (tag, input_packet, node run) + combination. """ - tag_with_hash = tag.as_table(columns={"system_tags": True}).append_column( - constants.INPUT_PACKET_HASH_COL, - pa.array([input_packet.content_hash().to_string()], type=pa.large_string()), - ) - if self._table_scope == "pipeline_hash": - # Scope the entry ID to this run so that identical inputs across - # different runs produce distinct pipeline records in the shared table. - tag_with_hash = tag_with_hash.append_column( + tag_with_hash = ( + tag.as_table(columns={"system_tags": True}) + .append_column( + constants.INPUT_PACKET_HASH_COL, + pa.array([input_packet.content_hash().to_string()], type=pa.large_string()), + ) + .append_column( constants.NODE_CONTENT_HASH_COL, pa.array([self.content_hash().to_string()], type=pa.large_string()), ) + ) return self.data_context.arrow_hasher.hash_table(tag_with_hash).to_string() def add_pipeline_record( diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index cb64559a..e083d442 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -116,7 +116,7 @@ def __init__( f"Unknown table_scope {table_scope!r}. " "Expected one of: 'pipeline_hash', 'content_hash'." ) - self._table_scope: Literal["pipeline_hash", "content_hash"] = table_scope + self._table_scope = table_scope self._node_identity_path_cache: tuple[str, ...] | None = None if pipeline_database is not None: @@ -397,20 +397,14 @@ def node_identity_path(self) -> tuple[str, ...]: return self._stored_pipeline_path if self._node_identity_path_cache is not None: return self._node_identity_path_cache - if self._table_scope == "pipeline_hash": - path = self._operator.uri + ( - f"schema:{self.pipeline_hash().to_string()}", - ) - else: - path = self._operator.uri + ( - f"schema:{self.pipeline_hash().to_string()}", - f"instance:{self.content_hash().to_string()}", - ) + path = self._operator.uri + (f"schema:{self.pipeline_hash().to_string()}",) + if self._table_scope != "pipeline_hash": + path += (f"instance:{self.content_hash().to_string()}",) self._node_identity_path_cache = path return path - def _filter_by_content_hash(self, table: "pa.Table") -> "pa.Table": - """Filter *table* to rows whose ``_node_content_hash`` matches this node. + def _filter_by_content_hash(self, table: pa.Table) -> pa.Table: + """Filter *table* to rows whose ``NODE_CONTENT_HASH_COL`` matches this node. Only applied when ``table_scope="pipeline_hash"`` because in that mode multiple runs share the same DB table and must be disambiguated at read