def _parse_bool(value) -> bool:
    """Interpret a UI-provided value as a boolean.

    The value is stringified, stripped and lower-cased before comparison, so
    ``"True"``, ``" YES "`` and ``1`` are all truthy; anything else is False.
    """
    return str(value).strip().lower() in ("true", "1", "yes")


def _parse_binary(value) -> bytes:
    """Return ``value`` unchanged if it is already bytes, else its encoded text."""
    return value if isinstance(value, bytes) else str(value).encode()


def _parse_timestamp(value) -> datetime.datetime:
    """Parse an ISO-8601 string into a ``datetime``.

    A trailing ``Z`` (UTC designator) is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts it from Python 3.11 onwards.
    Values that already are ``datetime`` instances are returned as-is.

    Raises:
        ValueError: if the string is not a valid ISO-8601 timestamp.
        TypeError: if ``value`` is neither a ``str`` nor a ``datetime``.
    """
    if isinstance(value, datetime.datetime):
        return value
    if isinstance(value, str) and value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.datetime.fromisoformat(value)


# Parsers that turn a raw (string) UI value into the Python object matching
# each AttributeType. Only single-directional mapping.
FROM_STRING_PARSER_MAPPING = {
    AttributeType.STRING: str,
    AttributeType.INT: int,
    AttributeType.LONG: int,
    AttributeType.DOUBLE: float,
    AttributeType.BOOL: _parse_bool,
    AttributeType.BINARY: _parse_binary,
    AttributeType.TIMESTAMP: _parse_timestamp,
    AttributeType.LARGE_BINARY: largebinary,
}
b/amber/src/main/python/pytexera/udf/udf_operator.py index 003225c75c3..25c7f0695e6 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -16,12 +16,76 @@ # under the License. from abc import abstractmethod -from typing import Iterator, Optional, Union - -from pyamber import * +from typing import Any, Dict, Iterator, Optional, Union +import functools -class UDFOperatorV2(TupleOperatorV2): +from pyamber import * +from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING + +class _UiParameterSupport: + _ui_parameter_injected_values: Dict[str, Any] = {} + _ui_parameter_name_types: Dict[str, AttributeType] = {} + + # Reserved hook name. Backend injector will generate this in the user's class. + def _texera_injected_ui_parameters(self) -> Dict[str, Any]: + return {} + + def _texera_apply_injected_ui_parameters(self) -> None: + values = self._texera_injected_ui_parameters() + # Write to base class storage (not cls) because UiParameter reads from _UiParameterSupport directly + _UiParameterSupport._ui_parameter_injected_values = dict(values or {}) + _UiParameterSupport._ui_parameter_name_types = {} + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + + # Wrap only methods defined on this class (not inherited ones) + original_open = getattr(cls, "open", None) + if original_open is None: + return + + # Avoid double wrapping + if getattr(original_open, "__texera_ui_params_wrapped__", False): + return + + @functools.wraps(original_open) + def wrapped_open(self, *args, **kwargs): + self._texera_apply_injected_ui_parameters() + return original_open(self, *args, **kwargs) + + setattr(wrapped_open, "__texera_ui_params_wrapped__", True) + cls.open = wrapped_open + + class UiParameter: + def __init__(self, name: str, type: AttributeType): + if not isinstance(type, AttributeType): + raise TypeError( + f"UiParameter.type must be an AttributeType, got 
{type!r}." + ) + + existing_type = _UiParameterSupport._ui_parameter_name_types.get(name) + if existing_type is not None and existing_type != type: + raise ValueError( + f"Duplicate UiParameter name '{name}' with conflicting types: " + f"{existing_type.name} vs {type.name}." + ) + + _UiParameterSupport._ui_parameter_name_types[name] = type + raw_value = _UiParameterSupport._ui_parameter_injected_values.get(name) + self.name = name + self.type = type + self.value = _UiParameterSupport._parse(raw_value, type) + + @staticmethod + def _parse(value: Any, attr_type: AttributeType) -> Any: + if value is None: + return None + + py_type = FROM_STRING_PARSER_MAPPING.get(attr_type) + return py_type(value) + +class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2): """ Base class for tuple-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -65,7 +129,7 @@ def close(self) -> None: pass -class UDFSourceOperator(SourceOperator): +class UDFSourceOperator(_UiParameterSupport, SourceOperator): def open(self) -> None: """ Open a context of the operator. Usually can be used for loading/initiating some @@ -90,7 +154,7 @@ def close(self) -> None: pass -class UDFTableOperator(TableOperator): +class UDFTableOperator(_UiParameterSupport, TableOperator): """ Base class for table-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -123,7 +187,7 @@ def close(self) -> None: pass -class UDFBatchOperator(BatchOperator): +class UDFBatchOperator(_UiParameterSupport, BatchOperator): """ Base class for batch-oriented user-defined operators. A concrete implementation must be provided upon using. 
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index 84d52fddced..dc812a55991 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,6 +21,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; +import org.apache.texera.amber.pybuilder.PyStringTypes; +import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableStringFactory$; + import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -49,6 +53,7 @@ public Attribute( @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") + @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala index 3f056c96055..a7a02560901 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala @@ -77,6 +77,11 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty + @JsonSchemaTitle("Parameters") + @JsonPropertyDescription("Parameters inferred from self.UiParameter(...) 
in Python script") + var uiParameters: List[UiUDFParameter] = List() + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -88,7 +93,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithCode(code, "python") + OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python") ) .withParallelizable(true) .withSuggestedWorkerNum(workers) @@ -98,7 +103,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithCode(code, "python") + OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python") ) .withParallelizable(false) } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala index ef4da06cef9..efac09d63b4 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala @@ -79,6 +79,11 @@ class PythonUDFOpDescV2 extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty + @JsonSchemaTitle("Parameters") + @JsonPropertyDescription("Parameters inferred from self.UiParameter(...) 
in Python script") + var uiParameters: List[UiUDFParameter] = List() + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -118,7 +123,7 @@ class PythonUDFOpDescV2 extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithCode(code, "python") + OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python") ) .withParallelizable(true) .withSuggestedWorkerNum(workers) @@ -128,7 +133,7 @@ class PythonUDFOpDescV2 extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithCode(code, "python") + OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python") ) .withParallelizable(false) } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala new file mode 100644 index 00000000000..67b9ca67bb9 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
  * Injects UI-provided parameter values into a user's Python UDF source.
  *
  * The user code is encoded with the `pyb` template builder and, when UI
  * parameters are present, a reserved hook method returning the
  * name -> value map is appended to the body of the first supported UDF class.
  */
object PythonUdfUiParameterInjector {

  private val ReservedHookMethod = "_texera_injected_ui_parameters"

  // Match user-facing UDF classes (the ones users write).
  // Group 1 captures the indentation of the `class` line.
  private val SupportedUserClassRegex: Regex =
    """(?m)^([ \t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r

  /**
    * Rejects parameters without an attribute and names declared with more
    * than one attribute type.
    *
    * @throws RuntimeException on the first violation found
    */
  private def validate(uiParameters: List[UiUDFParameter]): Unit = {
    uiParameters.foreach { parameter =>
      if (parameter.attribute == null) {
        throw new RuntimeException("UiParameter attribute is required.")
      }
    }

    val grouped = uiParameters.groupBy(_.attribute.getName)
    grouped.foreach {
      case (key, values) =>
        val typeSet = values.map(_.attribute.getType).toSet
        if (typeSet.size > 1) {
          throw new RuntimeException(
            s"UiParameter key '$key' has multiple types: ${typeSet.map(_.name()).mkString(",")}."
          )
        }
    }
  }

  /** Builds the comma-separated `'name': value` entries of the Python dict literal. */
  private def buildInjectedParametersMap(
      uiParameters: List[UiUDFParameter]
  ): PythonTemplateBuilder = {
    val entries = uiParameters.map { parameter =>
      pyb"'${parameter.attribute.getName()}': ${parameter.value}"
    }

    entries.reduceOption((acc, entry) => acc + pyb", " + entry).getOrElse(pyb"")
  }

  /** Builds the encoded source of the reserved hook method. */
  private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = {
    val injectedParametersMap = buildInjectedParametersMap(uiParameters)

    // unindented method; we indent it when inserting into the class body
    (pyb"""|@overrides
           |def """ + pyb"$ReservedHookMethod" + pyb"""(self) -> typing.Dict[str, typing.Any]:
           |    return {""" +
      injectedParametersMap +
      pyb"""}
           |""").encode
  }

  /** Prepends `import typing` unless a line consisting of exactly that import exists. */
  private def ensureTypingImport(encodedUserCode: String): String = {
    val alreadyImported = encodedUserCode
      .split("\n", -1)
      .exists(_.trim == "import typing")

    if (alreadyImported) {
      encodedUserCode
    } else {
      "import typing\n" + encodedUserCode
    }
  }

  /** Prefixes every non-empty line of `block` with `indent`. */
  private def indentBlock(block: String, indent: String): String = {
    block
      .split("\n", -1)
      .map { line =>
        if (line.nonEmpty) indent + line else line
      }
      .mkString("\n")
  }

  /** Index of the next '\n' at or after `from`, or `text.length` if there is none. */
  private def lineEndIndex(text: String, from: Int): Int = {
    val idx = text.indexOf('\n', from)
    if (idx < 0) text.length else idx
  }

  /**
    * Returns the offset of the first line after the class header that ends the
    * class block, or `code.length` when the class runs to the end of the code.
    */
  private def detectClassBlockEnd(code: String, classHeaderStart: Int, classIndent: String): Int = {
    val classLineEnd = lineEndIndex(code, classHeaderStart)
    val classIndentLen = classIndent.length
    var pos = if (classLineEnd < code.length) classLineEnd + 1 else code.length

    while (pos < code.length) {
      val end = lineEndIndex(code, pos)
      val line = code.substring(pos, end)
      val isBlank = line.trim.isEmpty

      // Same leading-whitespace measure as inferClassBodyIndent.
      val currentIndentLen = line.takeWhile(ch => ch == ' ' || ch == '\t').length

      // a top-level (or same/lower-indented) non-blank line ends the class block
      if (!isBlank && currentIndentLen <= classIndentLen) {
        return pos
      }

      pos = if (end < code.length) end + 1 else code.length
    }

    code.length
  }

  /** True when the class block already defines the reserved hook method. */
  private def containsReservedHook(classBlock: String): Boolean = {
    val hookRegex = ("""(?m)^[ \t]+def\s+""" + Regex.quote(ReservedHookMethod) + """\s*\(""").r
    hookRegex.findFirstIn(classBlock).isDefined
  }

  /**
    * Appends the hook method to the end of the first supported UDF class body.
    * Code without a supported class is returned unchanged.
    *
    * @throws RuntimeException if the class already defines the reserved hook
    */
  private def injectHookIntoUserClass(encodedUserCode: String, hookMethod: String): String = {
    SupportedUserClassRegex.findFirstMatchIn(encodedUserCode) match {
      case None => encodedUserCode
      case Some(m) =>
        val classHeaderStart = m.start
        val classIndent = m.group(1)
        val classBlockEnd = detectClassBlockEnd(encodedUserCode, classHeaderStart, classIndent)

        val classBlock = encodedUserCode.substring(classHeaderStart, classBlockEnd)

        if (containsReservedHook(classBlock)) {
          throw new RuntimeException(
            s"Reserved method '$ReservedHookMethod' is already defined in the UDF class. Please rename your method."
          )
        }

        val bodyIndent =
          inferClassBodyIndent(classBlock, classIndent).getOrElse(classIndent + "    ")
        // Start on a fresh line when the class block does not end with one.
        val indentedHook = indentBlock(
          (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n",
          bodyIndent
        )

        encodedUserCode.substring(0, classBlockEnd) +
          indentedHook +
          encodedUserCode.substring(classBlockEnd)
    }
  }

  /**
    * Leading whitespace of the first non-blank line after the class header, or
    * `classIndent` plus four spaces when that line is not indented deeper than
    * the header. None when the class has no non-blank body line.
    */
  private def inferClassBodyIndent(classBlock: String, classIndent: String): Option[String] = {
    val lines = classBlock.split("\n", -1).toList.drop(1) // skip class header line

    lines.collectFirst {
      case line if line.trim.nonEmpty =>
        val leading = line.takeWhile(ch => ch == ' ' || ch == '\t')
        if (leading.length > classIndent.length) leading else classIndent + "    "
    }
  }

  /**
    * Encodes `code` and, when `uiParameters` is non-empty, injects the hook
    * method that returns their values.
    *
    * @param code         the user's Python UDF source
    * @param uiParameters UI parameters; null is treated as empty
    * @return the encoded source, with the hook injected when applicable
    * @throws RuntimeException if validation fails or the reserved hook name is already used
    */
  def inject(code: String, uiParameters: List[UiUDFParameter]): String = {
    val params = Option(uiParameters).getOrElse(List.empty)
    validate(params)

    // Let pyb encode the user's source normally
    val encodedUserCode = pyb"$code".encode

    if (params.isEmpty) {
      // No UI params: return the encoded code as-is (no hook injection needed)
      encodedUserCode
    } else {
      val encodedUserCodeWithTypingImport = ensureTypingImport(encodedUserCode)

      // Build encoded hook method (contains self.decode_python_template(...))
      val hookMethod = buildInjectedHookMethod(params)

      // Inject hook into the UDF class body; Python base class will auto-call it before open()
      injectHookIntoUserClass(encodedUserCodeWithTypingImport, hookMethod)
    }
  }
}
/**
  * One UI-editable parameter of a Python UDF: an attribute (name and type)
  * paired with the value to inject for it.
  */
class UiUDFParameter {

  // Name and type of the parameter; required and validated as non-null.
  @JsonProperty(required = true)
  @JsonSchemaTitle("Attribute")
  @Valid
  @NotNull(message = "Attribute is required")
  var attribute: Attribute = _

  // Value for the parameter; declared as EncodableString and empty by default.
  @JsonProperty()
  @JsonSchemaTitle("Value")
  var value: EncodableString = ""
}
org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc} import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} import org.apache.texera.amber.operator.source.SourceOperatorDescriptor +import org.apache.texera.amber.operator.udf.python.{PythonUdfUiParameterInjector, UiUDFParameter} class PythonUDFSourceOpDescV2 extends SourceOperatorDescriptor { @JsonProperty( required = true, - defaultValue = "# from pytexera import *\n" + - "# class GenerateOperator(UDFSourceOperator):\n" + - "# \n" + - "# @overrides\n" + - "# \n" + - "# def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:\n" + - "# yield\n" + defaultValue = + "# from pytexera import *\n" + + "# class GenerateOperator(UDFSourceOperator):\n" + + "# \n" + + "# @overrides\n" + + "# \n" + + "# def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:\n" + + "# yield\n" ) @JsonSchemaTitle("Python script") @JsonPropertyDescription("Input your code here") @@ -54,13 +56,23 @@ class PythonUDFSourceOpDescV2 extends SourceOperatorDescriptor { @JsonPropertyDescription("The columns of the source") var columns: List[Attribute] = List.empty + @JsonProperty + @JsonSchemaTitle("Parameters") + @JsonPropertyDescription("Parameters inferred from self.UiParameter(...) 
in Python script") + var uiParameters: List[UiUDFParameter] = List() + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalOp = { require(workers >= 1, "Need at least 1 worker.") val physicalOp = PhysicalOp - .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, OpExecWithCode(code, "python")) + .sourcePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python") + ) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) .withIsOneToManyOp(true) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala new file mode 100644 index 00000000000..9d7e367255b --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
  * Tests for [[PythonUdfUiParameterInjector]]: hook injection into supported
  * UDF classes, validation errors, and pass-through of unsupported code.
  */
