Conversation
Covers decoupling nodes from pipeline_path via pre-scoped databases, observer protocol migration to contextualize(*identity_path), and scoped database reference serialization. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix DatabaseRegistry description (class already exists; use correct method names) - Correct resolve_database_from_config target signature (2-arg with optional registry) - Add current-vs-target framing note - Clarify _path_prefix retention alongside new _root/_scoped_path fields - Add NoOpObserver default fallback when pipeline_database is not set - Clarify node_identity_path multi-component path and node_name extraction - Show actual db_<hash8> key format in JSON example - Add noop_database.py to files changed table - Fix serialization.py change description (extend vs add) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Default observer created eagerly in compile(), stored as immutable _default_observer - Observer immutable post-compile; run(observer=override) is the escape hatch - contextualize() return type made explicit: ExecutionObserverProtocol - CompositeObserver.contextualize() returns new CompositeObserver of bound children - to_config()/from_config() retained on all three observer classes with OBSERVER_REGISTRY - to_config() format documented for all three observer types - Pipeline.run() resolves: effective_observer = observer or self._default_observer - Data flow summary updated to reflect compile-time observer creation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
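A minimal sketch of the resolution order this commit message describes, assuming simplified stand-in names (`Pipeline`, string-valued observers) rather than the real classes: the default is created eagerly at `compile()` time and `run(observer=...)` is the only override point.

```python
# Hypothetical sketch, not the real orcapod Pipeline. Strings stand in for
# observer instances to keep the resolution logic visible.


class Pipeline:
    def __init__(self, has_database: bool):
        self._has_database = has_database
        self._default_observer = None

    def compile(self):
        # Eagerly create the immutable default at compile time.
        self._default_observer = (
            "composite(status, logging)" if self._has_database else "noop"
        )

    def run(self, observer=None):
        # run(observer=override) is the escape hatch; otherwise the default.
        effective = observer if observer is not None else self._default_observer
        return effective


p = Pipeline(has_database=True)
p.compile()
assert p.run() == "composite(status, logging)"
assert p.run(observer="custom") == "custom"
```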
14 tasks covering database scoped tracking, observer protocol/implementations, node refactoring, pipeline compile overhaul, orchestrator migration, and ObservabilityReader path update. TDD throughout, frequent commits. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…r, immutability guard Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Each database class (InMemoryArrowDatabase, DeltaTableDatabase, ConnectorArrowDatabase, NoOpArrowDatabase) now stores _root (the original root instance, or None for root instances) and _scoped_path (the full path tuple from root). The at() method computes and passes these to the new instance, preserving the original root across chained calls. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
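The `_root`/`_scoped_path` bookkeeping above can be sketched as follows. This is a toy stand-in, not the orcapod implementation: the key point is that `at()` threads the original root through chained calls instead of nesting wrappers.

```python
# Toy sketch of _root/_scoped_path tracking. A root instance has _root=None;
# every scoped view remembers the ORIGINAL root, even across chained at() calls.


class ScopedDB:
    def __init__(self, root=None, scoped_path=()):
        self._root = root                      # None means "I am the root"
        self._scoped_path = tuple(scoped_path)  # full path tuple from root

    def at(self, *path_components: str) -> "ScopedDB":
        # Preserve the original root rather than pointing at the intermediate view.
        new_root = self._root if self._root is not None else self
        return ScopedDB(root=new_root,
                        scoped_path=self._scoped_path + path_components)


root = ScopedDB()
scoped = root.at("pipeline").at("_result")
assert scoped._root is root                       # not the intermediate view
assert scoped._scoped_path == ("pipeline", "_result")
```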
… fix vacuous test Replace the stub `assert scoped is not None` in test_existing_functionality_preserved with a real write+read round-trip that verifies _path_prefix routing and actual storage. Add TestNoOpScopedTracking class covering _root and _scoped_path assertions for NoOpArrowDatabase. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Enables scoped InMemoryArrowDatabase instances to be serialized to a
compact {"type": "scoped", "ref": ..., "path": [...]} format when a
DatabaseRegistry is provided, and reconstructed via
resolve_database_from_config(config, registry=...).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
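The round-trip described above can be sketched like this, under the assumption that a registry maps stable keys to root database instances. `ScopedDB`, `db_to_config`, and the plain-dict registry are illustrative stand-ins, not the real orcapod API.

```python
# Hypothetical sketch of the {"type": "scoped", "ref": ..., "path": [...]}
# config format and its registry-based resolution.


class ScopedDB:
    """Toy scoped database: just enough to show the config round-trip."""

    def __init__(self, root=None, scoped_path=()):
        self._root = root
        self._scoped_path = tuple(scoped_path)

    def at(self, *parts):
        return ScopedDB(self._root or self, self._scoped_path + parts)


def db_to_config(db, registry):
    # Serialize a scoped view as a reference into the registry plus a path.
    ref = next(key for key, root in registry.items() if root is db._root)
    return {"type": "scoped", "ref": ref, "path": list(db._scoped_path)}


def resolve_database_from_config(config, registry):
    if config["type"] == "scoped":
        return registry[config["ref"]].at(*config["path"])
    raise ValueError(f"unknown database config type: {config['type']!r}")


root = ScopedDB()
registry = {"main": root}
config = db_to_config(root.at("pipeline", "_result"), registry)
assert config == {"type": "scoped", "ref": "main", "path": ["pipeline", "_result"]}

restored = resolve_database_from_config(config, registry)
assert restored._root is root
assert restored._scoped_path == ("pipeline", "_result")
```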
…-path test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ne_path from hooks - Changed contextualize() signature from (node_hash, node_label) to (*identity_path: str) for flexible variadic identity binding per the ENG-340/ENG-349 decoupling plan. - Removed pipeline_path parameter from on_node_start, on_node_end, and create_packet_logger in both ExecutionObserverProtocol and NoOpObserver. - NoOpObserver.contextualize() returns self (no wrapping needed for no-op). - Added tests/test_observability/test_observer_protocol.py covering protocol shape and NoOpObserver behaviour. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
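The protocol shape after this change can be sketched as below. The hook argument lists are paraphrased from this PR's description (the real protocol carries more hooks and parameters); only the variadic `contextualize` and the absence of `pipeline_path` are the point here.

```python
# Sketch of the post-change observer protocol. Hook signatures are simplified
# stand-ins; the real ExecutionObserverProtocol has additional hooks.
from typing import Protocol, runtime_checkable


@runtime_checkable
class ExecutionObserverProtocol(Protocol):
    def contextualize(self, *identity_path: str) -> "ExecutionObserverProtocol": ...
    def on_node_start(self, node_hash: str) -> None: ...  # no pipeline_path arg
    def on_node_end(self, node_hash: str) -> None: ...


class NoOpObserver:
    def contextualize(self, *identity_path: str) -> "NoOpObserver":
        return self  # no wrapping needed for a no-op

    def on_node_start(self, node_hash: str) -> None:
        pass

    def on_node_end(self, node_hash: str) -> None:
        pass


obs = NoOpObserver()
assert obs.contextualize("pod_uri", "schema:abc", "instance:def") is obs
```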
…rver Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…pper
- StatusObserver.__init__ now takes only status_database (no status_path or pipeline_path_prefix)
- contextualize(*identity_path) returns a _ContextualizedStatusObserver bound to identity_path
- Removed _node_context dict; replaced with _tag_schema_per_node (stores node_hash + tag_schema only)
- on_node_start/on_node_end accept **kwargs to absorb legacy pipeline_path kwarg from function_node.py
- All status writes go to a flat DEFAULT_STATUS_PATH ("execution_status",) within self._db
- get_status() returns all rows; filter by _status_node_label column for per-node queries
- Added to_config(db_registry=...) and from_config(config, db_registry=...) for serialization
- _ContextualizedStatusObserver implements full ExecutionObserverProtocol with identity_path-based writes
- Tests updated: removed pipeline_path= from get_status() calls; multi-node tests filter by label column
- Added test_status_observer_to_config_shape and test_status_observer_from_config_round_trip
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove pipeline_path= keyword argument absorption from on_node_start, on_node_end, and create_packet_logger in StatusObserver and _ContextualizedStatusObserver, and remove all call sites in function_node.py that were still passing pipeline_path=. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ify docs - Raise ValueError at construction time when identity_path is empty instead of silently falling back to DEFAULT_STATUS_PATH - Add statefulness docstring to _ContextualizedStatusObserver warning against sharing instances across concurrent node executions - Clarify get_status() docstring to explain read/write relationship with the pre-scoped database - Add test_contextualize_with_empty_path_raises to verify the guard Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…apper Mirror of StatusObserver pattern (Task 5 of ENG-340/ENG-349): - LoggingObserver.__init__ takes only log_database (no log_path/pipeline_path_prefix) - LoggingObserver.contextualize(*identity_path) returns _ContextualizedLoggingObserver - _ContextualizedLoggingObserver raises ValueError for empty identity_path - All logs written to DEFAULT_LOG_PATH (flat table); node identity in row columns - get_logs() reads from DEFAULT_LOG_PATH with no pipeline_path arg - Added to_config/from_config with db_registry support - No pipeline_path or **kwargs on any hook signatures - Tests updated: removed pipeline_path= from get_logs() calls, rewrote multi-node and path-mirroring tests to use flat table + column filtering, added serialization round-trip and empty-identity-path tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- _ContextualizedLoggingObserver.create_packet_logger now passes log_path=self._identity_path to PacketLogger instead of DEFAULT_LOG_PATH, so logs are scoped to the node's identity (node_hash/node_label) within the database. - LoggingObserver.create_packet_logger (root) now raises NotImplementedError instead of silently producing a broken logger with empty node_label/node_hash. - get_logs() updated to scan all storage paths under the db prefix and concatenate every table with _log_-prefixed columns, accommodating the new per-node identity paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…+ OBSERVER_REGISTRY - CompositeObserver.contextualize(*identity_path) returns a new CompositeObserver wrapping each child's contextualized version (no more fixed (node_hash, node_label) args) - Remove pipeline_path from all hook signatures; on_run_start uses **kwargs pass-through - Add to_config/from_config for round-trip serialization - Add _build_observer_registry(), OBSERVER_REGISTRY, and resolve_observer_from_config() to serialization.py supporting "status", "logging", "composite" types - Update tests: use scoped databases (db.at(...)) instead of pipeline_path args; add serialization round-trip tests and unknown-type error test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
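The composite behaviour described above can be sketched as follows, with toy stand-in classes: `contextualize()` returns a new composite wrapping each child's bound version, leaving the original unbound.

```python
# Toy sketch of CompositeObserver.contextualize(): bind every child and wrap
# the results in a NEW composite. Class names mirror the PR but the bodies
# are illustrative.


class Child:
    def __init__(self, bound=()):
        self.bound = bound

    def contextualize(self, *identity_path):
        return Child(bound=identity_path)


class CompositeObserver:
    def __init__(self, *observers):
        self.observers = observers

    def contextualize(self, *identity_path):
        # New composite of bound children; self stays unbound.
        return CompositeObserver(
            *(obs.contextualize(*identity_path) for obs in self.observers)
        )


root = CompositeObserver(Child(), Child())
bound = root.contextualize("pod_uri", "schema:abc")
assert bound is not root
assert all(c.bound == ("pod_uri", "schema:abc") for c in bound.observers)
```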
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Databases are now pre-scoped at compile time (ENG-340/ENG-349), so CachedFunctionPod no longer needs a record_path_prefix param — it uses self.uri directly. Also removes the stale prefix logic from FunctionNode.attach_databases and updates the corresponding tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…identity_path, use contextualize - Remove `_pipeline_path_prefix` field and `pipeline_path_prefix` parameter from `FunctionNode.__init__` and `attach_databases()` - Rename `pipeline_path` property to `node_identity_path`; keep `pipeline_path` as a deprecated alias for backward compat - `node_identity_path` returns `pod.uri + (schema_hash_str, instance_hash_str)` with no pipeline prefix - `execute()` and `async_execute()` now call `observer.contextualize(*self.node_identity_path)` first and use the returned `ctx_obs` for all subsequent hooks - `_async_execute_cache_only()` also uses `contextualize` pattern - Simplify `from_descriptor()`: remove all prefix-stripping and hint logic; pass `result_db` directly from databases dict - Update `graph.py` compile path to not pass `pipeline_path_prefix` to `FunctionNode.attach_databases()` - Update all affected test files to use `node_identity_path` and remove prefix tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
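The `node_identity_path` shape described above (`pod.uri + (schema_hash_str, instance_hash_str)` with no pipeline prefix) reduces to simple tuple concatenation; a free-function sketch with hypothetical argument names:

```python
# Sketch of the node_identity_path construction: pod URI tuple plus
# schema/instance hash components, no pipeline prefix. Argument names are
# illustrative; in the PR this is a property reading live pod state.

def node_identity_path(pod_uri: tuple[str, ...],
                       schema_hash: str,
                       instance_hash: str) -> tuple[str, ...]:
    return pod_uri + (f"schema:{schema_hash}", f"instance:{instance_hash}")


path = node_identity_path(("my_func",), "abc123", "def456")
assert path == ("my_func", "schema:abc123", "instance:def456")
```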
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…identity_path, use contextualize - Remove `_pipeline_path_prefix` field and `pipeline_path_prefix` param from `__init__` and `attach_databases()` - Replace `pipeline_path` property with `node_identity_path` returning `operator.uri + (schema:..., instance:...)` - Update `execute()` and `async_execute()` to use `ctx_obs = obs.contextualize(*self.node_identity_path)` pattern - Simplify `from_descriptor()` to remove prefix-stripping logic - Fix graph.py compile path to drop `pipeline_path_prefix` from `attach_databases()` call and `dbs` dict - Update all operator tests and test_node_execute.py to use `node_identity_path` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Rename `function_database` → `result_database` in `__init__` and `load()` - compile() now creates `_scoped_pipeline_database`, `_result_database_scoped`, `_status_database`, and `_log_database` as scoped views of `pipeline_database` - compile() creates `_default_observer` as `CompositeObserver(StatusObserver, LoggingObserver)` when a pipeline_database is provided, else `NoOpObserver` - run() accepts `observer=None` and uses `effective_observer = observer or _default_observer` - save(level="full") serializes `_default_observer` under `"observer"` key - load() reconstructs `_default_observer` from config when present - Remove `pipeline_path_prefix` construction from load() - Update tests to use `result_database` and new scoped DB assertions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eline_path->node_identity_path in tests - Add RuntimeError guards to result_database, scoped_pipeline_database, status_database, and log_database properties when accessed pre-compile - Fix observer falsy-unsafe pattern: use `observer if observer is not None` instead of `observer or self._default_observer` - Update 5 failing tests to use node_identity_path instead of removed pipeline_path attribute; update assertions to reflect actual values (function/operator name as first path element, not pipeline name) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
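The falsy-unsafe pattern fixed above is easy to demonstrate in isolation: `or` silently discards any observer whose truth value is `False`, while the explicit `None` check does not. `_FalsyObserver` is a contrived example for illustration.

```python
# Why `observer or default` is wrong: a legal observer object can be falsy
# (e.g. something defining __bool__), and `or` would replace it with the
# default. The explicit None check keeps it.

class _FalsyObserver:
    def __bool__(self):
        return False


default = object()
override = _FalsyObserver()

buggy = override or default                        # falls through to default
fixed = override if override is not None else default

assert buggy is default
assert fixed is override
```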
SyncPipelineOrchestrator and AsyncPipelineOrchestrator are now stateless: observer is no longer stored on self but accepted as a keyword-only argument in run()/run_async(). A NoOpObserver is used when no observer is provided, removing the is-not-None guards. Pipeline.run() now passes effective_observer to both default and externally-supplied orchestrators via the run() call. All test inline observers updated: contextualize(node_hash, node_label) → contextualize(*identity_path). Integration test files updated to pass observer via pipeline.run(observer=...) instead of orchestrator constructor. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
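A hedged sketch of the stateless-orchestrator pattern described above: the observer is a keyword-only `run()` argument with a `NoOpObserver` fallback, so the orchestrator keeps no observer state and needs no is-not-None guards. Class and method bodies are simplified stand-ins, not the real orchestrator.

```python
# Illustrative stateless orchestrator: observer arrives per-run(), the no-op
# fallback absorbs hooks when none is provided.


class NoOpObserver:
    def contextualize(self, *identity_path):
        return self

    def on_node_start(self, node_hash):
        pass

    def on_node_end(self, node_hash):
        pass


class SyncPipelineOrchestrator:
    """Stateless w.r.t. the observer: no self._observer attribute."""

    def run(self, nodes, *, observer=None):
        obs = observer if observer is not None else NoOpObserver()
        for node in nodes:
            # Bind once per node, then use the bound observer for all hooks.
            ctx = obs.contextualize(*node["identity_path"])
            ctx.on_node_start(node["hash"])
            ctx.on_node_end(node["hash"])


# Runs fine with no observer at all; the no-op fallback absorbs the hooks.
SyncPipelineOrchestrator().run(
    [{"identity_path": ("pod", "schema:a", "instance:b"), "hash": "h1"}]
)
```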
…kwarg in test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… observer fix - Add pipeline_path property to FunctionNode and OperatorNode using db._path_prefix + node_identity_path - Fix async REPLAY branch in OperatorNode.async_execute() to emit on_node_end before return - Fix StatusObserver.contextualize() to pass run_id/pipeline_uri to contextualized observer - Fix _ContextualizedStatusObserver to write at DEFAULT_STATUS_PATH instead of identity_path - Fix _ContextualizedLoggingObserver to capture node_label/node_hash in on_node_start - Fix DeltaTableDatabase.at() to share pending_batches state and include path prefix in record keys - Fix Pipeline.save() to serialize scoped pipeline database config - Fix Pipeline.load() to derive result_db from pipeline_db when result_db_key is null - Update test observer classes to use new contextualize(*identity_path) signature - Fix test_node_descriptors to use db.at() scoping instead of removed pipeline_path_prefix kwarg - Add smoke tests for ENG-340/349 refactor lifecycle (scoped DBs, full save, default observer) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
00d1771 to 0bac8ca
Pull request overview
This PR refactors pipeline execution to remove pipeline_path threading from nodes and observers by pre-scoping databases at Pipeline.compile() time, and by changing the observer API to contextualize via a variadic identity_path.
Changes:
- Replace node/observer `pipeline_path` scoping with pre-scoped DB views (`db.at(*pipeline_name).at("_result"/"_status"/"_log")`) and `node_identity_path`.
- Move orchestrator observer injection from `__init__` to `run()` and update protocol signatures (`contextualize(*identity_path)`; remove `pipeline_path` from hooks).
- Add scoped DB tracking/serialization and observer (de)serialization; update tests/docs accordingly.
Reviewed changes
Copilot reviewed 42 out of 43 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| src/orcapod/pipeline/graph.py | Create scoped DB views + default observer at compile; add observer= to Pipeline.run; serialize observer at save(level="full"). |
| src/orcapod/pipeline/sync_orchestrator.py | Make orchestrator stateless w.r.t. observer; accept observer= in run(). |
| src/orcapod/pipeline/async_orchestrator.py | Same observer statelessness; accept observer= in run() / run_async(). |
| src/orcapod/protocols/observability_protocols.py | Update observer protocol (contextualize(*identity_path); remove pipeline_path from hook signatures). |
| src/orcapod/pipeline/observer.py | Update NoOpObserver to match new protocol signatures. |
| src/orcapod/pipeline/status_observer.py | Rewrite status observer to use contextualized wrappers + flat storage + config round-trip. |
| src/orcapod/pipeline/logging_observer.py | Rewrite logging observer to use contextualized wrappers + flat storage + config round-trip. |
| src/orcapod/pipeline/composite_observer.py | Update composite observer contextualization and add observer config round-trip. |
| src/orcapod/pipeline/serialization.py | Add observer registry + scoped DB resolution support in resolve_database_from_config. |
| src/orcapod/pipeline/observability_reader.py | Update discovery to look for _status / _log path keys. |
| src/orcapod/core/nodes/function_node.py | Rename pipeline_path → node_identity_path; remove prefix plumbing; contextualize observer per node. |
| src/orcapod/core/nodes/operator_node.py | Same identity path + contextualized observer usage; remove prefix plumbing. |
| src/orcapod/core/cached_function_pod.py | Remove record_path_prefix; store results directly under pod.uri. |
| src/orcapod/databases/in_memory_databases.py | Add _root / _scoped_path tracking for at() and scoped to_config(db_registry=...). |
| src/orcapod/databases/delta_lake_databases.py | Add scoped tracking and shared pending state; adjust flush to support shared pending keys. |
| src/orcapod/databases/connector_arrow_database.py | Add scoped tracking for at(). |
| src/orcapod/databases/noop_database.py | Add scoped tracking for at(). |
| tests/test_pipeline/test_sync_orchestrator.py | Update orchestrator usage + observer contextualize signature changes. |
| tests/test_pipeline/test_orchestrator.py | Update async orchestrator observer injection and signatures; add signature assertion test. |
| tests/test_pipeline/test_pipeline.py | Update pipeline DB naming + add tests for scoped DBs and default observer behavior. |
| tests/test_pipeline/test_serialization.py | Rename function_database → result_database assertions and round-trip tests. |
| tests/test_pipeline/test_status_observer_integration.py | Update status observer tests for flat storage + serialization + contextualize guards. |
| tests/test_pipeline/test_logging_observer_integration.py | Update logging observer tests for flat storage + serialization + contextualize guards. |
| tests/test_pipeline/test_composite_observer.py | Update composite observer integration + serialization tests. |
| tests/test_pipeline/test_observability_reader.py | Update expected _status / _log directory layout; add discovery test. |
| tests/test_pipeline/test_node_protocols.py | Update test observers to contextualize(*identity_path). |
| tests/test_pipeline/test_node_descriptors.py | Update descriptor construction to use pre-scoped pipeline DB views. |
| tests/test_pipeline/test_integration_smoke.py | Add smoke tests for scoped DBs + save(level="full") observer serialization + default observer run. |
| tests/test_observability/test_observer_protocol.py | Add protocol compliance tests for NoOpObserver and new hook signatures. |
| tests/test_observability/__init__.py | Package marker for new observability tests. |
| tests/test_databases/test_scoped_database_tracking.py | New tests for _root / _scoped_path tracking across DBs. |
| tests/test_databases/test_scoped_database_serialization.py | New tests for scoped DB config format + registry-based resolution. |
| tests/test_core/nodes/test_node_execute.py | Update pipeline DB record reads to use node_identity_path. |
| tests/test_core/operators/test_operator_node.py | Update identity-path tests for operator nodes. |
| tests/test_core/operators/test_operator_node_non_active.py | Update DB record reads to use node_identity_path. |
| tests/test_core/operators/test_operator_node_attach_db.py | Update attach-db tests to assert node_identity_path. |
| tests/test_core/function_pod/test_pipeline_hash_integration.py | Update hash/identity-path assertions. |
| tests/test_core/function_pod/test_function_pod_node.py | Update identity-path assertions and docs. |
| tests/test_core/function_pod/test_function_node_caching.py | Update DB record reads to use node_identity_path. |
| tests/test_core/function_pod/test_function_node_attach_db.py | Update attach-db tests to assert node_identity_path. |
| tests/test_core/function_pod/test_cached_function_pod.py | Update cached pod record path expectations + assert removed param behavior. |
| docs/superpowers/specs/2026-04-01-eng340-eng349-design.md | Add/record the design spec for ENG-340/349. |
| docs/superpowers/plans/2026-04-01-eng340-eng349-decouple-pipeline-path.md | Add/record the implementation plan for ENG-340/349. |
Comments suppressed due to low confidence (1)
src/orcapod/pipeline/observability_reader.py:68
- ObservabilityReader assumes the directory immediately after '_status' / '_log' is a node name. However, StatusObserver/LoggingObserver now write to DEFAULT_STATUS_PATH/DEFAULT_LOG_PATH (e.g., 'execution_status'/'execution_logs'), so this discovery will likely surface table names instead of actual node labels. If the new design is flat tables, consider updating discovery (and `nodes`) to derive node names from the `_status_node_label`/`_log_node_label` columns instead of path segments.
if "_status" in parts:
idx = parts.index("_status")
if idx + 1 < len(parts):
node_name = parts[idx + 1]
self._status_tables.setdefault(node_name, []).append(
table_dir
)
elif "_log" in parts:
idx = parts.index("_log")
if idx + 1 < len(parts):
node_name = parts[idx + 1]
self._log_tables.setdefault(node_name, []).append(
table_dir
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/orcapod/pipeline/graph.py
Outdated
```diff
@@ -252,19 +302,9 @@ def compile(self) -> None:
         rewired_input = persistent_node_map[input_hash]
         node.upstreams = (rewired_input,)

-        # Determine result database and path prefix
-        if self._function_database is not None:
-            result_db = self._function_database
-            result_prefix = None
-        else:
-            result_db = self._pipeline_database
-            result_prefix = self._name + ("_results",)
-
         node.attach_databases(
-            pipeline_database=self._pipeline_database,
+            pipeline_database=pipeline_db,
             result_database=result_db,
-            result_path_prefix=result_prefix,
-            pipeline_path_prefix=self._pipeline_path_prefix,
         )
```
Pipeline.compile() sets pipeline_db=None when no pipeline_database is provided, but still unconditionally calls node.attach_databases(pipeline_database=pipeline_db, ...). This will raise at runtime for pipelines that should run compute-only without persistence. Guard attach_databases calls (and any DB-dependent wiring) behind if pipeline_db is not None, leaving nodes unattached when no DB is configured.
Fixed. The attach_databases() call in compile() is now guarded with if pipeline_db is not None, so pipelines without a database run compute-only with no attempt to wire storage.
```diff
@@ -521,15 +452,22 @@
             return stored
         if self._pipeline_database is None:
             return ()
-        return (
-            self._pipeline_path_prefix
-            + self._packet_function.uri
-            + (
-                f"schema:{self.pipeline_hash().to_string()}",
-                f"instance:{self.content_hash().to_string()}",
-            )
-        )
+        pf = self._function_pod
+        return pf.uri + (
+            f"schema:{self.pipeline_hash().to_string()}",
+            f"instance:{self.content_hash().to_string()}",
+        )
```
FunctionNode.node_identity_path currently returns an empty tuple when no pipeline database is attached. Identity paths are needed for observer contextualization and should be computable from the function URI + hashes even in DB-less runs; returning () will cause contextualize() guards to raise and makes DerivedSource IDs less informative. Consider always computing node_identity_path regardless of DB attachment (only pipeline_path should depend on DB prefix).
Fixed. The DB-attachment guard was removed — node_identity_path now always computes pf.uri + (schema_hash, instance_hash) from the live pod, even when no pipeline database is set. This also fixes DerivedSource IDs for DB-less pipelines.
```diff
     def attach_databases(
         self,
         pipeline_database: ArrowDatabaseProtocol,
         result_database: ArrowDatabaseProtocol | None = None,
         result_path_prefix: tuple[str, ...] | None = None,
-        pipeline_path_prefix: tuple[str, ...] = (),
     ) -> None:
         """Attach databases for persistent caching and pipeline records.

         Creates a ``CachedFunctionPod`` wrapping the original function pod
         for result caching. The pipeline database is used separately for
         pipeline-level provenance records (tag + packet hash).

         Args:
             pipeline_database: Database for pipeline records.
             result_database: Database for cached results. Defaults to
                 pipeline_database.
             result_path_prefix: Path prefix for result records.
-            pipeline_path_prefix: Path prefix for pipeline records.
         """
         computed_result_path_prefix: tuple[str, ...] = ()
         if result_database is None:
             result_database = pipeline_database
             computed_result_path_prefix = (
-                result_path_prefix
-                if result_path_prefix is not None
-                else pipeline_path_prefix + ("_result",)
+                result_path_prefix if result_path_prefix is not None else ()
             )
         elif result_path_prefix is not None:
             computed_result_path_prefix = result_path_prefix

         # Always wrap the original function_pod (not a previous cached wrapper)
+        # record_path_prefix removed — database is pre-scoped
         self._cached_function_pod = CachedFunctionPod(
             self._function_pod,
             result_database=result_database,
-            record_path_prefix=computed_result_path_prefix,
         )
```
attach_databases() still accepts result_path_prefix and computes computed_result_path_prefix, but the value is unused after removing record_path_prefix scoping. This is a silent no-op parameter now; either remove result_path_prefix from the API or reintroduce explicit scoping behavior (e.g., by scoping the result_database view) so callers aren’t misled.
Fixed. result_path_prefix was removed from both attach_databases() and __init__(). The default scoping is now pipeline_database.at("_result"), handled at compile time rather than via a prefix parameter.
```diff
     def node_identity_path(self) -> tuple[str, ...]:
         """Return the node identity path for DB record scoping.

-        Raises:
-            RuntimeError: If no database is attached and this is not a
-                read-only deserialized node.
+        Returns ``()`` when no pipeline database is attached.
         """
         stored = getattr(self, "_stored_pipeline_path", None)
         if self._operator is None and stored is not None:
             return stored
         if self._pipeline_database is None:
-            raise RuntimeError(
-                "pipeline_path requires a database. Call attach_databases() first."
-            )
+            return ()
         return (
-            self._pipeline_path_prefix
-            + self._operator.uri
+            self._operator.uri
             + (
                 f"schema:{self.pipeline_hash().to_string()}",
                 f"instance:{self.content_hash().to_string()}",
             )
         )
```
OperatorNode.node_identity_path returns () when no pipeline database is attached. Like FunctionNode, the identity path should be computable without a DB (it’s a computation identity, not a storage prefix). Returning () can break observer contextualization (Composite/Status/Logging contextualize guards) for DB-less pipelines.
Fixed. Same removal applied to OperatorNode.node_identity_path — the DB-attachment guard is gone and the path is always computable from operator.uri + (schema_hash, instance_hash).
```diff
     def get_logs(self) -> pa.Table | None:
         """Read all log rows from the database as a `pyarrow.Table`.

-        Instantiate once, outside the pipeline, and pass to the orchestrator::
+        Returns all log rows for this observer's database, regardless of node.
+        Node-specific logs can be filtered via the ``_log_node_label`` column.

-            obs = LoggingObserver(log_database=InMemoryArrowDatabase())
-            orch = SyncPipelineOrchestrator(observer=obs)
-            pipeline.run(orchestrator=orch)
+        Scans all storage paths under the database's path prefix and
+        concatenates every table that contains at least one ``_log_``-prefixed
+        column. This handles the case where logs are written at
+        per-node identity paths (``node_hash/node_label``) rather than a
+        single flat ``DEFAULT_LOG_PATH``.

-        # After the run, read back captured logs:
-        logs_table = obs.get_logs()  # pyarrow.Table
-
-        For async / Ray pipelines use `AsyncPipelineOrchestrator`
-        with the same observer::
+        Returns ``None`` if no logs have been written yet.
         """
+        import pyarrow as pa
+
+        db = self._db
+        # Access the underlying storage dicts if available (InMemoryArrowDatabase).
+        tables: dict = getattr(db, "_tables", {})
+        pending: dict = getattr(db, "_pending_batches", {})
+        prefix = "/".join(getattr(db, "_path_prefix", ()))
+
+        all_parts: list[pa.Table] = []
+        seen_keys: set[str] = set()
+
+        def _collect(store: dict) -> None:
+            for key, tbl in store.items():
+                if key in seen_keys:
+                    continue
+                # Only consider keys that fall under this db's prefix
+                if prefix and not key.startswith(prefix):
+                    continue
+                if tbl is None or tbl.num_rows == 0:
+                    continue
+                col_names = tbl.schema.names if hasattr(tbl, "schema") else []
+                if any(c.startswith("_log_") for c in col_names):
+                    # Drop the internal record-id column before concatenating
+                    record_id_col = "__record_id"
+                    if record_id_col in col_names:
+                        tbl = tbl.drop([record_id_col])
+                    all_parts.append(tbl)
+                    seen_keys.add(key)
+
+        _collect(tables)
+        _collect(pending)
+
+        if not all_parts:
+            return None
+
+        if len(all_parts) == 1:
+            return all_parts[0]
+
+        return pa.concat_tables(all_parts, promote_options="default")
```
LoggingObserver.get_logs() reads logs by introspecting private in-memory fields (_tables/_pending_batches). For non-in-memory databases (e.g., DeltaTableDatabase, ConnectorArrowDatabase) these fields won’t exist, so get_logs() will incorrectly return None even when logs were written. Since logs are written to DEFAULT_LOG_PATH, prefer self._db.get_all_records(DEFAULT_LOG_PATH) (and drop internal __record_id if present) so this works across all ArrowDatabaseProtocol implementations.
Fixed. get_logs() now uses self._db.list_sources() and iterates over matching paths with self._db.get_all_records(src) — fully protocol-level, no private field access. Falls back to get_all_records(DEFAULT_LOG_PATH) for DBs that don't implement list_sources.
```diff
+        # Derive the absolute table URI directly from the key (already includes prefix).
+        abs_path_components = tuple(record_key.split("/"))
+        if not self._is_cloud:
+            table_uri = str(self._local_root.joinpath(*abs_path_components))
+        else:
+            table_uri = self._root_uri.rstrip("/") + "/" + record_key

         try:
             # Combine all tables in the batch
             combined_table = pending_batch.combine_chunks()

-            table_uri = self._get_table_uri(record_path, create_dir=True)
+            # Ensure parent directory exists (local only)
+            if not self._is_cloud:
+                import os
+                os.makedirs(table_uri, exist_ok=True)
```
DeltaTableDatabase._flush_by_key() constructs local table paths via Path.joinpath(*abs_path_components) without applying _sanitize_path_component(). This reintroduces invalid filenames on Windows for identity-path components containing ':' (e.g., 'schema:...'). Consider reusing _get_table_uri() logic or sanitizing each component when building table_uri for local paths.
Fixed. _flush_by_key() now applies _sanitize_path_component() to each component when building the local table_uri, reusing the same sanitization logic that _get_table_uri() uses.
```python
        # invalidate record id cache (uses abs_path_components as the record_path)
        self._invalidate_cache(abs_path_components)
```
_flush_by_key() invalidates the existing-IDs cache via _invalidate_cache(abs_path_components). Because _invalidate_cache calls _get_record_key(record_path) which prepends self._path_prefix, calling flush() on a scoped instance will double-prefix the cache key and leave the real entry marked clean. Consider invalidating by record_key directly (e.g., self._cache_dirty[record_key]=True) or delegating cache invalidation to the root instance.
```diff
-        # invalidate record id cache (uses abs_path_components as the record_path)
-        self._invalidate_cache(abs_path_components)
+        # Invalidate record ID cache for this absolute record_key directly to
+        # avoid double-prefixing when called from scoped instances.
+        self._cache_dirty[record_key] = True
```
Fixed. Changed to self._cache_dirty[record_key] = True to invalidate by the already-computed absolute record_key directly, avoiding the double-prefix that _invalidate_cache() would introduce on scoped instances.
def at(self, *path_components: str) -> "DeltaTableDatabase":
    """Return a new DeltaTableDatabase scoped to the given sub-path.

    The returned instance uses the same underlying filesystem root but
    all reads and writes are relative to the extended prefix. Unlike
    InMemoryArrowDatabase and ConnectorArrowDatabase, DeltaTableDatabase
    does NOT share pending state — the filesystem is the shared storage.

    Raises:
        TypeError: If any component is not a str.
        ValueError: If any component is empty, is ``'.'`` or ``'..'``, or
            contains filesystem-unsafe characters (``/``, ``\\``, ``*``,
            ``?``, ``"``, ``<``, ``>``, ``|``, ``\\0``).
    """
    _unsafe_chars = ["/", "\\", "*", "?", '"', "<", ">", "|", "\0"]
    for i, component in enumerate(path_components):
        if not isinstance(component, str):
            raise TypeError(
                f"at() path component {i} must be str, got {type(component)!r}"
            )
        if not component:
            raise ValueError(f"at() path component {i} must not be empty")
        if component in (".", ".."):
            raise ValueError(
                f"at() path component {repr(component)}: '.' and '..' are not allowed"
            )
        if any(char in component for char in _unsafe_chars):
            raise ValueError(
                f"at() path component {repr(component)} contains invalid characters"
            )
    new_root = self._root if self._root is not None else self
    new_scoped_path = self._scoped_path + path_components
    # Propagate shared pending state to the scoped instance so that
    # root.flush() is visible to — and can flush — all scoped instances.
    root_instance = new_root
    return DeltaTableDatabase(
        base_path=self._root_uri,
        storage_options=self._storage_options,
        batch_size=self.batch_size,
        max_hierarchy_depth=self.max_hierarchy_depth,
        allow_schema_evolution=self.allow_schema_evolution,
        _path_prefix=self._path_prefix + path_components,
        _root=new_root,
        _scoped_path=new_scoped_path,
        _shared_pending_batches=root_instance._pending_batches,
        _shared_pending_record_ids=root_instance._pending_record_ids,
    )
DeltaTableDatabase.at() docstring says scoped instances do NOT share pending state, but the implementation now explicitly shares _pending_batches/_pending_record_ids with the root. Update the docstring to match the new behavior (or reconsider whether sharing is intended), since it affects correctness expectations around flush behavior.
Fixed. The at() docstring now correctly states that scoped instances share _pending_batches and _pending_record_ids with the root, making flushed writes visible across all scopes of the same root database.
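The shared-pending-state behavior agreed on above can be sketched with a minimal stand-in (this is illustrative, not the real `DeltaTableDatabase` — constructor arguments and internals are simplified assumptions): scoped views extend the path prefix but alias the root's pending buffers, so a `root.flush()` sees every scope's writes.

```python
class ScopedDB:
    """Toy database illustrating `.at()` scoping with shared pending state."""

    def __init__(self, prefix=(), _root=None, _pending=None):
        self._prefix = tuple(prefix)
        self._root = _root if _root is not None else self
        # Alias the root's pending dict rather than copying it.
        self._pending = _pending if _pending is not None else {}

    def at(self, *components):
        return ScopedDB(
            prefix=self._prefix + components,
            _root=self._root,
            _pending=self._root._pending,  # shared, not copied
        )

    def add_record(self, path, record):
        key = "/".join(self._prefix + tuple(path))
        self._pending.setdefault(key, []).append(record)

    def flush(self):
        # Flushing the root drains writes made through every scoped view.
        flushed = dict(self._pending)
        self._pending.clear()
        return flushed


root = ScopedDB()
scoped = root.at("my_pipeline", "_result")
scoped.add_record(("node_a",), {"x": 1})
# The root sees the scoped write because pending state is shared.
assert "my_pipeline/_result/node_a" in root._pending
```

The key design point, matching the corrected docstring: `at()` returns a new instance but never forks the pending buffers.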
    tag_data=tag_data,
...
Correct usage (mirrors how ``FunctionNode`` calls it)::

    ctx_obs = observer.contextualize(node_hash, node_label)
The create_packet_logger() docstring example still uses the old contextualize(node_hash, node_label) signature; the new protocol uses contextualize(*identity_path). Updating the example avoids confusion for API consumers.
-ctx_obs = observer.contextualize(node_hash, node_label)
+ctx_obs = observer.contextualize(*identity_path)
Already addressed in the current code — create_packet_logger() docstring uses contextualize(*identity_path) in both the module docstring and inline comments. The old contextualize(node_hash, node_label) signature no longer appears anywhere.
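The `contextualize(*identity_path)` shape discussed in this thread can be sketched as follows. Everything except the `contextualize()` signature is an assumption for illustration (`RecordingObserver`, `record()`, and the event-sink sharing are not orcapod API): the root observer is unbound, and each call returns a new observer bound to a node identity path while sharing the root's sink.

```python
from typing import Protocol


class ExecutionObserverLike(Protocol):
    """Structural stand-in for the observer protocol's contextualize hook."""

    def contextualize(self, *identity_path: str) -> "ExecutionObserverLike": ...


class RecordingObserver:
    def __init__(self, identity_path: tuple[str, ...] = ()):
        self._identity_path = identity_path
        self.events: list[tuple[tuple[str, ...], str]] = []

    def contextualize(self, *identity_path: str) -> "RecordingObserver":
        if not identity_path:
            raise ValueError("identity_path must not be empty")
        # Return a new bound observer rather than mutating self.
        bound = RecordingObserver(self._identity_path + identity_path)
        bound.events = self.events  # share the sink with the root
        return bound

    def record(self, event: str) -> None:
        self.events.append((self._identity_path, event))


root = RecordingObserver()
# Multi-component identity path, e.g. pod URI plus schema/instance hashes.
node_obs = root.contextualize("pod.uri", "schema:abc", "instance:def")
node_obs.record("started")
```

Note the asymmetry flagged elsewhere in the review: real observers reject an empty identity path, so a `NoOpObserver` that silently accepts one can mask bugs until the observer is swapped.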
@@ -10,6 +11,22 @@
from orcapod.pipeline.observability_reader import ObservabilityReader


def _make_fake_delta_table(path: Path) -> None:
    """Create a minimal _delta_log directory to simulate a Delta table."""
    (path / "_delta_log").mkdir(parents=True, exist_ok=True)


def test_discover_tables_finds_status_and_log_paths(tmp_path):
    from orcapod.pipeline.observability_reader import ObservabilityReader
tempfile is imported but never used in this test module (and ObservabilityReader is imported twice: module-level and inside the test). Removing unused/redundant imports will keep lint/static analysis clean.
Fixed. Removed the unused import tempfile and the redundant in-function from orcapod.pipeline.observability_reader import ObservabilityReader import (the module-level import is sufficient).
self.attach_databases(
    pipeline_database=pipeline_database,
    result_database=result_database,
    result_path_prefix=result_path_prefix,
Now that we've established that we'll use the "at" logic for contextualizing databases (when one is not explicitly provided), do we still need result_path_prefix? Is it there to offer a choice other than the default _result that would be used if no result_database is provided? If so, this is not obvious from the call signature.
Fixed — result_path_prefix has been removed entirely from both attach_databases() and FunctionNode.__init__. Since we now pre-scope the result database at compile time via at(), the prefix parameter serves no purpose and its removal eliminates the confusion.
-result_path_prefix
-    if result_path_prefix is not None
-    else pipeline_path_prefix + ("_result",)
+result_path_prefix if result_path_prefix is not None else ()
I don't think this is correct -- this would lead to results being dumped directly into pipeline database at the same level as the pipeline results. If result database is to be based on the pipeline database, we must provide subpath (e.g. _result) as was done before. Also, this should have been caught by the tests -- check and explain why this was not something tests had caught and add tests to catch this scenario.
Fixed. The default in attach_databases() is now result_database = pipeline_database.at("_result"), so results land in a dedicated sub-path rather than at the same level as pipeline records. On why tests didn't catch it: the relevant tests read back via get_all_records(node.node_identity_path) using the node's result DB directly, so they passed regardless of whether the result DB was a sub-scope or not. A dedicated test has been added in test_function_node_attach_db.py to assert that the default result DB is a scoped view of the pipeline DB.
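A regression test in the spirit of the one described above can be sketched like this. The `FakeDB` and the `default_result_db` helper are hypothetical stand-ins (the real default lives inside `attach_databases()` in orcapod); the point is the assertion that the default result database is a `"_result"` sub-scope of the pipeline database, not a peer.

```python
class FakeDB:
    """Minimal scoped-database stand-in exposing only the prefix and at()."""

    def __init__(self, prefix=()):
        self.prefix = tuple(prefix)

    def at(self, *components):
        return FakeDB(self.prefix + components)


def default_result_db(pipeline_database, result_database=None):
    # Mirrors the fixed default: results land under a dedicated "_result"
    # sub-path instead of at the same level as pipeline records.
    if result_database is not None:
        return result_database
    return pipeline_database.at("_result")


def test_default_result_db_is_scoped_subview():
    pipeline_db = FakeDB(("my_pipeline",))
    result_db = default_result_db(pipeline_db)
    # Results must NOT be written at the same level as pipeline records.
    assert result_db.prefix == ("my_pipeline", "_result")


def test_explicit_result_db_is_used_as_is():
    explicit = FakeDB(("elsewhere",))
    assert default_result_db(FakeDB(("p",)), explicit) is explicit
```

Because this asserts on the database's scope rather than reading records back through the node's own result DB, it would have caught the flat-default bug that the round-trip tests missed.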
def node_identity_path(self) -> tuple[str, ...]:
    """Return the node identity path for DB record scoping.

    Returns ``()`` when no pipeline database is attached.
it makes no sense for an empty path to be returned when no database is set -- node identity path in itself is NOT persistent layer dependent whatsoever.
Fixed. node_identity_path now always computes from pod.uri + (schema_hash, instance_hash) regardless of whether a pipeline database is attached. The DB-attachment guard was removed entirely — the identity path is a pure function of the computation graph, not of the storage layer.
)

@property
def pipeline_path(self) -> tuple[str, ...]:
who uses this? The name of pipeline_path is obviously confusing against the old definition and I also just don't see why such a property reaching into its pipeline_database's prefix properties (which is completely private) should even exist?
Agreed — the property was encapsulation-violating and the name was confusing against the old definition. pipeline_path has been removed entirely from FunctionNode. The pipeline name prefix now lives in the database's own path (via db.at(*pipeline_name) at compile time), so no node property needs to reach into _path_prefix.
    )
)

@property
Again what's the point of creating this confusing and in principle encapsulation violating method?
Same removal applied to OperatorNode. The pipeline_path property is gone from both node types.
def __init__(
    self,
    _path_prefix: tuple[str, ...] = (),
    _root: "NoOpArrowDatabase | None" = None,
remove quotes around type hint
Fixed. Added from __future__ import annotations to noop_database.py and removed the quoted 'NoOpArrowDatabase' self-reference.
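Why the future import lets the quotes go: with postponed evaluation of annotations (PEP 563), annotations are stored as strings and never evaluated at class-body time, so a class may name itself in its own signatures without forward-reference quoting. A minimal sketch (class name is illustrative, not the real `NoOpArrowDatabase`):

```python
from __future__ import annotations


class NoOpish:
    # Unquoted self-reference is legal because the annotation is never
    # evaluated at class definition time under PEP 563 semantics.
    def __init__(self, _root: NoOpish | None = None) -> None:
        self._root = _root if _root is not None else self

    def at(self, *components: str) -> NoOpish:
        # Scoped views keep pointing at the original root instance.
        return NoOpish(_root=self._root)
```

Without the future import, the same signatures would raise a `NameError` at class creation on Python versions that evaluate annotations eagerly, which is why the quoted form existed in the first place.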
src/orcapod/pipeline/graph.py
Outdated
self._scoped_pipeline_database: dbp.ArrowDatabaseProtocol | None = None
self._status_database: dbp.ArrowDatabaseProtocol | None = None
self._log_database: dbp.ArrowDatabaseProtocol | None = None
self._default_observer: Any = None
Do not make this Any -- be more type specific
Fixed. _default_observer is now typed as ExecutionObserverProtocol | None with a TYPE_CHECKING import to avoid the circular import.
src/orcapod/pipeline/graph.py
Outdated
config: PipelineConfig | None = None,
execution_engine: cp.PacketFunctionExecutorProtocol | None = None,
execution_engine_opts: "dict[str, Any] | None" = None,
observer=None,
Provide stronger typing based on the observer protocol.
Fixed. The observer parameter in both Pipeline.run() and the orchestrator call sites is typed as ExecutionObserverProtocol | None = None.
attach_databases() should behave similarly to how things would work at init.
Fixed. compile() now guards the attach_databases() call behind if pipeline_db is not None, matching the behaviour you'd expect from __init__ — nodes are left unattached when no database is configured.
row = pa.table(columns)
try:
    self._db.add_record(DEFAULT_STATUS_PATH, status_id, row, flush=True)
except Exception:
not entirely comfortable absorbing all exceptions here -- should at least include TODO to re-evaluate the strategy
Done — added a # TODO: re-evaluate broad exception absorption comment to _write_event() noting that ideally the catch should be narrowed to DB-specific exceptions (IOError, ArrowInvalid) once the full set of failure modes is known.
- LoggingObserver.get_logs(): replace private InMemoryArrowDatabase field introspection (_tables/_pending_batches) with public get_all_records() so it works across all ArrowDatabaseProtocol implementations
- StatusObserver/LoggingObserver.to_config(): guard db_registry kwarg with try/except TypeError for databases whose to_config() doesn't accept it yet
- FunctionNode/OperatorNode.node_identity_path: compute identity path from pod/operator URI + hashes regardless of DB attachment; only return () for truly unavailable (load-only) nodes with no live pod/operator
- FunctionNode.__init__ + attach_databases(): remove dead result_path_prefix parameter (databases are pre-scoped; no per-node path prefix needed)
- Pipeline.compile(): guard attach_databases() calls behind `if pipeline_db is not None` so DB-less pipelines don't raise TypeError
- DeltaTableDatabase._flush_by_key(): apply _sanitize_path_component() when building local table_uri to handle ':' in identity-path components on Windows; fix cache invalidation to use record_key directly instead of _invalidate_cache(abs_path_components) which would double-prefix via _get_record_key()
- DeltaTableDatabase.at() docstring: fix incorrect claim that scoped instances do NOT share pending state; they do (shared _pending_batches/_pending_record_ids)
- LoggingObserver.create_packet_logger() docstring: update example to use contextualize(*identity_path) not the old positional signature
- test_observability_reader.py: remove unused `import tempfile` and redundant module-level re-import of ObservabilityReader inside test function

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
brian-arnold
left a comment
Code Review
Critical
1. LoggingObserver.get_logs() breaks database encapsulation — InMemoryArrowDatabase-only
The new get_logs() accesses db._tables and db._pending_batches directly via getattr instead of going through the database protocol (get_all_records). This will silently return empty results for DeltaTableDatabase or any other non-in-memory implementation. The old implementation used the protocol-level API. This is a regression.
2. Spec/implementation mismatch on identity_path usage in StatusObserver
Both StatusObserver._write_event and _ContextualizedStatusObserver._write_event write to DEFAULT_STATUS_PATH (a fixed constant), never using self._identity_path for DB path scoping. The node identity is only stamped as column values. The spec says writes should go to identity_path-scoped locations, but the implementation uses a flat table. This isn't necessarily wrong (column-based filtering works), but it means contextualize() doesn't actually scope the storage — only the metadata. This should be an intentional, documented decision.
3. Dead result_path_prefix parameter in FunctionNode.attach_databases()
The parameter is still accepted and computed_result_path_prefix is computed, but it's never passed anywhere since CachedFunctionPod no longer accepts record_path_prefix. Callers setting it get silent no-ops. Should be removed.
Moderate
4. LoggingObserver.create_packet_logger raises NotImplementedError on root observer
Unlike StatusObserver which returns a no-op logger from the root, LoggingObserver raises if create_packet_logger is called before contextualize(). This asymmetry is a runtime landmine if the observer is used standalone.
5. DeltaTableDatabase._flush_by_key uses bare except Exception
The except Exception: delta_table = None catch during table existence checks is overly broad — it treats network errors, permission errors, etc. as "table doesn't exist." This could mask real failures in production.
6. NoOpArrowDatabase.to_config() signature not updated for db_registry
Other DB classes were updated to accept db_registry=None, but NoOpArrowDatabase (and possibly ConnectorArrowDatabase/DeltaTableDatabase) weren't. Scoped NoOp instances won't serialize correctly through the observer pipeline.
Minor
7. pipeline_path property on nodes accesses private _path_prefix
Both FunctionNode and OperatorNode reconstruct absolute paths via getattr(self._pipeline_database, "_path_prefix", ()) — coupling to an implementation detail.
8. OBSERVER_REGISTRY triggers eager imports
Unlike the LazyModule pattern used elsewhere, serialization.py eagerly imports all observer classes at module load. Potential circular import risk.
9. Inconsistent empty identity_path validation
NoOpObserver.contextualize() accepts empty args, but all real observers raise ValueError. Code that works with NoOpObserver could break when swapped to a real observer.
Overall: The architectural direction is sound — pre-scoping databases eliminates a major source of parameter threading and makes the node/observer APIs cleaner. The critical items (especially #1) should be addressed before merge. The dead result_path_prefix (#3) is easy cleanup. The spec/implementation mismatch (#2) should at minimum be documented as an intentional design choice.
The pipeline_path property was removed from FunctionNode and OperatorNode as part of the path-scoped observer contextualization refactor (ENG-340/349). Pipeline name prefix now lives in the pre-scoped database path, not in node identity properties. Update all test references to use node_identity_path instead, and repurpose the two "pipeline_path starts with pipeline name" tests to verify the new schema:/instance: component structure of node_identity_path. All 3046 tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Update: All review comments addressed
Thanks for the thorough reviews (@eywalker, @brian-arnold, @copilot-pull-request-reviewer). All issues have been addressed across the subsequent commits. Here is a full accounting:
Critical / Major items
Spec/implementation mismatch — observer path scoping — Fixed. Dead
Typing / style items
Type annotations in String quotes around type hints — Fixed across
Copilot items
Unused Final commit ( |
The test-objective directory has its own test suite (run separately in CI) that still referenced the removed pipeline_path property on FunctionNode. Update test_pipeline_path_uses_pipeline_hash to use node_identity_path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 44 out of 45 changed files in this pull request and generated 8 comments.
# Create scoped databases and default observer eagerly at compile time
if self._pipeline_database is not None:
    pipeline_db = self._pipeline_database.at(*self._name)
    result_db = self._result_database if self._result_database is not None else pipeline_db.at("_result")
    status_db = pipeline_db.at("_status")
    log_db = pipeline_db.at("_log")
In compile(), an explicitly provided result_database is used unscoped (result_db = self._result_database ...). Since CachedFunctionPod now stores results under pod.uri with no prefix, sharing a result_database across multiple pipelines can cause cross-pipeline cache collisions (same function URI → same record_path). Consider scoping the provided result_database to the pipeline namespace (e.g. result_database.at(*self._name).at("_result")), or enforcing/validating that callers pass an already-scoped DB view.
No change here — intentional design. When a caller explicitly provides a result_database to compile(), it is used as-is; the caller is responsible for scoping it appropriately (e.g. db.at(pipeline_name, "_result")). Silently re-scoping an explicitly provided database would be surprising and could break callers who have already pre-scoped it. The default path (when no result_database is given) already scopes correctly to pipeline_db.at("_result"). The Pipeline docstring has been updated to document this expectation.
else:
    from orcapod.pipeline.observer import NoOpObserver
    pipeline._default_observer = NoOpObserver()
Pipeline.load() sets pipeline._compiled = True but does not initialize the scoped database view fields (_scoped_pipeline_database, _result_database_scoped, _status_database, _log_database). As a result, properties like result_database, status_database, and log_database will return None on loaded pipelines even when databases are available. Load should populate these fields consistently (e.g. pipeline_db, pipeline_db.at("_result"), pipeline_db.at("_status"), pipeline_db.at("_log")) so the public properties work after deserialization.
# Ensure scoped database views are restored on deserialized pipelines
if "databases" in data:
    # `pipeline_db` is the pipeline-level database view constructed earlier
    pipeline._scoped_pipeline_database = pipeline_db
    if pipeline_db is not None:
        pipeline._result_database_scoped = pipeline_db.at("_result")
        pipeline._status_database = pipeline_db.at("_status")
        pipeline._log_database = pipeline_db.at("_log")
Fixed. Pipeline.load() now populates _scoped_pipeline_database, _result_database_scoped, _status_database, and _log_database from the resolved pipeline_db before setting _compiled = True — mirroring exactly what compile() does at runtime. The result_database, status_database, and log_database properties now return meaningful values on loaded pipelines.
 def get_status(self) -> "pa.Table | None":
     """Return all status records aggregated across all node sub-paths.

-    Args:
-        pipeline_path: If provided, reads status for a specific node
-            (mirrored path). If ``None``, reads from the default
-            status path.
+    Reads from all node-scoped sub-paths within this observer's database,
+    concatenating results. Falls back to a direct read for DBs that don't
+    support ``list_sources``.

     Returns:
         ``None`` if no status events have been written yet.
     """
-    if pipeline_path is not None:
-        status_path = pipeline_path[:1] + ("status",) + pipeline_path[1:]
-    else:
-        status_path = self._status_path
-    return self._db.get_all_records(status_path)
+    import pyarrow as pa
+
+    try:
+        sources = self._db.list_sources()
+    except (NotImplementedError, Exception):
+        # Fall back to direct read for DBs that don't support list_sources
+        return self._db.get_all_records(DEFAULT_STATUS_PATH)
+
+    parts = []
+    for src in sources:
+        if src[-len(DEFAULT_STATUS_PATH):] == DEFAULT_STATUS_PATH:
+            tbl = self._db.get_all_records(src)
+            if tbl is not None and tbl.num_rows > 0:
+                if "__record_id" in tbl.schema.names:
+                    tbl = tbl.drop(["__record_id"])
+                parts.append(tbl)
+    if not parts:
+        return None
+    if len(parts) == 1:
+        return parts[0]
+    return pa.concat_tables(parts, promote_options="default")
get_status() concatenates status tables across node identity subpaths but drops all node identity columns, so the returned combined Table cannot be attributed back to a node (especially since _status_node_label/_status_node_hash were removed). Consider adding an explicit node identity column while aggregating (e.g. derive identity_prefix = src[:-len(DEFAULT_STATUS_PATH)] and add it as a column before concat) or returning a mapping keyed by identity_path so consumers can filter per-node.
No code change — by design. StatusObserver.get_status() is a convenience aggregator for quick inspection of all status events across a run; it is intentionally flat. Per-node breakdown is the responsibility of ObservabilityReader, which discovers status tables by path and derives node identity from the path structure. Adding a _node_identity column to the flat aggregate would force us to define a serialization format for the path, which ObservabilityReader already handles properly. The get_status() docstring has been updated to make this distinction explicit.
try:
    sources = self._db.list_sources()
except (NotImplementedError, Exception):
The exception handling in get_status() catches all Exceptions from list_sources() and silently falls back to a single-table read. This can mask real bugs (e.g. unexpected runtime errors in a DB implementation) and make failures harder to diagnose. Consider narrowing this to AttributeError/NotImplementedError (and possibly known DB capability errors), and letting other exceptions propagate or at least logging them before falling back.
-except (NotImplementedError, Exception):
+except (AttributeError, NotImplementedError):
Fixed. Narrowed except (NotImplementedError, Exception) to except (AttributeError, NotImplementedError) in both get_status() and get_logs(). The original clause with Exception made the NotImplementedError narrowing meaningless and could silently swallow real runtime bugs.
def get_logs(self) -> "pa.Table | None":
    """Read all log rows aggregated across all node sub-paths.

    Reads from all node-scoped sub-paths within this observer's database,
    concatenating results. Falls back to a direct read for DBs that don't
    support ``list_sources``.

    Returns ``None`` if no logs have been written yet.
    """
    import pyarrow as pa

    try:
        sources = self._db.list_sources()
    except (NotImplementedError, Exception):
        return self._db.get_all_records(DEFAULT_LOG_PATH)

    parts = []
    for src in sources:
        if src[-len(DEFAULT_LOG_PATH):] == DEFAULT_LOG_PATH:
            tbl = self._db.get_all_records(src)
            if tbl is not None and tbl.num_rows > 0:
                if "__record_id" in tbl.schema.names:
                    tbl = tbl.drop(["__record_id"])
                parts.append(tbl)
    if not parts:
        return None
    if len(parts) == 1:
        return parts[0]
    return pa.concat_tables(parts, promote_options="default")
get_logs() concatenates log tables across node identity subpaths but drops node identity columns, so the returned combined Table cannot be filtered per-node (and _log_node_label/_log_node_hash were removed). Consider adding a derived node identity column while aggregating (similar to ObservabilityReader adding node_label from the directory), or returning per-node/per-identity tables to avoid losing attribution.
No code change — by design, mirroring the same decision made for StatusObserver.get_status(). LoggingObserver.get_logs() is a flat convenience aggregator for quick inspection of all log rows across a run. Per-node breakdown is the responsibility of ObservabilityReader, which discovers log tables by path and derives node identity from the path structure. Adding a _node_identity column here would duplicate logic that ObservabilityReader already handles correctly. The get_logs() docstring has been updated to make this explicit.
except (NotImplementedError, Exception):
    return self._db.get_all_records(DEFAULT_LOG_PATH)
The exception handling in get_logs() catches all Exceptions from list_sources() and falls back to reading DEFAULT_LOG_PATH. Catching broad Exception here can hide real runtime errors in the DB implementation. Consider narrowing to capability-related exceptions (e.g. AttributeError/NotImplementedError) and logging unexpected exceptions rather than silently switching behavior.
-except (NotImplementedError, Exception):
-    return self._db.get_all_records(DEFAULT_LOG_PATH)
+except (AttributeError, NotImplementedError):
+    # Databases that do not support listing sources fall back to a direct read.
+    return self._db.get_all_records(DEFAULT_LOG_PATH)
+except Exception:
+    # Unexpected errors from the database should not be silently hidden.
+    logger.exception("Unexpected error while listing log sources in LoggingObserver.get_logs")
+    raise
Fixed. Narrowed except (NotImplementedError, Exception) to except (AttributeError, NotImplementedError) in get_logs(). The original clause with bare Exception made the NotImplementedError narrowing meaningless and could silently swallow real runtime bugs in DB implementations.
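The aggregate-with-fallback pattern settled on above can be sketched without pyarrow (the toy databases below return plain lists of rows instead of Arrow tables; `ListingDB`, `LegacyDB`, and the `log_suffix` parameter are illustrative assumptions). The control flow is the point: capability gaps (`AttributeError`/`NotImplementedError`) fall back to a direct read, while unexpected errors propagate.

```python
def get_logs(db, log_suffix=("_log",)):
    """Aggregate rows from every source ending in log_suffix."""
    try:
        sources = db.list_sources()
    except (AttributeError, NotImplementedError):
        # Capability gap only: this DB cannot enumerate sources.
        return db.get_all_records(log_suffix)
    rows = []
    for src in sources:
        if tuple(src[-len(log_suffix):]) == log_suffix:
            rows.extend(db.get_all_records(src) or [])
    return rows or None


class ListingDB:
    """DB that supports source enumeration."""

    def __init__(self, tables):
        self._tables = tables

    def list_sources(self):
        return list(self._tables)

    def get_all_records(self, src):
        return self._tables.get(tuple(src), [])


class LegacyDB:
    """No list_sources(): triggers the AttributeError fallback path."""

    def get_all_records(self, src):
        return [{"msg": "direct"}]


db = ListingDB({
    ("node_a", "_log"): [{"msg": "a"}],
    ("node_a", "_status"): [{"msg": "ignored"}],
    ("node_b", "_log"): [{"msg": "b"}],
})
assert get_logs(db) == [{"msg": "a"}, {"msg": "b"}]
assert get_logs(LegacyDB()) == [{"msg": "direct"}]
```

A DB whose `list_sources()` raises, say, a network error is deliberately not caught here, which is exactly what the narrowed `except` clause buys over the original `except (NotImplementedError, Exception)`.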
 # ---------------------------------------------------------------------------
-# 9. get_logs(pipeline_path) retrieves node-specific logs
+# 9. get_logs() returns all logs; filter by _log_node_label for node-specific
 # ---------------------------------------------------------------------------
This section header mentions filtering by _log_node_label, but the refactor removes _log_node_label/_log_node_hash columns and LoggingObserver.get_logs() currently returns a combined table without any node-identity column. The comment (and possibly the test name) should be updated to reflect the new way to identify per-node logs (e.g. include a derived node_label column during aggregation, or clarify that per-node filtering isn't supported by get_logs()).
Fixed. The section header has been updated to 'get_logs() returns all logs aggregated across all node sub-paths', and the test method test_get_logs_filters_by_node_label_column has been renamed to test_get_logs_returns_rows_for_all_nodes to reflect that node identity is path-based, not column-based.
 # ---------------------------------------------------------------------------
-# 9. get_status(pipeline_path) retrieves node-specific status
+# 9. get_status() returns combined status, filter by node_label column
 # ---------------------------------------------------------------------------
This comment says get_status() results can be filtered by a node_label column, but _status_node_label/_status_node_hash were removed and StatusObserver.get_status() currently returns an aggregated table without any node-identity column. Update the comment to match the actual API (or consider adding a node identity column during aggregation if per-node filtering is intended).
Fixed. The section header has been updated to 'get_status() returns combined status across all node sub-paths'. The _status_node_label / _status_node_hash columns no longer exist; node identity is encoded in the database path, not in column values.
- Pipeline.load(): populate _scoped_pipeline_database, _result_database_scoped, _status_database, _log_database so that the result_database/status_database/log_database properties return meaningful values on loaded pipelines (mirrors what compile() does at runtime).
- get_status() / get_logs(): narrow broad `except (NotImplementedError, Exception)` to `except (AttributeError, NotImplementedError)` — Exception swallowed the narrowing of the first clause and could mask real runtime errors.
- Update stale test section header comments and rename test_get_logs_filters_by_node_label_column → test_get_logs_returns_rows_for_all_nodes (node filtering via column is no longer the mechanism; path-based scoping is).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Returns ``()`` only when the node is in a load-only state with no
live function pod (deserialized descriptor without a real pod).
"""
stored = getattr(self, "_stored_pipeline_path", None)
generally utilizing possibly non-existing property is not an ideal design. Can we not make this a field always present for this concrete implementation?
Fixed. _stored_pipeline_path: tuple[str, ...] = () is now initialized in __init__ on both FunctionNode and OperatorNode, making it a proper always-present instance field. node_identity_path accesses it directly as self._stored_pipeline_path — no getattr needed. from_descriptor() continues to overwrite it with the deserialized value when one is present in the descriptor.
Extended the fix to cover all descriptor-derived fields, not just _stored_pipeline_path. Both FunctionNode and OperatorNode now initialize every field that was previously set only by from_descriptor() — _load_status, _stored_content_hash, _stored_pipeline_hash, _stored_schema, _stored_node_uri, _stored_pipeline_path, _stored_result_record_path (FunctionNode only), and _descriptor — directly in __init__ with appropriate defaults. All getattr(self, "_stored_*", ...) fallback reads are replaced with direct attribute access.
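A tiny before/after sketch of the pattern adopted here: descriptor-derived fields get defaults in `__init__`, so reads are direct attribute access instead of `getattr()` guards. The class and descriptor shape below are illustrative, not the real `FunctionNode`/`from_descriptor()`; only the field names mirror the thread.

```python
class NodeLike:
    def __init__(self) -> None:
        # Always-present defaults; from_descriptor() may overwrite them.
        self._load_status: str | None = None
        self._stored_pipeline_path: tuple[str, ...] = ()
        self._descriptor: dict | None = None

    @classmethod
    def from_descriptor(cls, descriptor: dict) -> "NodeLike":
        node = cls()
        node._descriptor = descriptor
        node._stored_pipeline_path = tuple(descriptor.get("pipeline_path", ()))
        return node

    @property
    def node_identity_path(self) -> tuple[str, ...]:
        # Direct access — no getattr(self, "_stored_pipeline_path", None) guard,
        # because __init__ guarantees the field exists on every instance.
        return self._stored_pipeline_path


fresh = NodeLike()
loaded = NodeLike.from_descriptor({"pipeline_path": ["p", "schema:x"]})
```

The payoff is predictability: a freshly constructed node and a deserialized node expose the same attribute surface, so no code path can observe a half-initialized instance.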
Replace getattr(self, "_stored_pipeline_path", None) guard in node_identity_path with a direct field access. Both FunctionNode and OperatorNode now initialize _stored_pipeline_path = () in __init__, making it always present on the concrete class. from_descriptor() continues to overwrite it with the deserialized value when present. Also remove the now-stale "pipeline_path" reference from the node_identity_path docstring on both nodes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
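The fix described above can be sketched as follows. This is a simplified stand-in, not the project's actual `FunctionNode` (only `_stored_pipeline_path`, `node_identity_path`, and `from_descriptor` mirror names from the discussion; the descriptor shape is assumed):

```python
from __future__ import annotations

class FunctionNode:
    """Sketch: descriptor-derived fields always exist on the instance."""

    def __init__(self, name: str) -> None:
        self.name = name
        # Initialized eagerly with a default instead of being created
        # lazily by from_descriptor(); no getattr() fallback needed.
        self._stored_pipeline_path: tuple[str, ...] = ()
        self._descriptor: dict | None = None

    @property
    def node_identity_path(self) -> tuple[str, ...]:
        # Direct attribute access: the field is guaranteed to be present.
        return self._stored_pipeline_path

    @classmethod
    def from_descriptor(cls, descriptor: dict) -> "FunctionNode":
        node = cls(descriptor["name"])
        # Overwrite the default only when the descriptor carries a value.
        if "pipeline_path" in descriptor:
            node._stored_pipeline_path = tuple(descriptor["pipeline_path"])
        node._descriptor = descriptor
        return node

live = FunctionNode("transform")
assert live.node_identity_path == ()  # default, never AttributeError

loaded = FunctionNode.from_descriptor(
    {"name": "transform", "pipeline_path": ["root", "transform"]}
)
assert loaded.node_identity_path == ("root", "transform")
```

The design point is that the concrete class's attribute set is fixed at `__init__` time, so every reader can use plain attribute access and the type annotation documents the invariant.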
Round 3 update (commit 28ab132)

Single fix this round: replace the `getattr(self, "_stored_pipeline_path", None)` guard in node_identity_path with a direct field access. Previously the attribute existed only after from_descriptor(); now `self._stored_pipeline_path: tuple[str, ...] = ()` is initialized in `__init__` on both nodes, so the field is always present.
…ratorNode Extend the previous _stored_pipeline_path fix to all descriptor-derived fields. Both FunctionNode and OperatorNode now initialize every field that was previously set only by from_descriptor() — _load_status, _stored_content_hash, _stored_pipeline_hash, _stored_schema, _stored_node_uri, _stored_pipeline_path, _stored_result_record_path (FunctionNode), and _descriptor — directly in __init__ with appropriate defaults. All getattr(self, "_stored_*", ...) fallback reads are replaced with direct attribute access, making the concrete class design clean and predictable. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Round 4 update (commit fe26874)

Scope expanded from the previous round: the fix now covers every descriptor-derived field, not just `_stored_pipeline_path`. Both FunctionNode and OperatorNode initialize `_load_status`, `_stored_content_hash`, `_stored_pipeline_hash`, `_stored_schema`, `_stored_node_uri`, `_stored_pipeline_path`, `_stored_result_record_path` (FunctionNode only), and `_descriptor` directly in `__init__` with appropriate defaults.

All `getattr(self, "_stored_*", ...)` fallback reads are replaced with direct attribute access.
Summary
- Remove pipeline_path from FunctionNode/OperatorNode entirely; the pipeline now pre-scopes each database via db.at(*pipeline_name) at compile() time and passes scoped views to nodes
- Add contextualize(*identity_path) to ExecutionObserverProtocol; StatusObserver/LoggingObserver/CompositeObserver are rewritten to receive a pre-scoped DB at construction and produce per-node wrappers via contextualize(), eliminating pipeline_path from all observer hooks
- Orchestrators (SyncOrchestrator, AsyncOrchestrator) are now fully stateless w.r.t. the observer; the observer is passed only to run(), not stored at __init__
- to_config/from_config round-trip for all observers; Pipeline.save(level='full') now serializes the default observer
- ObservabilityReader._discover_tables() updated to use _status/_log path keys
- CachedFunctionPod: removed record_path_prefix parameter
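The pre-scoped-database observer flow can be sketched roughly as follows. Everything here is a simplified stand-in under assumptions: ScopedDB, its at()/write() methods, the on_event hook, and the path layout are hypothetical illustrations; only contextualize() and the observer class names echo the summary above:

```python
from typing import Protocol

class ExecutionObserverProtocol(Protocol):
    """Assumed shape: observers can bind themselves to a node identity path."""
    def contextualize(self, *identity_path: str) -> "ExecutionObserverProtocol": ...
    def on_event(self, message: str) -> None: ...

class ScopedDB:
    """Stand-in for a database view pre-scoped with db.at(*path)."""
    def __init__(self, path: tuple[str, ...] = ()) -> None:
        self.path = path
        self.rows: list[tuple[tuple[str, ...], str]] = []
    def at(self, *parts: str) -> "ScopedDB":
        child = ScopedDB(self.path + parts)
        child.rows = self.rows  # share storage, narrow the key prefix
        return child
    def write(self, message: str) -> None:
        self.rows.append((self.path, message))

class LoggingObserver:
    """Receives a pre-scoped DB at construction; never sees pipeline_path."""
    def __init__(self, db: ScopedDB) -> None:
        self._db = db
    def contextualize(self, *identity_path: str) -> "LoggingObserver":
        return LoggingObserver(self._db.at(*identity_path))
    def on_event(self, message: str) -> None:
        self._db.write(message)

class CompositeObserver:
    """Fans out to children; contextualize() returns a new composite of bound children."""
    def __init__(self, children: list) -> None:
        self._children = children
    def contextualize(self, *identity_path: str) -> "CompositeObserver":
        return CompositeObserver([c.contextualize(*identity_path) for c in self._children])
    def on_event(self, message: str) -> None:
        for c in self._children:
            c.on_event(message)

root = ScopedDB().at("my_pipeline", "_log")  # scoped at compile() time
observer = CompositeObserver([LoggingObserver(root)])
node_observer = observer.contextualize("transform")  # per-node wrapper
node_observer.on_event("started")
print(root.rows)  # path prefix alone identifies pipeline and node
```

The key property the sketch demonstrates is that nodes and observer hooks carry no pipeline_path argument: scoping lives entirely in the database view handed in at construction plus the per-node binding produced by contextualize().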
- pytest tests/ -q --tb=short: 3020 passing, 23 pre-existing optional-dep failures (graphviz, ray)
- pytest tests/test_pipeline/test_integration_smoke.py -v: all 3 smoke tests pass
- pytest tests/test_databases/: scoped DB tracking + serialization tests pass
- pytest tests/test_observability/: observer protocol tests pass
- No pipeline_path= keyword arguments remain in src/
- save(level='full') round-trip includes observer config in JSON output

Linear
🤖 Generated with Claude Code