diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index d714f64a154..efff6948a00 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -57,6 +57,7 @@ message ControlRequest { EmptyRequest emptyRequest = 56; PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + EndIterationRequest endIterationRequest = 59; // request for testing Ping ping = 100; @@ -278,4 +279,9 @@ enum StatisticsUpdateTarget { message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; StatisticsUpdateTarget updateTarget = 2; +} + +message EndIterationRequest{ + core.ActorVirtualIdentity LoopStartId = 1 [(scalapb.field).no_box = true]; + int32 iteration = 2; } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..a2c4d6ef9b7 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -47,6 +47,7 @@ service WorkerService { rpc EndWorker(EmptyRequest) returns (EmptyReturn); rpc StartChannel(EmptyRequest) returns (EmptyReturn); rpc EndChannel(EmptyRequest) returns (EmptyReturn); + rpc EndIteration(EndIterationRequest) returns (EmptyReturn); rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index bf4afbf396f..d92e8dcc987 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -87,6 +87,8 @@ def __init__(self, worker_id: str): PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] ] = dict() + self._storage_uris: typing.Dict[PortIdentity, str] = dict() + def is_missing_output_ports(self): """ This method is only used for ensuring correct region execution. @@ -126,6 +128,7 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str): Create a separate thread for saving output tuples of a port to storage in batch. """ + self._storage_uris[port_id] = storage_uri document, _ = DocumentFactory.open_document(storage_uri) buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) writer_queue = Queue() @@ -171,6 +174,21 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None: PortStorageWriterElement(data_tuple=tuple_) ) + def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + if port_id is None: + uris = self._storage_uris.values() + elif port_id in self._storage_uris: + uris = [self._storage_uris[port_id]] + else: + return + + for uri in uris: + writer = DocumentFactory.create_document( + uri.replace("/result", "/state"), state.schema + ).writer(str(get_worker_index(self.worker_id))) + writer.put_one(Tuple(vars(state))) + writer.close() + def close_port_storage_writers(self) -> None: """ Flush the buffers of port storage writers and wait for all the diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py index 79050839958..7532fae1bd7 100644 --- a/amber/src/main/python/core/models/operator.py +++ b/amber/src/main/python/core/models/operator.py @@ -293,3 +293,27 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike] time, or None. """ yield + + +class LoopStartOperator(TableOperator): + def open(self) -> None: + pass + + @abstractmethod + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + yield + + def close(self) -> None: + pass + + +class LoopEndOperator(TableOperator): + def open(self) -> None: + pass + + @abstractmethod + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + yield + + def close(self) -> None: + pass diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 2c8a268dfb7..ca39acc3335 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -35,6 +35,13 @@ def __init__( self.__dict__.update(table.to_pandas().iloc[0].to_dict()) self.schema = Schema(table.schema) + @classmethod + def from_tuple(cls, tuple, schema): + obj = cls() + obj.__dict__.update(tuple.as_dict()) + obj.schema = schema + return obj + def add( self, key: str, value: any, value_type: Optional[AttributeType] = None ) -> None: diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 4399b1a3a2f..815e85a6446 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None: self._context.worker_id, self._context.console_message_manager.print_buf, ): + self._switch_context() self._set_output_state(executor.process_state(state, port_id)) except Exception as err: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index d73c655734f..0cfb89f7d27 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -94,12 +94,13 @@ def complete(self) -> None: """ # flush the buffered console prints self._check_and_report_console_messages(force_flush=True) + controller_interface = self._async_rpc_client.controller_stub() + #controller_interface.iteration_completed(EmptyRequest()) self.context.executor_manager.executor.close() # stop the data processing thread self.data_processor.stop() self.context.state_manager.transit_to(WorkerState.COMPLETED) self.context.statistics_manager.update_total_execution_time(time.time_ns()) - controller_interface = self._async_rpc_client.controller_stub() controller_interface.worker_execution_completed(EmptyRequest()) self.context.close() @@ -197,6 +198,7 @@ def process_input_state(self) -> None: payload=batch, ) ) + self.context.output_manager.save_state_to_storage_if_needed(output_state) def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]: """ @@ -329,7 +331,7 @@ def _process_ecm(self, ecm_element: ECMElement): if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: self.context.pause_manager.resume(PauseType.ECM_PAUSE) - + self._switch_context() if self.context.tuple_processing_manager.current_internal_marker: { StartChannel: self._process_start_channel, diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index 9b686ab66b6..8a4d6fe3c5f 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument: if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME: _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - # Convert Amber Schema to Iceberg Schema with LARGE_BINARY - # field name encoding - iceberg_schema = amber_schema_to_iceberg_schema(schema) - - create_table( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - override_if_exists=True, - ) - - return IcebergDocument[Tuple]( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + # Convert Amber Schema to Iceberg Schema with LARGE_BINARY + # field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) + + create_table( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + iceberg_schema, + override_if_exists=True, + ) + + return IcebergDocument[Tuple]( + namespace, + storage_key, + iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document" @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]: if parsed_uri.scheme == "vfs": _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - table = load_table_metadata( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - ) - - if table is None: - raise ValueError("No storage is found for the given URI") - - amber_schema = Schema(table.schema().as_arrow()) - - document = IcebergDocument( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - table.schema(), - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - return document, amber_schema - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + table = load_table_metadata( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + ) + + if table is None: + raise ValueError("No storage is found for the given URI") + + amber_schema = Schema(table.schema().as_arrow()) + + document = IcebergDocument( + namespace, + storage_key, + table.schema(), + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + return document, amber_schema + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document" diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 9e17b2e0e82..cca785beb69 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -148,7 +148,7 @@ def create_postgres_catalog( catalog_name, **{ "uri": f"postgresql+pg8000://{username}:{password}@{uri_without_scheme}", - "warehouse": f"file://{warehouse_path}", + "warehouse": warehouse_path, }, ) @@ -180,7 +180,6 @@ def create_table( if catalog.table_exists(identifier) and override_if_exists: catalog.drop_table(identifier) - table = catalog.create_table( identifier=identifier, schema=table_schema, diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc7..12aa8991263 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -17,8 +17,8 @@ import typing from loguru import logger -from pyarrow import Table from typing import Union +from pyarrow import Table from core.architecture.sendsemantics.broad_cast_partitioner import ( BroadcastPartitioner, @@ -34,7 +34,7 @@ from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload +from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of @@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame if receiver == self.worker_actor_id: yield self.tuples_to_data_frame(tuples) + def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]: + for receiver, payload in self.partitioner.flush_state(state): + if receiver == self.worker_actor_id: + yield ( + StateFrame(payload) + if isinstance(payload, State) + else self.tuples_to_data_frame(payload) + ) + def run(self) -> None: """ Main execution logic that reads tuples from the materialized storage and @@ -149,6 +158,19 @@ def run(self) -> None: tup.cast_to_schema(self.tuple_schema) for data_frame in self.tuple_to_batch_with_filter(tup): self.emit_payload(data_frame) + try: + state_document, state_schema = DocumentFactory.open_document( + self.uri.replace("/result", "/state") + ) + state_iterator = state_document.get() + for state in state_iterator: + for state_frame in self.emit_state_with_filter( + State.from_tuple(state, state_schema) + ): + self.emit_payload(state_frame) + except ValueError: + pass + self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT) self._finished = True except Exception as err: diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py b/amber/src/main/python/core/storage/vfs_uri_factory.py index de0c5db56ec..0e23e607055 100644 --- a/amber/src/main/python/core/storage/vfs_uri_factory.py +++ b/amber/src/main/python/core/storage/vfs_uri_factory.py @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum): RESULT = "result" RUNTIME_STATISTICS = "runtimeStatistics" CONSOLE_MESSAGES = "consoleMessages" + STATE = "state" class VFSURIFactory: diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index e40d1a43fe0..c6001667380 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -19,6 +19,7 @@ from overrides import overrides from typing import Iterator, Optional, Union +from core.models.operator import LoopStartOperator, LoopEndOperator from pyamber import * from .storage.dataset_file_document import DatasetFileDocument from .storage.large_binary_input_stream import LargeBinaryInputStream @@ -43,6 +44,8 @@ "UDFTableOperator", "UDFBatchOperator", "UDFSourceOperator", + "LoopStartOperator", + "LoopEndOperator", "DatasetFileDocument", "largebinary", "LargeBinaryInputStream", diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala index e7763073232..22811b46417 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala @@ -43,7 +43,7 @@ abstract class AmberProcessor( with Serializable { /** FIFO & exactly once */ - val inputGateway: InputGateway = new NetworkInputGateway(this.actorId) + val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId) // 1. Unified Output val outputGateway: NetworkOutputGateway = @@ -55,7 +55,7 @@ abstract class AmberProcessor( } ) // 2. RPC Layer - val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId) + val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId) val asyncRPCServer: AsyncRPCServer = new AsyncRPCServer(outputGateway, actorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index b1acb3c0650..dafeeb4cc93 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,7 +52,7 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() def hasPendingRegions: Boolean = schedule != null && schedule.hasNext diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b806479b892..2de29f31fdd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,6 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { + regionExecutions.remove(region.id) // ensure the region execution hasn't been initialized already. assert( !regionExecutions.contains(region.id), diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala index d81b4239ba7..b7611c8f8f9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala @@ -41,6 +41,7 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging { private var portId: Option[PortIdentity] = None def acceptMessage(msg: WorkflowFIFOMessage): Unit = { + //channel remove val seq = msg.sequenceNumber val payload = msg.payload if (isDuplicated(seq)) { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index 5cfd8aabc04..1d3ee3cb72c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) enforcers += enforcer } + def removeControlChannel(from: ActorVirtualIdentity): Unit = { + inputChannels.remove(ChannelIdentity(from, actorId, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala index 929a30f4efa..e35e819d41f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala @@ -94,4 +94,8 @@ class NetworkOutputGateway( idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement() } + def removeControlChannel(to: ActorVirtualIdentity): Unit = { + idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..a018158f369 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} @@ -124,6 +125,8 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -232,6 +235,15 @@ class OutputManager( }) } + def saveStateToStorageIfNeeded(state: State, outputPortId: Int): Unit = { + val writer = DocumentFactory + .createDocument(this.storageUris(outputPortId).resolve("state"), state.schema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + writer.putOne(state.toTuple) + writer.close() + } + /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. @@ -280,6 +292,7 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { + this.storageUris(portId.id) = storageUri val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index a0c73b6506d..6793248d8a3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -181,6 +181,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) + asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq @@ -209,14 +211,15 @@ class RegionExecutionCoordinator( regionExecution: RegionExecution, attempt: Int = 1 ): Future[Unit] = { - terminateWorkers(regionExecution).rescue { case err => - logger.warn( - s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", - err - ) - Future - .sleep(killRetryDelay)(killRetryTimer) - .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + terminateWorkers(regionExecution).rescue { + case err => + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) } } @@ -568,7 +571,18 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(storageUriToAdd, schema) + + if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) { + try { + DocumentFactory.openDocument(storageUriToAdd) + } catch { + case _: Exception => + DocumentFactory.createDocument(storageUriToAdd, schema) + } + } else { + DocumentFactory.createDocument(storageUriToAdd, schema) + } + WorkflowExecutionsResource.insertOperatorPortResultUri( eid = eid, globalPortId = outputPortId, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e5..36ccd4a5ad7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -19,8 +19,15 @@ package org.apache.texera.amber.engine.architecture.scheduling +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} +import org.apache.texera.amber.util.JSONUtils.objectMapper + case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) + private var loopStartLevel = currentLevel + private var iteration = 3 + private var i = 1 def getRegions: List[Region] = levelSets.values.flatten.toList @@ -31,4 +38,24 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat currentLevel += 1 regions } + + def loopNext(): Set[Region] = { + val regions = levelSets(currentLevel) + if ( + regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))) + ) { + //iteration = objectMapper.readValue(regions.head.getOperators.head.opExecInitInfo.asInstanceOf[OpExecWithClassName].descString, classOf[LoopStartOpDesc]).iteration + loopStartLevel = currentLevel - 1 + } + if ( + regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) + ) { + if (i < iteration) { + currentLevel = loopStartLevel + i += 1 + } + } + currentLevel += 1 + regions + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a46..bc89f26a754 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -126,6 +126,7 @@ class DataProcessor( val outputState = executor.processState(state, port) if (outputState.isDefined) { outputManager.emitState(outputState.get) + outputManager.saveStateToStorageIfNeeded(state, port) } } catch safely { case e => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..a49860c1e11 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -45,6 +45,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with ResumeHandler with StartHandler with EndHandler + with EndIterationHandler with StartChannelHandler with EndChannelHandler with AssignPortHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2c..91b22a98d9e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -21,31 +21,19 @@ package org.apache.texera.amber.engine.architecture.worker.managers import io.grpc.MethodDescriptor import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ - METHOD_END_CHANNEL, - METHOD_START_CHANNEL -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_START_CHANNEL} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - FIFOMessageElement -} -import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, FIFOMessageElement} +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, StateFrame, WorkflowFIFOMessage} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI @@ -106,6 +94,14 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. if (buffer.nonEmpty) flush() + val state_document = DocumentFactory.openDocument(uri.resolve("state"))._1.asInstanceOf[VirtualDocument[Tuple]] + val stateReadIterator = state_document.get() + + if (stateReadIterator.hasNext) { + val state = State(Option(stateReadIterator.next())) + inputMessageQueue.put(FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))) + } + emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) isFinished.set(true) } catch { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala new file mode 100644 index 00000000000..b0233bbdcb9 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest, + EndIterationRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer + +trait EndIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def endIteration( + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.executor match { + //case _: LoopEndOpExec => + //workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) + case _ => + //dp.processOnFinish() + //dp.outputManager.finalizeIteration(request.worker) + } + EmptyReturn() + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala index 704ebd7f476..f7e26803b47 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import org.apache.texera.amber.engine.architecture.controller.ClientEvent -import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkInputGateway, + NetworkOutputGateway +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ @@ -125,7 +128,8 @@ object AsyncRPCClient { } class AsyncRPCClient( - outputGateway: NetworkOutputGateway, + val inputGateway: NetworkInputGateway, + val outputGateway: NetworkOutputGateway, val actorId: ActorVirtualIdentity ) extends AmberLogging { diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 72fb1c364e5..92582afdd2b 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -247,6 +247,8 @@ object WorkflowExecutionsResource { OPERATOR_PORT_EXECUTIONS.RESULT_URI ) .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .onConflict() + .doNothing() .execute() } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index 3226c9d2fe7..92abd9041c6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -41,14 +41,15 @@ final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean def apply(key: String): Any = get(key) + def schema: Schema = + Schema(data.map { + case (name, (attrType, _)) => + new Attribute(name, attrType) + }.toList) + def toTuple: Tuple = Tuple - .builder( - Schema(data.map { - case (name, (attrType, _)) => - new Attribute(name, attrType) - }.toList) - ) + .builder(schema) .addSequentially(data.values.map(_._2).toArray) .build() diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 15949ef4717..ae37def667e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -119,6 +120,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd8..990776a69f0 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val STATE: Value = Value("state") } object VFSURIFactory { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 549cb4b9d17..25b6df58001 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -29,7 +29,7 @@ import org.apache.iceberg.io.{DataWriter, OutputFile} import org.apache.iceberg.parquet.Parquet import org.apache.iceberg.{Schema, Table} -import java.nio.file.Paths +import java.nio.file.{Files, Path, Paths} import scala.collection.mutable.ArrayBuffer /** @@ -107,9 +107,12 @@ private[storage] class IcebergTableWriter[T]( private def flushBuffer(): Unit = { if (buffer.nonEmpty) { // Create a unique file path using the writer's identifier and the filename index - val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") - // Increment the filename index by 1 - filenameIdx += 1 + var filepath: Path = null + do { + filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_$filenameIdx") + filenameIdx += 1 + } while (Files.exists(filepath)) + val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) // Create a Parquet data writer to write a new file val dataWriter: DataWriter[Record] = Parquet diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index a575d5b0188..c842cf5db17 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -137,6 +137,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc @@ -203,6 +204,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..f5498fad1cd --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + val pythonCode = + try { + generatePythonCode() + } catch { + case ex: Throwable => + s"#EXCEPTION DURING CODE GENERATION: ${ex.getMessage}" + } + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(pythonCode, "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessLoopEndOperator(LoopEndOperator): + | @overrides + | def process_state(self, state: State, port: int) -> Optional[State]: + | print(state) + | return state + | + | @overrides + | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + | yield table + |""".stripMargin + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..f2550d2c00f --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopStartOpDesc extends LogicalOp { + @JsonProperty(required = true, defaultValue = "i = 0") + @JsonSchemaTitle("Initialization") + var initialization: String = _ + + @JsonProperty(required = true, defaultValue = "i += 1") + @JsonSchemaTitle("Update") + var update: String = _ + + @JsonProperty(required = true, defaultValue = "i < len(table)") + @JsonSchemaTitle("Condition") + var condition: String = _ + + @JsonProperty(required = true, defaultValue = "table.iloc[0]") + @JsonSchemaTitle("Output") + var output: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + val pythonCode = + try { + generatePythonCode() + } catch { + case ex: Throwable => + s"#EXCEPTION DURING CODE GENERATION: ${ex.getMessage}" + } + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(pythonCode, "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessTableOperator(UDFTableOperator): + | @overrides + | def produce_state_on_finish(self, port: int) -> State: + | table = Table(self._TableOperator__table_data[port]) + | state = State(pass_to_all_downstream = True) + | $initialization + | state["condition"] = $condition + | $update + | state["i"] = i + | return state + | + | @overrides + | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + | $initialization + | yield $output + |""".stripMargin + } +} diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index f1532bf9a9d..592fe09c598 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -342,7 +342,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy body: { fill: "rgba(158,158,158,0.2)", pointerEvents: "none", - visibility: "hidden", }, }, }, diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 00000000000..ee0f9ab6fac Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png differ diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 00000000000..7e5be023cdf Binary files /dev/null and b/frontend/src/assets/operator_images/LoopStart.png differ