class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers {

  // Prefix of the generated hook definition. The generated signature carries a
  // return annotation, so the prefix deliberately stops before the colon.
  private val hookSignaturePrefix = "def _texera_injected_ui_parameters(self)"

  /** Builds a parameter with the given attribute name, type and value. */
  private def createParameter(
      key: String,
      attributeType: AttributeType,
      value: String
  ): UiUDFParameter = {
    val parameter = new UiUDFParameter
    parameter.attribute = new Attribute(key, attributeType)
    parameter.value = value
    parameter
  }

  private val baseUdfCode: String =
    """from pytexera import *
      |
      |class ProcessTupleOperator(UDFOperatorV2):
      |    @overrides
      |    def open(self):
      |        print("open")
      |
      |    @overrides
      |    def process_tuple(self, tuple_: Tuple, port: int):
      |        yield tuple_
      |""".stripMargin

  it should "return encoded user code unchanged when there are no ui parameters" in {
    val injectedCode = PythonUdfUiParameterInjector.inject(baseUdfCode, Nil)

    injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
    injectedCode should include("""print("open")""")
    injectedCode should not include ("_texera_injected_ui_parameters")
    injectedCode should not include ("self.decode_python_template")
  }

  it should "inject ui parameter hook into supported UDF class" in {
    val injectedCode = PythonUdfUiParameterInjector.inject(
      baseUdfCode,
      List(
        createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")
      )
    )

    injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
    injectedCode should include(hookSignaturePrefix)
    injectedCode should include("return {")
    injectedCode should include("self.decode_python_template")
    injectedCode should include("""print("open")""")
  }

