Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 262 additions & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
DataFileContent,
ManifestContent,
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
)
from pyiceberg.partitioning import (
Expand All @@ -89,8 +90,11 @@
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Operation,
Snapshot,
SnapshotLogEntry,
ancestors_between_ids,
is_ancestor_of,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
Expand Down Expand Up @@ -1139,6 +1143,60 @@ def scan(
limit=limit,
)

def incremental_append_scan(
    self,
    row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
    selected_fields: Tuple[str, ...] = ("*",),
    case_sensitive: bool = True,
    from_snapshot_id_exclusive: Optional[int] = None,
    to_snapshot_id_inclusive: Optional[int] = None,
    options: Properties = EMPTY_DICT,
    limit: Optional[int] = None,
) -> IncrementalAppendScan:
    """Create an IncrementalAppendScan bound to the table's current metadata.

    An incremental append scan projects, onto the table's current schema,
    the data added by append snapshots inside a snapshot range, restricted
    to the rows that match ``row_filter``.

    Args:
        row_filter:
            A string or BooleanExpression that describes the desired rows.
        selected_fields:
            A tuple of strings representing the column names to return in
            the output dataframe.
        case_sensitive:
            If True, column matching is case sensitive.
        from_snapshot_id_exclusive:
            Optional ID of the "from" snapshot; the scan starts after it
            (exclusive). It may be left as None here and set later on the
            returned IncrementalAppendScan, but it must not be None by the
            time the scan is planned.
        to_snapshot_id_inclusive:
            Optional ID of the "to" snapshot; the scan ends at it
            (inclusive). It may also be set later on the returned
            IncrementalAppendScan. If it stays None, the table's current
            snapshot is used.
        options:
            Additional Table properties as a dictionary of string key
            value pairs to use for this scan.
        limit:
            An integer representing the number of rows to return in the
            scan result. If None, fetches all matching rows.

    Returns:
        An IncrementalAppendScan based on the table's current metadata and
        the provided parameters.
    """
    # Every argument is forwarded by keyword, so the order below is irrelevant.
    append_scan = IncrementalAppendScan(
        table_metadata=self.metadata,
        io=self.io,
        from_snapshot_id_exclusive=from_snapshot_id_exclusive,
        to_snapshot_id_inclusive=to_snapshot_id_inclusive,
        row_filter=row_filter,
        selected_fields=selected_fields,
        case_sensitive=case_sensitive,
        options=options,
        limit=limit,
    )
    return append_scan

@property
def format_version(self) -> TableVersion:
    """Return the format version stored on this table's metadata."""
    table_metadata = self.metadata
    return table_metadata.format_version
Expand Down Expand Up @@ -1976,6 +2034,202 @@ def count(self) -> int:
return res


class IncrementalAppendScan(AbstractTableScan):
    """An incremental scan of a table's data that accumulates appended data between two snapshots.

    Only snapshots whose summary operation is ``Operation.APPEND`` contribute
    files; other snapshots in the range are skipped.

    Args:
        table_metadata:
            The table metadata to scan against.
        io:
            The FileIO used to read manifests and data files.
        row_filter:
            A string or BooleanExpression that describes the
            desired rows
        selected_fields:
            A tuple of strings representing the column names
            to return in the output dataframe.
        case_sensitive:
            If True column matching is case sensitive
        options:
            Additional Table properties as a dictionary of
            string key value pairs to use for this scan.
        limit:
            An integer representing the number of rows to
            return in the scan result. If None, fetches all
            matching rows.
        from_snapshot_id_exclusive:
            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
            ultimately planned, this must not be None.
        to_snapshot_id_inclusive:
            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
            Omitting it will default to the table's current snapshot.
    """

    from_snapshot_id_exclusive: Optional[int]
    to_snapshot_id_inclusive: Optional[int]

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
        from_snapshot_id_exclusive: Optional[int] = None,
        to_snapshot_id_inclusive: Optional[int] = None,
    ):
        super().__init__(
            table_metadata,
            io,
            row_filter,
            selected_fields,
            case_sensitive,
            options,
            limit,
        )
        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive

    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).

        Args:
            from_snapshot_id_exclusive: the start snapshot ID (exclusive)

        Returns:
            this for method chaining
        """
        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)

    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
        """Instructs this scan to look for changes up to a particular snapshot (inclusive).

        Args:
            to_snapshot_id_inclusive: the end snapshot ID (inclusive)

        Returns:
            this for method chaining
        """
        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)

    def projection(self) -> Schema:
        """Return the schema this scan projects onto.

        Returns:
            The table's current schema when ``"*"`` is among the selected fields,
            otherwise the current schema narrowed to the selected fields.
        """
        current_schema = self.table_metadata.schema()

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plan the file scan tasks for data appended within the snapshot range.

        Returns:
            The file scan tasks for entries added by the append snapshots in the
            range; empty when the range contains no append snapshot.

        Raises:
            ValueError: If the snapshot range is unset, unknown to the table
                metadata, or the start snapshot is not an ancestor of the end snapshot.
        """
        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()

        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
        if not append_snapshots:
            return iter([])

        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

        # The same manifest may be listed by several snapshots, so duplicates are dropped.
        # dict.fromkeys is used instead of a set so the manifests keep their first-seen
        # order and planning does not depend on hash iteration order.
        manifests = list(
            dict.fromkeys(
                manifest_file
                for snapshot in append_snapshots
                for manifest_file in snapshot.manifests(self.io)
                if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids
            )
        )

        def is_appended_entry(manifest_entry: ManifestEntry) -> bool:
            # Keep only entries that one of the append snapshots in range actually added.
            return manifest_entry.snapshot_id in append_snapshot_ids and manifest_entry.status == ManifestEntryStatus.ADDED

        return ManifestGroupPlanner(
            table_metadata=self.table_metadata,
            io=self.io,
            row_filter=self.row_filter,
            case_sensitive=self.case_sensitive,
            options=self.options,
        ).plan_files(
            manifests=manifests,
            manifest_entry_filter=is_appended_entry,
        )

    def to_arrow(self) -> pa.Table:
        """Read an Arrow table eagerly from this IncrementalAppendScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's IncrementalAppendScan
        """
        from pyiceberg.io.pyarrow import ArrowScan

        return ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_table(self.plan_files())

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Return an Arrow RecordBatchReader from this IncrementalAppendScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same IncrementalAppendScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's IncrementalAppendScan
                which can be used to read a stream of record batches one by one.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

        # Resolve the projection once and reuse it for both the Arrow schema and the scan.
        projected_schema = self.projection()
        target_schema = schema_to_pyarrow(projected_schema)
        batches = ArrowScan(
            self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit
        ).to_record_batches(self.plan_files())

        return pa.RecordBatchReader.from_batches(
            target_schema,
            batches,
        ).cast(target_schema)

    def _validate_and_resolve_snapshots(self) -> Tuple[int, int]:
        """Validate the configured snapshot range and resolve it to concrete IDs.

        Returns:
            A ``(from_snapshot_id_exclusive, to_snapshot_id_inclusive)`` pair, where the
            end falls back to the table's current snapshot when it was not set.

        Raises:
            ValueError: If the start snapshot is unset, if the end snapshot is unset and
                the table has no current snapshot, if either snapshot is not found in the
                table metadata, or if the start is not an ancestor of the end.
        """
        from_snapshot_id = self.from_snapshot_id_exclusive
        to_snapshot_id = self.to_snapshot_id_inclusive

        if from_snapshot_id is None:
            raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive")

        if to_snapshot_id is None:
            current_snapshot = self.table_metadata.current_snapshot()
            if current_snapshot is None:
                raise ValueError("End snapshot of append scan is not set and table has no current snapshot")

            to_snapshot_id = current_snapshot.snapshot_id
        elif self._is_snapshot_missing(to_snapshot_id):
            raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}")

        if self._is_snapshot_missing(from_snapshot_id):
            raise ValueError(f"Start snapshot of append scan not found on table metadata: {from_snapshot_id}")

        if not is_ancestor_of(to_snapshot_id, from_snapshot_id, self.table_metadata):
            raise ValueError(
                f"Append scan's start snapshot {from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}"
            )

        return from_snapshot_id, to_snapshot_id

    def _is_snapshot_missing(self, snapshot_id: int) -> bool:
        """Return whether the snapshot ID is missing in the table metadata."""
        return self.table_metadata.snapshot_by_id(snapshot_id) is None

    @staticmethod
    def _appends_between(
        from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata
    ) -> List[Snapshot]:
        """Return the list of snapshots that are appends between two snapshot IDs.

        Snapshots without a summary are skipped because their operation is unknown.
        """
        return [
            snapshot
            for snapshot in ancestors_between_ids(
                from_snapshot_id_exclusive=from_snapshot_id_exclusive,
                to_snapshot_id_inclusive=to_snapshot_id_inclusive,
                table_metadata=table_metadata,
            )
            if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND
        ]


