diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 861f1c9..2851777 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -111,6 +111,50 @@ def _fail_if_changing_system_annotation(self, *, key: str) -> None: if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX): raise errors.ApiValidationError(self._SYSTEM_KEY_RESERVED_MSG) + def _build_pipeline_run( + self, + session: orm.Session, + root_task: structures.TaskSpec, + annotations: Optional[dict[str, Any]], + created_by: str | None, + ) -> bts.PipelineRun: + """Create a single pipeline run within an existing transaction. + + Builds the execution tree, inserts the PipelineRun record, flushes to + obtain the server-generated ID, and mirrors system annotations. + + The caller is responsible for transaction boundaries (begin/commit). + """ + pipeline_name = root_task.component_ref.spec.name + + root_execution_node = _recursively_create_all_executions_and_artifacts_root( + session=session, + root_task_spec=root_task, + ) + + current_time = _get_current_time() + pipeline_run = bts.PipelineRun( + root_execution=root_execution_node, + created_at=current_time, + updated_at=current_time, + annotations=annotations, + created_by=created_by, + extra_data={ + self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name, + }, + ) + session.add(pipeline_run) + # Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs. + # TODO: Use ORM relationship instead of explicit flush + manual FK assignment. + session.flush() + _mirror_system_annotations( + session=session, + pipeline_run_id=pipeline_run.id, + created_by=created_by, + pipeline_name=pipeline_name, + ) + return pipeline_run + def create( self, session: orm.Session, @@ -125,36 +169,12 @@ def create( # TODO: Load and validate all components # TODO: Fetch missing components and populate component specs - pipeline_name = root_task.component_ref.spec.name - with session.begin(): - - root_execution_node = _recursively_create_all_executions_and_artifacts_root( + pipeline_run = self._build_pipeline_run( session=session, - root_task_spec=root_task, - ) - - # Store into DB. - current_time = _get_current_time() - pipeline_run = bts.PipelineRun( - root_execution=root_execution_node, - created_at=current_time, - updated_at=current_time, + root_task=root_task, annotations=annotations, created_by=created_by, - extra_data={ - self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name, - }, - ) - session.add(pipeline_run) - # Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs. - # TODO: Use ORM relationship instead of explicit flush + manual FK assignment. - session.flush() - _mirror_system_annotations( - session=session, - pipeline_run_id=pipeline_run.id, - created_by=created_by, - pipeline_name=pipeline_name, ) session.commit() @@ -178,33 +198,11 @@ def create_batch( with session.begin(): for run_request in runs: - pipeline_name = run_request.root_task.component_ref.spec.name - - root_execution_node = ( - _recursively_create_all_executions_and_artifacts_root( - session=session, - root_task_spec=run_request.root_task, - ) - ) - - current_time = _get_current_time() - pipeline_run = bts.PipelineRun( - root_execution=root_execution_node, - created_at=current_time, - updated_at=current_time, - annotations=run_request.annotations, - created_by=created_by, - extra_data={ - self._PIPELINE_NAME_EXTRA_DATA_KEY: pipeline_name, - }, - ) - session.add(pipeline_run) - session.flush() - _mirror_system_annotations( + pipeline_run = self._build_pipeline_run( session=session, - pipeline_run_id=pipeline_run.id, + root_task=run_request.root_task, + annotations=run_request.annotations, created_by=created_by, - pipeline_name=pipeline_name, ) pipeline_runs.append(pipeline_run)