From 17a686532654746a0c5888d053fa32941f946836 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Tue, 22 Jul 2025 12:44:49 +0100 Subject: [PATCH] Introduce IncrementalAppendScan class --- pyiceberg/table/__init__.py | 263 +++++++++++++++++++++++++++++++++- pyiceberg/table/snapshots.py | 29 ++++ tests/table/test_snapshots.py | 42 ++++++ 3 files changed, 333 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 466fceedc4..e2919cb078 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -69,6 +69,7 @@ DataFileContent, ManifestContent, ManifestEntry, + ManifestEntryStatus, ManifestFile, ) from pyiceberg.partitioning import ( @@ -89,8 +90,11 @@ ) from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import ( + Operation, Snapshot, SnapshotLogEntry, + ancestors_between_ids, + is_ancestor_of, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -1139,6 +1143,60 @@ def scan( limit=limit, ) + def incremental_append_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalAppendScan: + """Fetch an IncrementalAppendScan based on the table's current metadata. + + The incremental append scan can be used to project the table's data + from append snapshots within a snapshot range and that matches the + provided row_filter onto the table's current schema + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. 
class IncrementalAppendScan(AbstractTableScan):
    """An incremental scan of a table's data that accumulates appended data between two snapshots.

    Only snapshots whose summary operation is ``Operation.APPEND`` contribute
    data files; other snapshots inside the range are skipped during planning.

    Args:
        table_metadata:
            The table metadata the scan is planned against.
        io:
            The FileIO used to read the manifests of the snapshots in range.
        row_filter:
            A string or BooleanExpression that describes the
            desired rows
        selected_fields:
            A tuple of strings representing the column names
            to return in the output dataframe.
        case_sensitive:
            If True column matching is case sensitive
        options:
            Additional Table properties as a dictionary of
            string key value pairs to use for this scan.
        limit:
            An integer representing the number of rows to
            return in the scan result. If None, fetches all
            matching rows.
        from_snapshot_id_exclusive:
            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
            ultimately planned, this must not be None.
        to_snapshot_id_inclusive:
            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
            Omitting it will default to the table's current snapshot.
    """

    from_snapshot_id_exclusive: Optional[int]
    to_snapshot_id_inclusive: Optional[int]

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
        from_snapshot_id_exclusive: Optional[int] = None,
        to_snapshot_id_inclusive: Optional[int] = None,
    ):
        """Store the common scan arguments on the base class and remember the snapshot range."""
        super().__init__(
            table_metadata,
            io,
            row_filter,
            selected_fields,
            case_sensitive,
            options,
            limit,
        )
        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive

    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).

        Args:
            from_snapshot_id_exclusive: the start snapshot ID (exclusive)

        Returns:
            The scan produced by ``self.update``, for method chaining
        """
        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)

    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
        """Instructs this scan to look for changes up to a particular snapshot (inclusive).

        Args:
            to_snapshot_id_inclusive: the end snapshot ID (inclusive)

        Returns:
            The scan produced by ``self.update``, for method chaining
        """
        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)

    def projection(self) -> Schema:
        """Return the schema this scan projects onto.

        The table's current schema is used as the base. It is returned as-is
        when ``"*"`` is among the selected fields, otherwise it is narrowed to
        the selected fields, honouring ``case_sensitive``.
        """
        current_schema = self.table_metadata.schema()

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plan the data files that were added by append snapshots in the scan's snapshot range.

        Returns:
            The file scan tasks planned by ManifestGroupPlanner, or an empty
            iterator when the range contains no append snapshots.

        Raises:
            ValueError: If the snapshot range cannot be validated or resolved.
        """
        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()

        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
        if not append_snapshots:
            return iter([])

        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

        def _added_in_range(manifest_entry: ManifestEntry) -> bool:
            # Keep only entries that one of the in-range append snapshots ADDED; entries carried over
            # from other snapshots (or with another status) must not surface in the result.
            return (
                manifest_entry.snapshot_id in append_snapshot_ids and manifest_entry.status == ManifestEntryStatus.ADDED
            )

        # A set, so a manifest listed by several snapshots in the range is only planned once.
        manifests = {
            manifest_file
            for snapshot in append_snapshots
            for manifest_file in snapshot.manifests(self.io)
            if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids
        }

        return ManifestGroupPlanner(
            table_metadata=self.table_metadata,
            io=self.io,
            row_filter=self.row_filter,
            case_sensitive=self.case_sensitive,
            options=self.options,
        ).plan_files(
            manifests=list(manifests),
            manifest_entry_filter=_added_in_range,
        )

    def to_arrow(self) -> pa.Table:
        """Read an Arrow table eagerly from this IncrementalAppendScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's IncrementalAppendScan
        """
        from pyiceberg.io.pyarrow import ArrowScan

        return ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_table(self.plan_files())

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Return an Arrow RecordBatchReader from this IncrementalAppendScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same IncrementalAppendScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's IncrementalAppendScan
                which can be used to read a stream of record batches one by one.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

        # Resolve the projection once and share it between the target schema and the scan.
        projected_schema = self.projection()
        target_schema = schema_to_pyarrow(projected_schema)
        batches = ArrowScan(
            self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit
        ).to_record_batches(self.plan_files())

        return pa.RecordBatchReader.from_batches(
            target_schema,
            batches,
        ).cast(target_schema)

    def _validate_and_resolve_snapshots(self) -> Tuple[int, int]:
        """Validate the configured snapshot range and resolve it to concrete snapshot IDs.

        Returns:
            A ``(from_snapshot_id_exclusive, to_snapshot_id_inclusive)`` pair, where the end
            snapshot falls back to the table's current snapshot when it was not set.

        Raises:
            ValueError: If the start snapshot is unset, if the end snapshot is unset and the
                table has no current snapshot, if either snapshot is not found in the table
                metadata, or if the start snapshot is not an ancestor of the end snapshot.
        """
        from_snapshot_id = self.from_snapshot_id_exclusive
        to_snapshot_id = self.to_snapshot_id_inclusive

        if from_snapshot_id is None:
            raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive")

        if to_snapshot_id is None:
            # Only consult the current snapshot when the caller did not pin an end snapshot.
            current_snapshot = self.table_metadata.current_snapshot()
            if current_snapshot is None:
                raise ValueError("End snapshot of append scan is not set and table has no current snapshot")

            to_snapshot_id = current_snapshot.snapshot_id
        elif self._is_snapshot_missing(to_snapshot_id):
            raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}")

        if self._is_snapshot_missing(from_snapshot_id):
            raise ValueError(f"Start snapshot of append scan not found on table metadata: {from_snapshot_id}")

        if not is_ancestor_of(to_snapshot_id, from_snapshot_id, self.table_metadata):
            raise ValueError(
                f"Append scan's start snapshot {from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}"
            )

        return from_snapshot_id, to_snapshot_id

    def _is_snapshot_missing(self, snapshot_id: int) -> bool:
        """Return whether the snapshot ID is missing in the table metadata."""
        return self.table_metadata.snapshot_by_id(snapshot_id) is None

    @staticmethod
    def _appends_between(
        from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata
    ) -> List[Snapshot]:
        """Return the list of snapshots that are appends between two snapshot IDs.

        Snapshots without a summary are skipped, since their operation cannot be determined.
        """
        return [
            snapshot
            for snapshot in ancestors_between_ids(
                from_snapshot_id_exclusive=from_snapshot_id_exclusive,
                to_snapshot_id_inclusive=to_snapshot_id_inclusive,
                table_metadata=table_metadata,
            )
            if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND
        ]
def ancestors_between_ids(
    from_snapshot_id_exclusive: Optional[int],
    to_snapshot_id_inclusive: int,
    table_metadata: TableMetadata,
) -> Iterable[Snapshot]:
    """Yield the "to" snapshot and its ancestors, stopping before the "from" snapshot.

    The walk starts at the snapshot identified by to_snapshot_id_inclusive (which is
    itself yielded) and follows ancestors_of. It ends as soon as the snapshot identified
    by from_snapshot_id_exclusive is reached, without yielding that snapshot.

    When from_snapshot_id_exclusive is None, or never shows up among the ancestors of
    the "to" snapshot, every ancestor of the "to" snapshot is yielded.
    """
    end_snapshot = table_metadata.snapshot_by_id(to_snapshot_id_inclusive)
    lineage = ancestors_of(end_snapshot, table_metadata)

    # No lower bound: hand back the complete lineage.
    if from_snapshot_id_exclusive is None:
        yield from lineage
        return

    for ancestor in lineage:
        if ancestor.snapshot_id == from_snapshot_id_exclusive:
            # Exclusive bound reached: stop without yielding it.
            return

        yield ancestor


def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool:
    """Tell whether ancestor_snapshot_id appears in the lineage produced by ancestors_of for snapshot_id."""
    start_snapshot = table_metadata.snapshot_by_id(snapshot_id)
    return any(
        candidate.snapshot_id == ancestor_snapshot_id for candidate in ancestors_of(start_snapshot, table_metadata)
    )
def test_ancestors_between_ids(table_v2: Table) -> None:
    child_id, parent_id = 3055729675574597004, 3051729675574597004

    ancestors = ancestors_between_ids(parent_id, child_id, table_v2.metadata)

    # The range is (from, to]: the parent is excluded, so only the child remains
    assert [ancestor.snapshot_id for ancestor in ancestors] == [child_id]


def test_ancestors_between_equal_ids(table_v2: Table) -> None:
    only_id = 3055729675574597004

    # With identical bounds the (from, to] range is empty
    assert list(ancestors_between_ids(only_id, only_id, table_v2.metadata)) == []


def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None:
    child_id, parent_id = 3055729675574597004, 3051729675574597004

    ancestors = ancestors_between_ids(
        from_snapshot_id_exclusive=None,
        to_snapshot_id_inclusive=child_id,
        table_metadata=table_v2.metadata,
    )

    # Without a "from" snapshot the whole lineage is returned, newest first
    assert [ancestor.snapshot_id for ancestor in ancestors] == [child_id, parent_id]