class ManifestGroupPlanner:
io: FileIO
table_metadata: TableMetadata
Expand All @@ -1997,7 +2251,11 @@ def __init__(
self.case_sensitive = case_sensitive
self.options = options

def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
def plan_files(
self,
manifests: List[ManifestFile],
manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True,
) -> Iterable[FileScanTask]:
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

Expand Down Expand Up @@ -2034,6 +2292,9 @@ def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
],
)
):
if not manifest_entry_filter(manifest_entry):
continue

data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
Expand Down
29 changes: 29 additions & 0 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,32 @@ def ancestors_between(
break
else:
yield from ancestors_of(to_snapshot, table_metadata)


def ancestors_between_ids(
    from_snapshot_id_exclusive: Optional[int],
    to_snapshot_id_inclusive: int,
    table_metadata: TableMetadata,
) -> Iterable[Snapshot]:
    """Yield the "to" snapshot and its ancestors, stopping before the "from" snapshot.

    The walk starts at the snapshot identified by to_snapshot_id_inclusive and
    follows its ancestry. The snapshot identified by from_snapshot_id_exclusive
    is not yielded, and nothing after it is either.

    If from_snapshot_id_exclusive is None, or it never appears among the
    ancestors of the "to" snapshot, every ancestor of the "to" snapshot is yielded.
    """
    lineage = ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata)

    # No lower bound: hand back the whole lineage.
    if from_snapshot_id_exclusive is None:
        yield from lineage
        return

    for ancestor in lineage:
        if ancestor.snapshot_id == from_snapshot_id_exclusive:
            # Reached the exclusive bound; stop without yielding it.
            return

        yield ancestor


