Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 49 additions & 51 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,50 @@ def _fail_if_changing_system_annotation(self, *, key: str) -> None:
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
raise errors.ApiValidationError(self._SYSTEM_KEY_RESERVED_MSG)

def _build_pipeline_run(
    self,
    session: orm.Session,
    root_task: structures.TaskSpec,
    annotations: Optional[dict[str, Any]],
    created_by: str | None,
) -> bts.PipelineRun:
    """Create a single pipeline run within an existing transaction.

    Builds the execution tree, inserts the PipelineRun record, flushes to
    obtain the server-generated ID, and mirrors system annotations.

    The caller is responsible for transaction boundaries (begin/commit).

    Args:
        session: Open SQLAlchemy session; the caller owns begin/commit.
        root_task: Root task spec; its component spec provides the pipeline
            name and is stored on the run.
        annotations: Optional user-supplied annotations stored on the run.
        created_by: Identifier of the creating principal, if known.

    Returns:
        The new PipelineRun ORM object (flushed so ``id`` is populated, but
        not committed).
    """
    pipeline_name = root_task.component_ref.spec.name

    root_execution_node = _recursively_create_all_executions_and_artifacts_root(
        session=session,
        root_task_spec=root_task,
    )

    current_time = _get_current_time()
    pipeline_run = bts.PipelineRun(
        root_execution=root_execution_node,
        created_at=current_time,
        updated_at=current_time,
        # FIX: the pre-refactor `create` set `root_task` on the run
        # (see the removed constructor in this diff); the extracted helper
        # dropped it, which would persist runs without their root task spec.
        # Restored here so both `create` and `create_batch` keep the field.
        root_task=root_task,
        annotations=annotations,
        created_by=created_by,
        extra_data={
            self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name,
        },
    )
    session.add(pipeline_run)
    # Flush to populate pipeline_run.id (server-generated) before inserting
    # annotation rows that reference it as a foreign key.
    # TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
    session.flush()
    _mirror_system_annotations(
        session=session,
        pipeline_run_id=pipeline_run.id,
        created_by=created_by,
        pipeline_name=pipeline_name,
    )
    return pipeline_run

def create(
self,
session: orm.Session,
Expand All @@ -125,36 +169,12 @@ def create(
# TODO: Load and validate all components
# TODO: Fetch missing components and populate component specs

pipeline_name = root_task.component_ref.spec.name

with session.begin():

root_execution_node = _recursively_create_all_executions_and_artifacts_root(
pipeline_run = self._build_pipeline_run(
session=session,
root_task_spec=root_task,
)

# Store into DB.
current_time = _get_current_time()
pipeline_run = bts.PipelineRun(
root_execution=root_execution_node,
created_at=current_time,
updated_at=current_time,
root_task=root_task,
annotations=annotations,
created_by=created_by,
extra_data={
self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name,
},
)
session.add(pipeline_run)
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
session.flush()
_mirror_system_annotations(
session=session,
pipeline_run_id=pipeline_run.id,
created_by=created_by,
pipeline_name=pipeline_name,
)
session.commit()

Expand All @@ -178,33 +198,11 @@ def create_batch(

with session.begin():
for run_request in runs:
pipeline_name = run_request.root_task.component_ref.spec.name

root_execution_node = (
_recursively_create_all_executions_and_artifacts_root(
session=session,
root_task_spec=run_request.root_task,
)
)

current_time = _get_current_time()
pipeline_run = bts.PipelineRun(
root_execution=root_execution_node,
created_at=current_time,
updated_at=current_time,
annotations=run_request.annotations,
created_by=created_by,
extra_data={
self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name,
},
)
session.add(pipeline_run)
session.flush()
_mirror_system_annotations(
pipeline_run = self._build_pipeline_run(
session=session,
pipeline_run_id=pipeline_run.id,
root_task=run_request.root_task,
annotations=run_request.annotations,
created_by=created_by,
pipeline_name=pipeline_name,
)
pipeline_runs.append(pipeline_run)

Expand Down