  it should "append the reserved hook after the existing methods of the class" in {
    val injectedCode = PythonUdfUiParameterInjector.inject(
      baseUdfCode,
      List(createParameter("k", AttributeType.STRING, "v"))
    )

    val hookIndex = injectedCode.indexOf(hookSignaturePrefix)
    val openIndex = injectedCode.indexOf("def open(self):")

    // The injector inserts the hook at the end of the class block.
    openIndex should be >= 0
    hookIndex should be > openIndex
  }

  it should "preserve multiple ui parameters in the injected map" in {
    val injectedCode = PythonUdfUiParameterInjector.inject(
      baseUdfCode,
      List(
        createParameter("param1", AttributeType.DOUBLE, "12.5"),
        createParameter("param2", AttributeType.INTEGER, "1"),
        createParameter("param3", AttributeType.STRING, "Hola"),
        createParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z")
      )
    )

    injectedCode should include(hookSignaturePrefix)
    // Assumes each of the four values is rendered through its own decode call.
    "self\\.decode_python_template".r.findAllIn(injectedCode).length should be >= 4
  }

  it should "throw when a parameter attribute is missing" in {
    val invalidParameter = new UiUDFParameter
    invalidParameter.attribute = null
    invalidParameter.value = "anything"

    val exception = the[RuntimeException] thrownBy {
      PythonUdfUiParameterInjector.inject(baseUdfCode, List(invalidParameter))
    }

    exception.getMessage should include("UiParameter attribute is required")
  }

  it should "throw when a key is declared with conflicting attribute types" in {
    val conflictingParameters = List(
      createParameter("date", AttributeType.STRING, "2024-01-01"),
      createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")
    )

    val exception = the[RuntimeException] thrownBy {
      PythonUdfUiParameterInjector.inject(baseUdfCode, conflictingParameters)
    }

    exception.getMessage should include("UiParameter key 'date' has multiple types")
  }

  it should "allow duplicate keys when the attribute type is the same" in {
    val sameTypeParameters = List(
      createParameter("date", AttributeType.TIMESTAMP, "2024-01-01"),
      createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")
    )

    noException should be thrownBy {
      PythonUdfUiParameterInjector.inject(baseUdfCode, sameTypeParameters)
    }
  }

  it should "throw when the reserved hook is already defined by the user" in {
    val udfWithReservedHook =
      """from pytexera import *
        |
        |class ProcessTupleOperator(UDFOperatorV2):
        |    def _texera_injected_ui_parameters(self):
        |        return {}
        |
        |    def open(self):
        |        pass
        |""".stripMargin

    val exception = the[RuntimeException] thrownBy {
      PythonUdfUiParameterInjector.inject(
        udfWithReservedHook,
        List(createParameter("k", AttributeType.STRING, "v"))
      )
    }

    exception.getMessage should include(
      "Reserved method '_texera_injected_ui_parameters' is already defined"
    )
  }

