Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
592ff64
init
aglinxinyuan Feb 9, 2026
b95465d
update
aglinxinyuan Feb 11, 2026
f83041a
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 11, 2026
d9d0cd9
update
aglinxinyuan Feb 11, 2026
144ae29
update
aglinxinyuan Feb 11, 2026
7b13fef
update
aglinxinyuan Feb 11, 2026
bd5ac3a
update
aglinxinyuan Feb 11, 2026
19be0c1
update
aglinxinyuan Feb 11, 2026
706884f
update
aglinxinyuan Feb 11, 2026
21e6a41
update
aglinxinyuan Feb 11, 2026
24da3e3
update
aglinxinyuan Feb 11, 2026
2ba0fa4
update
aglinxinyuan Feb 11, 2026
a05ffd1
update
aglinxinyuan Feb 11, 2026
44fc0e7
update
aglinxinyuan Feb 11, 2026
846aac2
update
aglinxinyuan Feb 12, 2026
6be7dc5
update
aglinxinyuan Feb 12, 2026
d8338d1
update
aglinxinyuan Feb 12, 2026
36e517e
fix
aglinxinyuan Feb 13, 2026
a4bfbdb
fix
aglinxinyuan Feb 13, 2026
393faac
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
cbb2fc7
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
084f602
update
aglinxinyuan Feb 15, 2026
ae0d4ed
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 15, 2026
55d5cec
update
aglinxinyuan Feb 15, 2026
b8faf93
update
aglinxinyuan Feb 15, 2026
a53506a
update
aglinxinyuan Feb 15, 2026
d44a664
update
aglinxinyuan Feb 15, 2026
1cd48fd
update
aglinxinyuan Feb 15, 2026
e35a332
update
aglinxinyuan Feb 16, 2026
4d18d1d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 17, 2026
30a8562
update
aglinxinyuan Feb 24, 2026
b717fb0
update
aglinxinyuan Feb 24, 2026
da8d6ed
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Feb 24, 2026
04fe614
update
aglinxinyuan Feb 27, 2026
160bc6d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 27, 2026
bd27031
update
aglinxinyuan Feb 28, 2026
99e0f86
update
aglinxinyuan Feb 28, 2026
53ae08b
update
aglinxinyuan Feb 28, 2026
8c7d53c
update
aglinxinyuan Feb 28, 2026
92ab10f
update
aglinxinyuan Mar 1, 2026
bc58566
update
aglinxinyuan Mar 1, 2026
c655856
update
aglinxinyuan Mar 1, 2026
0970a53
update
aglinxinyuan Mar 1, 2026
2b78e2c
update
aglinxinyuan Mar 1, 2026
55f288f
update
aglinxinyuan Mar 1, 2026
2ccde1e
update
aglinxinyuan Mar 1, 2026
9b0d14d
update
aglinxinyuan Mar 1, 2026
3a2d0b9
update
aglinxinyuan Mar 1, 2026
0cfcf2f
update
aglinxinyuan Mar 1, 2026
00e49a5
update
aglinxinyuan Mar 1, 2026
f71dbec
update
aglinxinyuan Mar 1, 2026
565ee71
update
aglinxinyuan Mar 1, 2026
aa444a0
update
aglinxinyuan Mar 2, 2026
2e7c72a
update
aglinxinyuan Mar 2, 2026
f8ce99f
update
aglinxinyuan Mar 2, 2026
fe7e071
update
aglinxinyuan Mar 2, 2026
ba1b50f
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 4, 2026
08679f0
update
aglinxinyuan Mar 4, 2026
b18d9db
update
aglinxinyuan Mar 4, 2026
dac211a
update
aglinxinyuan Mar 5, 2026
43f2ca6
update
aglinxinyuan Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message ControlRequest {
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
EndIterationRequest endIterationRequest = 59;

// request for testing
Ping ping = 100;
Expand Down Expand Up @@ -278,4 +279,9 @@ enum StatisticsUpdateTarget {
// Request for runtime statistics, optionally scoped to a subset of workers.
message QueryStatisticsRequest{
  // When non-empty, restrict the statistics query to these workers;
  // when empty, statistics for all workers are returned.
  repeated core.ActorVirtualIdentity filterByWorkers = 1;
  // Which statistics consumer the resulting update is intended for.
  StatisticsUpdateTarget updateTarget = 2;
}

// Notifies a worker that one iteration of a loop region has ended.
message EndIterationRequest{
  // Identity of the loop-start worker/operator this end-of-iteration refers to.
  // NOTE(review): `LoopStartId` violates the proto style guide (field names
  // should be lower_snake_case, e.g. `loop_start_id`); renaming now would
  // change the generated accessors, so flagging rather than changing.
  // no_box makes the generated Scala field a plain value instead of Option.
  core.ActorVirtualIdentity LoopStartId = 1 [(scalapb.field).no_box = true];
  // Iteration counter — presumably the index of the iteration that just
  // completed; confirm 0- vs 1-based with the sender.
  int32 iteration = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ service WorkerService {
rpc EndWorker(EmptyRequest) returns (EmptyReturn);
rpc StartChannel(EmptyRequest) returns (EmptyReturn);
rpc EndChannel(EmptyRequest) returns (EmptyReturn);
rpc EndIteration(EndIterationRequest) returns (EmptyReturn);
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

self._storage_uris: typing.Dict[PortIdentity, str] = dict()

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand Down Expand Up @@ -126,6 +128,7 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
Create a separate thread for saving output tuples of a port
to storage in batch.
"""
self._storage_uris[port_id] = storage_uri
document, _ = DocumentFactory.open_document(storage_uri)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
Expand Down Expand Up @@ -171,6 +174,21 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)

def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
    """
    Persist an output State to the state-storage location(s) derived from
    the port result-storage URIs registered on this manager.

    :param state: the State to persist; `state.schema` is used to create
        the state document, and the state's fields (via ``vars``) become
        one stored Tuple.
    :param port_id: when None, the state is written once per registered
        port URI; when given, only that port's URI is used. A port_id with
        no registered storage is silently ignored (no storage was set up
        for it, matching save_tuple_to_storage_if_needed's behavior).
    """
    if port_id is None:
        uris = list(self._storage_uris.values())
    elif port_id in self._storage_uris:
        uris = [self._storage_uris[port_id]]
    else:
        # No storage registered for this port; nothing to save.
        return

    for uri in uris:
        # NOTE(review): deriving the state URI by replacing "/result" with
        # "/state" is fragile — confirm the VFS URI layout guarantees
        # exactly one "/result" segment.
        document = DocumentFactory.create_document(
            uri.replace("/result", "/state"), state.schema
        )
        writer = document.writer(str(get_worker_index(self.worker_id)))
        try:
            # The whole State is stored as a single tuple of its fields.
            writer.put_one(Tuple(vars(state)))
        finally:
            # Close even if the write fails so the underlying storage
            # writer is not leaked (original code leaked it on error).
            writer.close()

def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand Down
24 changes: 24 additions & 0 deletions amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,27 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield


class LoopStartOperator(TableOperator):
    """
    Base class for UDF operators marking the start of a loop region.
    Subclasses must implement process_table; open/close default to no-ops.
    """

    def open(self) -> None:
        # Default no-op; subclasses may override to acquire resources.
        pass

    @abstractmethod
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        # Abstract: subclasses yield zero or more output tables per input table.
        yield

    def close(self) -> None:
        # Default no-op; subclasses may override to release resources.
        pass


class LoopEndOperator(TableOperator):
    """
    Base class for UDF operators marking the end of a loop region.
    Subclasses must implement process_table; open/close default to no-ops.
    """

    def open(self) -> None:
        # Default no-op; subclasses may override to acquire resources.
        pass

    @abstractmethod
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        # Abstract: subclasses yield zero or more output tables per input table.
        yield

    def close(self) -> None:
        # Default no-op; subclasses may override to release resources.
        pass
7 changes: 7 additions & 0 deletions amber/src/main/python/core/models/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def __init__(
self.__dict__.update(table.to_pandas().iloc[0].to_dict())
self.schema = Schema(table.schema)

@classmethod
def from_tuple(cls, tuple, schema):
    """
    Reconstruct a State from a stored Tuple and its Schema.

    :param tuple: Tuple supplying the field values. NOTE(review): the
        parameter name shadows the builtin ``tuple``; consider renaming
        to ``tuple_`` (kept as-is here to preserve keyword-call compatibility).
    :param schema: Schema to attach to the new State.
    :return: a new State instance populated from the tuple's fields.
    """
    # Assumes cls() is constructible with no arguments — TODO confirm
    # State.__init__'s parameters all have defaults.
    obj = cls()
    # Copy the tuple's dict representation straight into the instance,
    # bypassing per-field add() calls.
    obj.__dict__.update(tuple.as_dict())
    obj.schema = schema
    return obj

def add(
self, key: str, value: any, value_type: Optional[AttributeType] = None
) -> None:
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
self._switch_context()
self._set_output_state(executor.process_state(state, port_id))

except Exception as err:
Expand Down
6 changes: 4 additions & 2 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ def complete(self) -> None:
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
controller_interface = self._async_rpc_client.controller_stub()
#controller_interface.iteration_completed(EmptyRequest())
self.context.executor_manager.executor.close()
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()

Expand Down Expand Up @@ -197,6 +198,7 @@ def process_input_state(self) -> None:
payload=batch,
)
)
self.context.output_manager.save_state_to_storage_if_needed(output_state)

def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
Expand Down Expand Up @@ -329,7 +331,7 @@ def _process_ecm(self, ecm_element: ECMElement):

if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)

self._switch_context()
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
Expand Down
107 changes: 59 additions & 48 deletions amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = "state"
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
namespace,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
Expand All @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = "state"
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
namespace,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
Expand Down
3 changes: 1 addition & 2 deletions amber/src/main/python/core/storage/iceberg/iceberg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def create_postgres_catalog(
catalog_name,
**{
"uri": f"postgresql+pg8000://{username}:{password}@{uri_without_scheme}",
"warehouse": f"file://{warehouse_path}",
"warehouse": warehouse_path,
},
)

Expand Down Expand Up @@ -180,7 +180,6 @@ def create_table(

if catalog.table_exists(identifier) and override_if_exists:
catalog.drop_table(identifier)

table = catalog.create_table(
identifier=identifier,
schema=table_schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import typing
from loguru import logger
from pyarrow import Table
from typing import Union
from pyarrow import Table

from core.architecture.sendsemantics.broad_cast_partitioner import (
BroadcastPartitioner,
Expand All @@ -34,7 +34,7 @@
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
from core.models import Tuple, InternalQueue, DataFrame, DataPayload
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.util import Stoppable, get_one_of
Expand Down Expand Up @@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame
if receiver == self.worker_actor_id:
yield self.tuples_to_data_frame(tuples)

def emit_state_with_filter(
    self, state: State
) -> typing.Iterator[typing.Union[StateFrame, DataFrame]]:
    """
    Flush a State through the partitioner and yield only the frames
    addressed to this worker.

    Yields a StateFrame for State payloads; non-State payloads (tuples
    flushed alongside the state) are wrapped as a DataFrame — hence the
    return annotation covers both frame types.
    """
    for receiver, payload in self.partitioner.flush_state(state):
        # Drop frames destined for other receivers; this reader only
        # re-emits frames addressed to its own worker actor.
        if receiver == self.worker_actor_id:
            yield (
                StateFrame(payload)
                if isinstance(payload, State)
                else self.tuples_to_data_frame(payload)
            )

def run(self) -> None:
"""
Main execution logic that reads tuples from the materialized storage and
Expand All @@ -149,6 +158,19 @@ def run(self) -> None:
tup.cast_to_schema(self.tuple_schema)
for data_frame in self.tuple_to_batch_with_filter(tup):
self.emit_payload(data_frame)
try:
state_document, state_schema = DocumentFactory.open_document(
self.uri.replace("/result", "/state")
)
state_iterator = state_document.get()
for state in state_iterator:
for state_frame in self.emit_state_with_filter(
State.from_tuple(state, state_schema)
):
self.emit_payload(state_frame)
except ValueError:
pass

self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT)
self._finished = True
except Exception as err:
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/storage/vfs_uri_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
RESULT = "result"
RUNTIME_STATISTICS = "runtimeStatistics"
CONSOLE_MESSAGES = "consoleMessages"
STATE = "state"


class VFSURIFactory:
Expand Down
3 changes: 3 additions & 0 deletions amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from overrides import overrides
from typing import Iterator, Optional, Union

from core.models.operator import LoopStartOperator, LoopEndOperator
from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
from .storage.large_binary_input_stream import LargeBinaryInputStream
Expand All @@ -43,6 +44,8 @@
"UDFTableOperator",
"UDFBatchOperator",
"UDFSourceOperator",
"LoopStartOperator",
"LoopEndOperator",
"DatasetFileDocument",
"largebinary",
"LargeBinaryInputStream",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class AmberProcessor(
with Serializable {

/** FIFO & exactly once */
val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)

// 1. Unified Output
val outputGateway: NetworkOutputGateway =
Expand All @@ -55,7 +55,7 @@ abstract class AmberProcessor(
}
)
// 2. RPC Layer
val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
val asyncRPCServer: AsyncRPCServer =
new AsyncRPCServer(outputGateway, actorId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class WorkflowScheduler(
this.physicalPlan = updatedPhysicalPlan
}

def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext()

def hasPendingRegions: Boolean = schedule != null && schedule.hasNext

Expand Down
Loading
Loading