def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool:
    """Return whether ancestor_snapshot_id is an ancestor of snapshot_id.

    The lineage produced by ancestors_of for snapshot_id is searched for a
    snapshot whose ID equals ancestor_snapshot_id; the search stops at the
    first match.
    """
    start_snapshot = table_metadata.snapshot_by_id(snapshot_id)
    return any(
        candidate.snapshot_id == ancestor_snapshot_id for candidate in ancestors_of(start_snapshot, table_metadata)
    )
42 changes: 42 additions & 0 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
SnapshotSummaryCollector,
Summary,
ancestors_between,
ancestors_between_ids,
ancestors_of,
is_ancestor_of,
update_snapshot_summaries,
)
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -456,3 +458,43 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
)
== 2000
)


def test_is_ancestor_of(table_v2: Table) -> None:
    """Ancestry holds from the older snapshot to the newer one, and not the other way round."""
    descendant_id = 3055729675574597004
    ancestor_id = 3051729675574597004
    metadata = table_v2.metadata

    assert is_ancestor_of(descendant_id, ancestor_id, metadata)
    assert not is_ancestor_of(ancestor_id, descendant_id, metadata)


def test_ancestors_between_ids(table_v2: Table) -> None:
    """The range between a parent and its child contains only the child."""
    to_id = 3055729675574597004
    from_id = 3051729675574597004

    ancestors = ancestors_between_ids(from_id, to_id, table_v2.metadata)
    found_ids = [snapshot.snapshot_id for snapshot in ancestors]

    # The "from" end is exclusive and the "to" end inclusive, so only the "to" snapshot remains
    assert found_ids == [to_id]


def test_ancestors_between_equal_ids(table_v2: Table) -> None:
    """A range whose two ends are the same snapshot is empty."""
    same_id = 3055729675574597004

    ancestors = ancestors_between_ids(same_id, same_id, table_v2.metadata)

    # The exclusive "from" end removes the only candidate, the "to" snapshot itself
    assert list(ancestors) == []


def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None:
    """Without a "from" snapshot the whole lineage of the "to" snapshot is returned."""
    to_id = 3055729675574597004
    parent_id = 3051729675574597004

    ancestors = ancestors_between_ids(
        from_snapshot_id_exclusive=None,
        to_snapshot_id_inclusive=to_id,
        table_metadata=table_v2.metadata,
    )
    found_ids = [snapshot.snapshot_id for snapshot in ancestors]

    # No lower bound was given, so the "to" snapshot and then its parent are expected
    assert found_ids == [to_id, parent_id]