  it should "leave code unchanged when no supported user class is present" in {
    val nonSupportedCode =
      """from pytexera import *
        |
        |class SomethingElse:
        |    def open(self):
        |        pass
        |""".stripMargin

    val injectedCode = PythonUdfUiParameterInjector.inject(
      nonSupportedCode,
      List(createParameter("k", AttributeType.STRING, "v"))
    )

    injectedCode should not include ("_texera_injected_ui_parameters")
    injectedCode should include("class SomethingElse:")
  }
}
"./workspace/service/report-generation/report-generation.service"; import { SearchBarComponent } from "./dashboard/component/user/search-bar/search-bar.component"; @@ -257,6 +258,7 @@ registerLocaleData(en); AgentRegistrationComponent, InputAutoCompleteComponent, FileSelectionComponent, + UiUdfParametersComponent, CollabWrapperComponent, AboutComponent, UserWorkflowListItemComponent, diff --git a/frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css b/frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css index 5f583284045..500e875dfa7 100644 --- a/frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css +++ b/frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css @@ -25,7 +25,7 @@ outline: transparent; overflow: visible; position: relative; - : 1pt; + :1pt; } :host ::ng-deep .ql-editor > p { diff --git a/frontend/src/app/common/formly/formly-config.ts b/frontend/src/app/common/formly/formly-config.ts index d950bd3690c..f2fa38ab493 100644 --- a/frontend/src/app/common/formly/formly-config.ts +++ b/frontend/src/app/common/formly/formly-config.ts @@ -27,6 +27,7 @@ import { PresetWrapperComponent } from "./preset-wrapper/preset-wrapper.componen import { InputAutoCompleteComponent } from "../../workspace/component/input-autocomplete/input-autocomplete.component"; import { CollabWrapperComponent } from "./collab-wrapper/collab-wrapper/collab-wrapper.component"; import { FormlyRepeatDndComponent } from "./repeat-dnd/repeat-dnd.component"; +import { UiUdfParametersComponent } from "../../workspace/component/ui-udf-parameters/ui-udf-parameters.component"; /** * Configuration for using Json Schema with Formly. 
@@ -78,6 +79,7 @@ export const TEXERA_FORMLY_CONFIG = { { name: "codearea", component: CodeareaCustomTemplateComponent }, { name: "inputautocomplete", component: InputAutoCompleteComponent, wrappers: ["form-field"] }, { name: "repeat-section-dnd", component: FormlyRepeatDndComponent }, + { name: "ui-udf-parameters", component: UiUdfParametersComponent, wrappers: ["form-field"] }, ], wrappers: [ { name: "preset-wrapper", component: PresetWrapperComponent }, diff --git a/frontend/src/app/workspace/component/code-editor-dialog/code-editor.component.ts b/frontend/src/app/workspace/component/code-editor-dialog/code-editor.component.ts index 18ebfc59a1f..81c8db5abe9 100644 --- a/frontend/src/app/workspace/component/code-editor-dialog/code-editor.component.ts +++ b/frontend/src/app/workspace/component/code-editor-dialog/code-editor.component.ts @@ -47,11 +47,12 @@ import "@codingame/monaco-vscode-python-default-extension"; import "@codingame/monaco-vscode-r-default-extension"; import "@codingame/monaco-vscode-java-default-extension"; import { isDefined } from "../../../common/util/predicate"; -import { filter, switchMap } from "rxjs/operators"; +import { debounceTime, filter, switchMap } from "rxjs/operators"; import { BreakpointConditionInputComponent } from "./breakpoint-condition-input/breakpoint-condition-input.component"; import { CodeDebuggerComponent } from "./code-debugger.component"; import { MonacoEditor } from "monaco-breakpoints/dist/types"; import { GuiConfigService } from "src/app/common/service/gui-config.service"; +import { UiUdfParametersSyncService } from "../../service/code-editor/ui-udf-parameters-sync.service"; export const LANGUAGE_SERVER_CONNECTION_TIMEOUT_MS = 1000; @@ -102,6 +103,7 @@ export class CodeEditorComponent implements AfterViewInit, SafeStyle, OnDestroy private isMultipleVariables: boolean = false; public codeDebuggerComponent!: Type | null; public editorToPass!: MonacoEditor; + // private readonly pythonCodeChangeSubject = new 
Subject(); private generateLanguageTitle(language: string): string { return `${language.charAt(0).toUpperCase()}${language.slice(1)} UDF`; @@ -118,7 +120,8 @@ export class CodeEditorComponent implements AfterViewInit, SafeStyle, OnDestroy private workflowVersionService: WorkflowVersionService, public coeditorPresenceService: CoeditorPresenceService, private aiAssistantService: AIAssistantService, - private config: GuiConfigService + private config: GuiConfigService, + private uiUdfParametersSyncService: UiUdfParametersSyncService ) { this.currentOperatorId = this.workflowActionService.getJointGraphWrapper().getCurrentHighlightedOperatorIDs()[0]; const operatorType = this.workflowActionService.getTexeraGraph().getOperator(this.currentOperatorId).operatorType; @@ -143,9 +146,8 @@ export class CodeEditorComponent implements AfterViewInit, SafeStyle, OnDestroy .get("operatorProperties") as YType> ).get("code") as YText; } - + private detachYCodeListener?: () => void; ngAfterViewInit() { - // hacky solution to reset view after view is rendered. 
const style = localStorage.getItem(this.currentOperatorId); if (style) this.containerElement.nativeElement.style.cssText = style; @@ -176,6 +178,9 @@ export class CodeEditorComponent implements AfterViewInit, SafeStyle, OnDestroy this.workflowVersionStreamSubject.next(); this.workflowVersionStreamSubject.complete(); } + if (this.detachYCodeListener) { + this.detachYCodeListener(); + } } /** @@ -273,6 +278,13 @@ export class CodeEditorComponent implements AfterViewInit, SafeStyle, OnDestroy ); this.setupAIAssistantActions(editor); this.initCodeDebuggerComponent(editor); + if (this.detachYCodeListener) { + this.detachYCodeListener(); + } + + if (this.code) { + this.detachYCodeListener = this.uiUdfParametersSyncService.attachToYCode(this.currentOperatorId, this.code); + } }); } diff --git a/frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css b/frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css index 826a2e3e64f..51da6c0f2bb 100644 --- a/frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css +++ b/frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css @@ -16,4 +16,3 @@ * specific language governing permissions and limitations * under the License. 
*/ - diff --git a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.scss b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.scss index 4126a9ee1ce..aa32d22b4aa 100644 --- a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.scss +++ b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.scss @@ -73,3 +73,17 @@ margin-bottom: 0; } } + +/* ================================ + Style ONLY the UDF Parameters field + ================================ */ + +:host ::ng-deep label[for*="ui-udf-parameters"] { + font-weight: 700; +} + +:host ::ng-deep nz-form-item:has(label[for*="ui-udf-parameters"]) { + border-top: 1.5px solid #d1d1d1; + padding-top: 12px; + margin-top: 8px; +} diff --git a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts index 5d457e9050e..e98193c9924 100644 --- a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts +++ b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts @@ -67,7 +67,7 @@ import * as Y from "yjs"; import { OperatorSchema } from "src/app/workspace/types/operator-schema.interface"; import { AttributeType, PortSchema } from "../../../types/workflow-compiling.interface"; import { GuiConfigService } from "../../../../common/service/gui-config.service"; - +import { UiUdfParametersSyncService } from "../../../service/code-editor/ui-udf-parameters-sync.service"; Quill.register("modules/cursors", QuillCursors); /** @@ -155,7 +155,8 @@ 
export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On private changeDetectorRef: ChangeDetectorRef, private workflowVersionService: WorkflowVersionService, private workflowStatusSerivce: WorkflowStatusService, - private config: GuiConfigService + private config: GuiConfigService, + private uiUdfParametersSyncService: UiUdfParametersSyncService ) {} ngOnChanges(changes: SimpleChanges): void { @@ -193,6 +194,23 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On this.currentOperatorStatus = update[this.currentOperatorId]; } }); + + this.uiUdfParametersSyncService.uiParametersChanged$ + .pipe(untilDestroyed(this)) + .subscribe(({ operatorId, parameters }) => { + if (operatorId !== this.currentOperatorId) return; + + const currentOperator = this.workflowActionService.getTexeraGraph().getOperator(operatorId); + + const newModel = { + ...cloneDeep(currentOperator.operatorProperties), + uiParameters: cloneDeep(parameters), + }; + + this.listeningToChange = false; + this.workflowActionService.setOperatorProperty(operatorId, newModel); + this.listeningToChange = true; + }); } async ngOnDestroy() { @@ -453,6 +471,10 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On mappedField.type = "inputautocomplete"; } + if (mappedField.key === "uiParameters") { + mappedField.type = "ui-udf-parameters"; + } + // if the title is python script (for Python UDF), then make this field a custom template 'codearea' if (mapSource?.description?.toLowerCase() === "input your code here") { if (mappedField.type) { diff --git a/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.html b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.html new file mode 100644 index 00000000000..0ecfb16a780 --- /dev/null +++ b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.html @@ -0,0 +1,55 @@ + +
+ +
+
Value
+
Name
+
Type
+
+ +
+ + +
+ + + +
+ + +
+ + + +
+ + +
+ + + +
+
+
+
diff --git a/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.scss b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.scss new file mode 100644 index 00000000000..901edaf1fc9 --- /dev/null +++ b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.scss @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +.ui-udf-param-row { + display: grid; + grid-template-columns: 250px 250px 1fr; + gap: 12px; + align-items: start; +} + +.field-cell { + min-width: 0; +} + +/* Remove Formly/Ant label spacing */ +:host ::ng-deep .ant-form-item { + margin-bottom: 0; +} + +/* Hide Formly labels*/ +:host ::ng-deep .ant-form-item-label { + display: none; +} diff --git a/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.ts b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.ts new file mode 100644 index 00000000000..10e98d7afa7 --- /dev/null +++ b/frontend/src/app/workspace/component/ui-udf-parameters/ui-udf-parameters.component.ts @@ -0,0 +1,91 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import { Component } from "@angular/core";
import { FieldArrayType, FormlyFieldConfig } from "@ngx-formly/core";

