feat(pipeline): persist DAG snapshot as JSON at run start (PLT-1161) #127
Open
kurodo3[bot] wants to merge 5 commits into dev from
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
- `Pipeline.save()` now accepts `run_id` and `snapshot_time` keyword-only parameters, populating the previously-null fields in the JSON output (backward compatible: both default to `None`)
- `Pipeline._write_dag_snapshot(run_id, snapshot_time)` derives the canonical path `{db_root}/{pipeline_name}/dag_snapshot.json` from the scoped pipeline database and writes a `level="standard"` snapshot; it silently skips non-local (cloud, in-memory) databases
- `Pipeline.run()` now generates a `run_id` (UUID4) and a `snapshot_time` (ISO UTC) at run start, calls `_write_dag_snapshot()` before any node executes, and threads `run_id` through to all orchestrator calls so that `observer.on_run_start()` receives the same ID as the snapshot

What this enables
Portolan and other log consumers can read `dag_snapshot.json` from a predictable path to reconstruct the exact DAG structure (nodes, edges, types, content hashes) for any run, even if the run crashed mid-execution.

Test plan
- `TestSaveRunIdAndSnapshotTime`: `save()` populates the new fields; both are null by default (backward compatible)
- `TestWriteDagSnapshot`: `_write_dag_snapshot()` creates the file at the correct path with the correct content; returns `None` for in-memory/cloud databases
- `TestRunWritesDagSnapshot`: `run()` writes the snapshot before node execution; the `run_id` in the snapshot matches the one the observer receives; a second run overwrites the file with a new `run_id`; full node-type coverage (source, operator, function)

All 19 new tests pass. Full test suite: 3136 passed, 56 skipped.
Closes PLT-1161