/**
 * Formly field-array type for the UDF "uiParameters" property.
 *
 * Each row exposes three sub-fields: the parameter value (editable) and the
 * parameter name and type (both locked, because they are derived from the
 * UDF source code rather than typed in by the user).
 */
@Component({
  selector: "texera-ui-udf-parameters",
  templateUrl: "./ui-udf-parameters.component.html",
  styleUrls: ["./ui-udf-parameters.component.scss"],
})
export class UiUdfParametersComponent extends FieldArrayType {
  /**
   * Desired disabled state for fields whose form control did not exist yet
   * when `setDisabled` was called. Presence of a key also marks that the
   * deferred `onInit` hook has already been installed on that field, so it is
   * never wrapped twice. A WeakMap lets entries vanish with their field.
   */
  private readonly pendingDisabledState = new WeakMap<FormlyFieldConfig, boolean>();

  /** Returns the direct child of `rowField` whose key equals `key`, if any. */
  private getField(rowField: FormlyFieldConfig, key: string): FormlyFieldConfig | undefined {
    return rowField.fieldGroup?.find(f => f.key === key);
  }

  /** Returns the child named `childKey` inside the row's nested "attribute" group, if any. */
  private getAttributeChild(rowField: FormlyFieldConfig, childKey: string): FormlyFieldConfig | undefined {
    const attributeGroup = this.getField(rowField, "attribute");
    return attributeGroup?.fieldGroup?.find(f => f.key === childKey);
  }

  /** Brings the field's reactive form control (when present) in line with `disabled`. */
  private applyDisabledState(field: FormlyFieldConfig, disabled: boolean): void {
    const control = field.formControl;
    // Skip when there is nothing to change so repeated calls do not re-run
    // the control's status recalculation.
    if (!control || control.disabled === disabled) {
      return;
    }
    if (disabled) {
      control.disable({ emitEvent: false });
    } else {
      control.enable({ emitEvent: false });
    }
  }

  /**
   * Marks `field` as disabled/enabled on every layer that may be consulted:
   * `props`, the legacy `templateOptions`, and the reactive form control.
   *
   * The method is idempotent. The public getters below return field configs and
   * are presumably bound in the template (TODO confirm), which means they can
   * run on every change-detection pass; repeated calls therefore must not
   * allocate new option objects or stack additional `onInit` wrappers.
   *
   * @param field    field to update; `undefined` is passed through untouched
   * @param disabled target disabled state
   * @returns the same field instance, or `undefined` when no field was given
   */
  private setDisabled(field: FormlyFieldConfig | undefined, disabled: boolean): FormlyFieldConfig | undefined {
    if (!field) return undefined;

    // 1) Modern Formly: only replace the object when the flag actually changes.
    if (field.props?.disabled !== disabled) {
      field.props = { ...(field.props ?? {}), disabled };
    }

    // 2) Compatibility for templates/wrappers still reading templateOptions
    //    (accessed through `any` to avoid the @deprecated typing).
    const legacyField = field as any;
    if (legacyField.templateOptions?.disabled !== disabled) {
      legacyField.templateOptions = { ...(legacyField.templateOptions ?? {}), disabled };
    }

    // 3) Enforce at the reactive form level.
    if (field.formControl) {
      this.applyDisabledState(field, disabled);
      return field;
    }

    // The control is not created yet: remember the latest desired state and
    // install a single onInit hook that applies it once the control exists.
    const hookAlreadyInstalled = this.pendingDisabledState.has(field);
    this.pendingDisabledState.set(field, disabled);
    if (!hookAlreadyInstalled) {
      const previousOnInit = field.hooks?.onInit;
      field.hooks = {
        ...(field.hooks ?? {}),
        onInit: initializedField => {
          // Forward the previous hook's result so a hook that returns a value
          // (e.g. an observable) keeps working.
          const previousResult = previousOnInit?.(initializedField);
          this.applyDisabledState(initializedField, this.pendingDisabledState.get(field) ?? disabled);
          return previousResult;
        },
      };
    }

    return field;
  }

  /** Name sub-field of a row, locked read-only. */
  getNameField(rowField: FormlyFieldConfig): FormlyFieldConfig | undefined {
    return this.setDisabled(this.getAttributeChild(rowField, "attributeName"), true);
  }

  /** Type sub-field of a row, locked read-only (pass `false` to make it editable). */
  getTypeField(rowField: FormlyFieldConfig): FormlyFieldConfig | undefined {
    return this.setDisabled(this.getAttributeChild(rowField, "attributeType"), true);
  }

  /** Value sub-field of a row, kept editable (pass `true` to lock it). */
  getValueField(rowField: FormlyFieldConfig): FormlyFieldConfig | undefined {
    return this.setDisabled(this.getField(rowField, "value"), false);
  }

  /**
   * trackBy callback: identifies a row by its parameter name, falling back to
   * the row index when no name is reachable.
   * NOTE(review): this reads `param.attribute.attributeName`, i.e. it expects
   * model items. If the template iterates `field.fieldGroup` instead, the
   * lookup is always undefined and the index is used - confirm against the template.
   */
  trackByParamName = (index: number, param: any): string | number => {
    return param?.attribute?.attributeName ?? index;
  };
}
diff --git a/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.spec.ts b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.spec.ts new file mode 100644 index 00000000000..d64c81ef463 --- /dev/null +++ b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.spec.ts @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

import { UiUdfParametersParserService } from "./ui-udf-parameters-parser.service";

// Unit tests for the regex/tokenizer based extraction of
// `self.UiParameter(...)` declarations from Python UDF source code.
describe("UiUdfParametersParserService", () => {
  let service: UiUdfParametersParserService;

  beforeEach(() => {
    service = new UiUdfParametersParserService();
  });

  it("should parse positional and name-based arguments", () => {
    const code = `
      class ProcessTupleOperator(UDFOperatorV2):
        def open(self):
          self.UiParameter(AttributeType.INT, "count")
          self.UiParameter(type=AttributeType.STRING, name="name")
          self.UiParameter(name="age", type=AttributeType.LONG)
          self.UiParameter(AttributeType.DOUBLE, name="score")
          self.UiParameter("created_at", type=AttributeType.TIMESTAMP)
    `;

    // The parser normalizes Python/Java enum tokens to the canonical
    // lower-case schema names (INT -> integer, STRING -> string, ...),
    // so the expectations use those names rather than the raw tokens.
    expect(service.parse(code)).toEqual([
      { attribute: { attributeName: "count", attributeType: "integer" }, value: "" },
      { attribute: { attributeName: "name", attributeType: "string" }, value: "" },
      { attribute: { attributeName: "age", attributeType: "long" }, value: "" },
      { attribute: { attributeName: "score", attributeType: "double" }, value: "" },
      { attribute: { attributeName: "created_at", attributeType: "timestamp" }, value: "" },
    ]);
  });

  it("should ignore calls where name or type is missing", () => {
    const code = `
      class ProcessTupleOperator(UDFOperatorV2):
        def open(self):
          self.UiParameter(name="a")
          self.UiParameter(type=AttributeType.DOUBLE)
    `;

    expect(service.parse(code)).toEqual([]);
  });

  it("should ignore legacy key= named argument", () => {
    const code = `
      class ProcessTupleOperator(UDFOperatorV2):
        def open(self):
          self.UiParameter(type=AttributeType.DOUBLE, key="a")
    `;

    expect(service.parse(code)).toEqual([]);
  });

  it("should ignore unsupported classes", () => {
    const code = `
      class RandomClass(ABC):
        def open(self):
          self.UiParameter(type=AttributeType.DOUBLE, name="a")
    `;

    expect(service.parse(code)).toEqual([]);
  });
});
diff --git a/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.ts b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.ts new file mode 100644 index 00000000000..bf080bb5fee --- /dev/null +++ b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-parser.service.ts @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

import { Injectable } from "@angular/core";
import {
  AttributeType,
  JavaAttributeTypeName,
  PythonAttributeTypeName,
  SchemaAttribute,
  JAVA_ATTRIBUTE_TYPE_NAMES,
  PYTHON_ATTRIBUTE_TYPE_NAMES,
} from "../../types/workflow-compiling.interface";

/** One UI parameter row: the declared attribute (name + canonical type) and its user-supplied value. */
export interface UiUdfParameter {
  attribute: SchemaAttribute;
  value: string;
}

type ParserAttributeTypeToken = JavaAttributeTypeName | PythonAttributeTypeName;

// Maps every accepted enum token (Java or Python spelling) to the canonical
// frontend schema type name.
const ATTRIBUTE_TYPE_TOKEN_TO_CANONICAL: Readonly<Record<ParserAttributeTypeToken, AttributeType>> = {
  STRING: "string",
  INTEGER: "integer",
  INT: "integer",
  LONG: "long",
  DOUBLE: "double",
  BOOLEAN: "boolean",
  BOOL: "boolean",
  TIMESTAMP: "timestamp",
  BINARY: "binary",
  LARGE_BINARY: "large_binary",
};

const JAVA_ATTRIBUTE_TYPE_NAME_SET = new Set<string>(JAVA_ATTRIBUTE_TYPE_NAMES);
const PYTHON_ATTRIBUTE_TYPE_NAME_SET = new Set<string>(PYTHON_ATTRIBUTE_TYPE_NAMES);

/**
 * Extracts `self.UiParameter(...)` declarations from Python UDF source text.
 *
 * This is a lightweight textual scan (regex + hand-written tokenizer), not a
 * Python parser: it understands quoted strings and nested parentheses, but
 * not triple-quoted strings or trailing `#` comments.
 */
@Injectable({ providedIn: "root" })
export class UiUdfParametersParserService {
  // Only calls located inside one of these class definitions are considered.
  private static readonly SUPPORTED_CLASSES = [
    "ProcessTupleOperator",
    "ProcessBatchOperator",
    "ProcessTableOperator",
    "GenerateOperator",
  ];

  /**
   * Parses `code` and returns one entry per distinct parameter name, in
   * source order, each with an empty value.
   *
   * Calls that lack a usable name or a recognised `AttributeType.X` are
   * ignored; when a name is declared more than once, the first wins.
   *
   * @param code full Python source of the UDF; empty input yields `[]`
   */
  parse(code: string): UiUdfParameter[] {
    if (!code) {
      return [];
    }

    const classPattern = UiUdfParametersParserService.SUPPORTED_CLASSES.join("|");
    // A class block runs until the next top-level `class` statement or the
    // end of the text. The boundary accepts both `class X(` and `class X:`
    // so that calls inside a following base-less class are not attributed
    // to the operator class.
    const classRegex = new RegExp(
      `class\\s+(${classPattern})\\s*\\([^)]*\\)\\s*:[\\s\\S]*?(?=\\nclass\\s+\\w+\\s*[(:]|$)`,
      "g"
    );

    const parsed: UiUdfParameter[] = [];
    const existingNames = new Set<string>();

    let classMatch: RegExpExecArray | null;
    while ((classMatch = classRegex.exec(code)) !== null) {
      const classBlock = classMatch[0];

      for (const args of this.extractUiParameterArgumentLists(classBlock)) {
        const argumentTokens = this.tokenizeArguments(args);
        const extracted = this.extractParameter(argumentTokens);
        if (!extracted || existingNames.has(extracted.attribute.attributeName)) {
          continue;
        }

        existingNames.add(extracted.attribute.attributeName);
        parsed.push(extracted);
      }
    }

    return parsed;
  }

  /**
   * Collects the raw argument text of every `self.UiParameter(...)` call in
   * `code`. Uses paren matching rather than a regex so nested parentheses
   * inside the arguments are handled.
   *
   * Calls on a whole-line `#` comment are skipped. A call whose closing
   * parenthesis cannot be found (for example while the user is still typing
   * it) is skipped on its own; scanning continues with the next call so the
   * remaining parameters are still reported.
   */
  private extractUiParameterArgumentLists(code: string): string[] {
    const result: string[] = [];
    const needle = "self.UiParameter(";
    let index = 0;

    while (index < code.length) {
      const start = code.indexOf(needle, index);
      if (start === -1) {
        break;
      }

      const afterNeedle = start + needle.length;

      if (this.isOnCommentLine(code, start)) {
        index = afterNeedle;
        continue;
      }

      const openParenIndex = afterNeedle - 1;
      const closeParenIndex = this.findMatchingParen(code, openParenIndex);
      if (closeParenIndex === -1) {
        // Unbalanced call: drop only this one and resume right after it.
        index = afterNeedle;
        continue;
      }

      result.push(code.slice(openParenIndex + 1, closeParenIndex));
      index = closeParenIndex + 1;
    }

    return result;
  }

  /** True when the text between the start of the line and `position` begins with `#`. */
  private isOnCommentLine(code: string, position: number): boolean {
    const lineStart = code.lastIndexOf("\n", position - 1) + 1;
    return code.slice(lineStart, position).trimStart().startsWith("#");
  }

  /**
   * Returns the index of the ')' matching the '(' at `openIndex`, ignoring
   * parentheses inside single- or double-quoted strings, or -1 when the
   * text ends before the call is closed.
   */
  private findMatchingParen(text: string, openIndex: number): number {
    let depth = 0;
    let inSingle = false;
    let inDouble = false;
    let escaped = false;

    for (let i = openIndex; i < text.length; i++) {
      const ch = text[i];

      // The character after a backslash inside a string is taken literally.
      if (escaped) {
        escaped = false;
        continue;
      }

      if ((inSingle || inDouble) && ch === "\\") {
        escaped = true;
        continue;
      }

      if (!inDouble && ch === "'") {
        inSingle = !inSingle;
        continue;
      }

      if (!inSingle && ch === "\"") {
        inDouble = !inDouble;
        continue;
      }

      if (inSingle || inDouble) {
        continue;
      }

      if (ch === "(") {
        depth++;
      } else if (ch === ")") {
        depth--;
        if (depth === 0) {
          return i;
        }
      }
    }

    return -1;
  }

  /**
   * Splits an argument list on top-level commas only; commas inside quoted
   * strings or nested calls stay part of their token. Tokens are trimmed and
   * empty tokens are dropped.
   */
  private tokenizeArguments(argumentList: string): string[] {
    const tokens: string[] = [];
    let current = "";
    let depth = 0;
    let inSingle = false;
    let inDouble = false;
    let escaped = false;

    for (let i = 0; i < argumentList.length; i++) {
      const ch = argumentList[i];

      if (escaped) {
        current += ch;
        escaped = false;
        continue;
      }

      if ((inSingle || inDouble) && ch === "\\") {
        current += ch;
        escaped = true;
        continue;
      }

      if (!inDouble && ch === "'") {
        inSingle = !inSingle;
        current += ch;
        continue;
      }

      if (!inSingle && ch === "\"") {
        inDouble = !inDouble;
        current += ch;
        continue;
      }

      if (!inSingle && !inDouble) {
        if (ch === "(") {
          depth++;
          current += ch;
          continue;
        }

        if (ch === ")") {
          // Clamp at zero so a stray ')' cannot push the depth negative.
          depth = Math.max(0, depth - 1);
          current += ch;
          continue;
        }

        if (ch === "," && depth === 0) {
          const token = current.trim();
          if (token.length > 0) {
            tokens.push(token);
          }
          current = "";
          continue;
        }
      }

      current += ch;
    }

    const tail = current.trim();
    if (tail.length > 0) {
      tokens.push(tail);
    }

    return tokens;
  }

  /**
   * Builds a parameter from the tokens of one call.
   *
   * Accepted token forms: `name="x"`, `type=AttributeType.X`, a bare quoted
   * string (positional name) and a bare `AttributeType.X` (positional type),
   * in any order. Keyword forms take precedence over positional ones.
   *
   * @returns the parameter, or `undefined` when name or type is missing or
   *          the type token is not a known attribute type
   */
  private extractParameter(tokens: string[]): UiUdfParameter | undefined {
    let namedName: string | undefined;
    let namedType: AttributeType | undefined;
    let positionalName: string | undefined;
    let positionalType: AttributeType | undefined;

    for (const token of tokens) {
      const namedNameMatch = token.match(/^name\s*=\s*["']([^"']+)["']$/);
      if (namedNameMatch) {
        namedName = namedNameMatch[1].trim();
        continue;
      }

      const namedTypeMatch = token.match(/^type\s*=\s*AttributeType\.([A-Za-z_][A-Za-z0-9_]*)$/);
      if (namedTypeMatch) {
        namedType = this.normalizeAttributeType(namedTypeMatch[1]);
        continue;
      }

      const positionalTypeMatch = token.match(/^AttributeType\.([A-Za-z_][A-Za-z0-9_]*)$/);
      if (positionalTypeMatch && !positionalType) {
        positionalType = this.normalizeAttributeType(positionalTypeMatch[1]);
        continue;
      }

      const positionalNameMatch = token.match(/^["']([^"']+)["']$/);
      if (positionalNameMatch && !positionalName) {
        positionalName = positionalNameMatch[1].trim();
      }
    }

    const attributeName = namedName ?? positionalName;
    const attributeType = namedType ?? positionalType;

    if (!attributeName || !attributeType) {
      return undefined;
    }

    return {
      attribute: {
        attributeName,
        attributeType,
      },
      value: "",
    };
  }

  /**
   * Converts a Java/Python enum token into the canonical schema name
   * (case-insensitive). Examples:
   *   STRING  -> string
   *   INTEGER -> integer
   *   INT     -> integer
   *   BOOL    -> boolean
   *
   * @returns `undefined` for tokens that are in neither enum
   */
  private normalizeAttributeType(token: string): AttributeType | undefined {
    const normalized = token.trim().toUpperCase();

    if (!JAVA_ATTRIBUTE_TYPE_NAME_SET.has(normalized) && !PYTHON_ATTRIBUTE_TYPE_NAME_SET.has(normalized)) {
      return undefined;
    }

    return ATTRIBUTE_TYPE_TOKEN_TO_CANONICAL[normalized as ParserAttributeTypeToken];
  }
}
diff --git a/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-sync.service.ts b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-sync.service.ts new file mode 100644 index 00000000000..edd9cc428a2 --- /dev/null +++ b/frontend/src/app/workspace/service/code-editor/ui-udf-parameters-sync.service.ts @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
import { Injectable } from "@angular/core";
import { isEqual } from "lodash-es";
import { ReplaySubject, Subject } from "rxjs";
import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service";
import { UiUdfParameter, UiUdfParametersParserService } from "./ui-udf-parameters-parser.service";
import { isDefined } from "../../../common/util/predicate";
import {
  DUAL_INPUT_PORTS_PYTHON_UDF_V2_OP_TYPE,
  PYTHON_UDF_SOURCE_V2_OP_TYPE,
  PYTHON_UDF_V2_OP_TYPE,
} from "../workflow-graph/model/workflow-graph";
import { YType } from "../../types/shared-editing.interface";
import { YText } from "yjs/dist/src/types/YText";

/**
 * Keeps an operator's `uiParameters` list in step with the
 * `self.UiParameter(...)` declarations found in its Python UDF code.
 *
 * The service does not write operator properties itself; it publishes the
 * recomputed list on `uiParametersChanged$` and leaves persisting to the
 * subscriber.
 */
@Injectable({ providedIn: "root" })
export class UiUdfParametersSyncService {
  // Operator types whose code is scanned for UI parameters.
  private static readonly SUPPORTED_OPERATOR_TYPES: readonly string[] = [
    PYTHON_UDF_V2_OP_TYPE,
    PYTHON_UDF_SOURCE_V2_OP_TYPE,
    DUAL_INPUT_PORTS_PYTHON_UDF_V2_OP_TYPE,
  ];

  // NOTE(review): ReplaySubject(1) hands the most recent event to every new
  // subscriber. A subscriber that persists the payload may therefore re-apply
  // an older parameter list (with older values) when it is re-created -
  // confirm this replay is intended.
  private readonly uiParametersChangedSubject = new ReplaySubject<{ operatorId: string; parameters: UiUdfParameter[] }>(
    1
  );

  /** Emits `{ operatorId, parameters }` whenever the parsed parameter list differs from the stored one. */
  readonly uiParametersChanged$ = this.uiParametersChangedSubject.asObservable();

  constructor(
    private workflowActionService: WorkflowActionService,
    private uiUdfParametersParserService: UiUdfParametersParserService
  ) {}

  /**
   * Observes `yCode` and re-syncs the operator's parameters on every change,
   * plus once immediately for the current text.
   *
   * @param operatorId operator that owns the code
   * @param yCode      shared text holding the UDF source
   * @returns a cleanup function that removes the observer
   */
  attachToYCode(operatorId: string, yCode: YText): () => void {
    const handler = () => {
      const latestCode = yCode.toString();
      this.syncStructureFromCode(operatorId, latestCode);
    };

    yCode.observe(handler);

    // Sync once up front so the current code is reflected without waiting
    // for the first edit.
    handler();

    return () => yCode.unobserve(handler);
  }

  /**
   * Re-parses the operator's code and publishes the resulting parameter list
   * when it differs from `operatorProperties.uiParameters`.
   *
   * Does nothing when the operator cannot be looked up, is not a supported
   * Python UDF type, or no code is available.
   *
   * @param operatorId     operator to sync
   * @param codeFromEditor code to parse; when omitted the operator's shared
   *                       `code` property is read instead
   */
  syncStructureFromCode(operatorId: string, codeFromEditor?: string): void {
    const operator = this.findOperator(operatorId);

    if (!operator || !this.isSupportedPythonUdfType(operator.operatorType)) {
      return;
    }

    const code = codeFromEditor ?? this.getSharedCode(operatorId);
    if (!isDefined(code)) {
      return;
    }

    const existingParameters = operator.operatorProperties?.uiParameters ?? [];
    const mergedUiParameters = this.buildParsedShapeWithPreservedValues(code, existingParameters);

    // Nothing changed: stay silent so subscribers do not rewrite properties.
    if (isEqual(existingParameters, mergedUiParameters)) {
      return;
    }

    this.uiParametersChangedSubject.next({
      operatorId,
      parameters: mergedUiParameters,
    });
  }

  /**
   * Looks the operator up, returning `undefined` instead of propagating a
   * lookup failure. This method runs inside a YText observer, which may still
   * fire for an operator that is no longer in the graph; a throw there would
   * escape into the observer callback. Mirrors the best-effort handling in
   * `getSharedCode`.
   * NOTE(review): assumes the graph lookup can throw for unknown ids - if it
   * returns undefined instead, the catch is simply never taken.
   */
  private findOperator(operatorId: string) {
    try {
      return this.workflowActionService.getTexeraGraph().getOperator(operatorId);
    } catch {
      return undefined;
    }
  }

  /**
   * Parses `code` and carries over the value of every existing parameter
   * whose name still appears; new parameters get an empty value and
   * parameters no longer declared are dropped.
   */
  private buildParsedShapeWithPreservedValues(code: string, existingParameters: any[]): UiUdfParameter[] {
    const parsedParameters = this.uiUdfParametersParserService.parse(code);

    const existingValues = new Map<string, string>();
    existingParameters.forEach((parameter: any) => {
      // Accept both `attributeName` and the alternative `name` key.
      const parameterName = parameter?.attribute?.attributeName ?? parameter?.attribute?.name;

      if (isDefined(parameterName) && isDefined(parameter?.value)) {
        existingValues.set(parameterName, parameter.value);
      }
    });

    return parsedParameters.map(parameter => ({
      ...parameter,
      value: existingValues.get(parameter.attribute.attributeName) ?? "",
    }));
  }

  /** Reads the operator's shared `code` text, or `undefined` when it cannot be reached. */
  private getSharedCode(operatorId: string): string | undefined {
    try {
      const sharedOperatorType = this.workflowActionService.getTexeraGraph().getSharedOperatorType(operatorId);

      const operatorProperties = sharedOperatorType.get("operatorProperties") as YType<
        Readonly<{ [key: string]: any }>
      >;

      const yCode = operatorProperties.get("code") as YText;
      return yCode?.toString();
    } catch {
      return undefined;
    }
  }

  /** True for the Python UDF operator types listed in SUPPORTED_OPERATOR_TYPES. */
  private isSupportedPythonUdfType(operatorType: string): boolean {
    return UiUdfParametersSyncService.SUPPORTED_OPERATOR_TYPES.includes(operatorType);
  }
}
diff --git a/frontend/src/app/workspace/types/workflow-compiling.interface.ts b/frontend/src/app/workspace/types/workflow-compiling.interface.ts index 8c4499d3d4c..a81d6f844cd 100644 --- a/frontend/src/app/workspace/types/workflow-compiling.interface.ts +++ b/frontend/src/app/workspace/types/workflow-compiling.interface.ts @@ -70,8 +70,50 @@ export type CompilationStateInfo = Readonly< operatorErrors: Readonly>; } >; + // possible types of an attribute -export type AttributeType = "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary"; // schema: an array of attribute names and types +// Canonical frontend / JSON schema names +export type AttributeType = + | "string" + | "integer" + | "long" + | "double" + | "boolean" + | "timestamp" + | "binary" + | "large_binary"; + +// Java enum constant names (AttributeType.java) +export const JAVA_ATTRIBUTE_TYPE_NAMES = [ + "STRING", + "INTEGER", + "LONG", + "DOUBLE", + "BOOLEAN", + "TIMESTAMP", + "BINARY", + "LARGE_BINARY", +] as const; + +export type JavaAttributeTypeName = (typeof JAVA_ATTRIBUTE_TYPE_NAMES)[number]; + +// Python enum constant names (core.models.AttributeType) +export const PYTHON_ATTRIBUTE_TYPE_NAMES = [ + "STRING", + "INT", + "LONG", + "DOUBLE", + "BOOL", + "TIMESTAMP", + "BINARY", + "LARGE_BINARY", +] as const; + +export type
PythonAttributeTypeName = (typeof PYTHON_ATTRIBUTE_TYPE_NAMES)[number]; + +// Useful when parsing code from either side +export type AttributeTypeToken = AttributeType | JavaAttributeTypeName | PythonAttributeTypeName; + export interface SchemaAttribute extends Readonly<{ attributeName: string;