diff --git a/docs/development/extensions-contrib/iceberg.md b/docs/development/extensions-contrib/iceberg.md index bbb98101e5ff..a5f3bae0fa6f 100644 --- a/docs/development/extensions-contrib/iceberg.md +++ b/docs/development/extensions-contrib/iceberg.md @@ -194,6 +194,62 @@ Example: When `residualFilterMode` is set to `fail` and a residual filter is detected, the job will fail with an error message indicating which filter expression produced the residual. This helps ensure data quality by preventing unintended rows from being ingested. +## Iceberg v2 delete file support + +Iceberg v2 tables support row-level deletes through two types of delete files: + +| File type | Content | Purpose | +|-----------|---------|---------| +| Positional delete file | `(file_path, row_position)` pairs | Deletes the row at a specific position in a data file | +| Equality delete file | Column value sets | Deletes any row where the specified column values match | + +The Iceberg extension automatically detects v2 delete files during table scan. No configuration changes are required to existing ingestion specs. + +### How it works + +When `IcebergInputSource` scans the Iceberg table, it inspects each `FileScanTask` for associated delete files: + +- **No delete files (v1 path)**: Data file paths are extracted and delegated to `warehouseSource` for reading. This is the existing behavior and remains unchanged. +- **Delete files detected (v2 path)**: Each task is wrapped in an `IcebergFileTaskInputSource` that carries the data file path, delete file metadata (paths, types, equality field IDs), and the serialized table schema. The `IcebergNativeRecordReader` then applies deletes at read time: + 1. Reads positional delete files and builds a set of deleted row positions for the current data file. + 2. Reads equality delete files and builds sets of deleted key tuples. + 3. Streams the data file and skips any row that is position-deleted or equality-deleted. + 4. 
Converts surviving Iceberg records to Druid `InputRow` objects. + +### Example + +Given an Iceberg v2 table with the following snapshots: + +``` +Snapshot 1 (append): data-001.parquet -> rows: order_id=1, order_id=2, order_id=3 +Snapshot 2 (delete): eq-delete-001.parquet -> "delete where order_id = 2" +``` + +Druid ingests only `order_id=1` and `order_id=3`. The deleted row (`order_id=2`) is excluded automatically. + +The ingestion spec is identical to a v1 table -- no additional fields are needed: + +```json +{ + "type": "iceberg", + "tableName": "orders", + "namespace": "analytics", + "icebergCatalog": { + "type": "rest", + "catalogUri": "http://localhost:8181" + }, + "warehouseSource": { + "type": "s3" + } +} +``` + +### Performance considerations + +- Positional delete files are read into an in-memory `Set` per data file. Memory usage is proportional to the number of deleted positions, not the data file size. +- Equality delete files are read into an in-memory `Set` of key tuples. For tables with very large equality delete files, this may increase memory usage on the ingestion worker. +- A v2-format table that has never had any rows deleted (no delete files) automatically goes through the v1 path with no overhead. + ## Known limitations This section lists the known limitations that apply to the Iceberg extension. 
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java index 496ced0e06c5..1b8e63f980eb 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java @@ -149,6 +149,21 @@ public Table createTable(String namespace, String tableName, Schema schema) return getClientCatalog().createTable(tableId, schema); } + /** + * Creates a table with custom properties (e.g., format-version=2 for v2 tables). + */ + public Table createTable( + String namespace, + String tableName, + Schema schema, + org.apache.iceberg.PartitionSpec partitionSpec, + Map properties + ) + { + final TableIdentifier tableId = TableIdentifier.of(namespace, tableName); + return getClientCatalog().createTable(tableId, schema, partitionSpec, properties); + } + /** * Drops a table from the REST catalog. Best-effort; ignores errors. */ diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergV2DeleteIngestionTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergV2DeleteIngestionTest.java new file mode 100644 index 000000000000..33f572a1c197 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergV2DeleteIngestionTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.parquet.ParquetExtensionsModule; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.http.SqlTaskStatus; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import 
org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.UUID; + +/** + * End-to-end integration test for Iceberg v2 delete file support. + * Creates v2-format tables with positional and equality deletes, + * ingests via MSQ SQL, and verifies that deleted rows are excluded. + */ +public class IcebergV2DeleteIngestionTest extends EmbeddedClusterTestBase +{ + private static final String ICEBERG_NAMESPACE = "default"; + + private static final Schema TABLE_SCHEMA = new Schema( + Types.NestedField.required(1, "event_time", Types.StringType.get()), + Types.NestedField.required(2, "order_id", Types.IntegerType.get()), + Types.NestedField.required(3, "product", Types.StringType.get()), + Types.NestedField.required(4, "amount", Types.LongType.get()) + ); + + private final IcebergRestCatalogResource icebergCatalog = new IcebergRestCatalogResource(); + + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer() + .setServerMemory(300_000_000L) + .addProperty("druid.worker.capacity", "2"); + private final EmbeddedBroker broker = new EmbeddedBroker(); + + private EmbeddedMSQApis msqApis; + + @Override + protected EmbeddedDruidCluster createCluster() + { + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addResource(icebergCatalog) + .addExtension(ParquetExtensionsModule.class) + .addServer(overlord) + .addServer(coordinator) + .addServer(indexer) + .addServer(broker) + .addServer(new EmbeddedHistorical()); + } + + @BeforeAll + public void setup() + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + icebergCatalog.createNamespace(ICEBERG_NAMESPACE); + } + + @AfterAll + public void tearDown() + { + dropTableSafely("eq_delete_test"); + dropTableSafely("pos_delete_test"); + dropTableSafely("mixed_delete_test"); + dropTableSafely("no_delete_test"); + 
icebergCatalog.dropNamespace(ICEBERG_NAMESPACE); + } + + @Test + public void testV2EqualityDeleteIngestion() throws IOException + { + final String tableName = "eq_delete_test"; + final Table table = createV2Table(tableName); + + // Append 3 rows: order_id=1, 2, 3 + final DataFile dataFile = writeDataFile(table, ImmutableList.of( + row("2024-01-01T00:00:00.000Z", 1, "Widget", 100L), + row("2024-01-01T01:00:00.000Z", 2, "Gadget", 200L), + row("2024-01-01T02:00:00.000Z", 3, "Doohickey", 300L) + )); + table.newAppend().appendFile(dataFile).commit(); + + // Write equality delete: delete where order_id=2 + final Schema deleteSchema = new Schema( + Types.NestedField.required(2, "order_id", Types.IntegerType.get()) + ); + final DeleteFile eqDelete = writeEqualityDeleteFile(table, deleteSchema, 2, ImmutableList.of( + eqDeleteRow(deleteSchema, 2) + )); + table.newRowDelta().addDeletes(eqDelete).commit(); + + // Ingest and verify: only order_id=1 and 3 should be present + ingestAndVerify( + tableName, + "SELECT __time, \"order_id\", \"product\", \"amount\" FROM %s ORDER BY \"order_id\"", + "2024-01-01T00:00:00.000Z,1,Widget,100\n" + + "2024-01-01T02:00:00.000Z,3,Doohickey,300" + ); + } + + @Test + public void testV2PositionalDeleteIngestion() throws IOException + { + final String tableName = "pos_delete_test"; + final Table table = createV2Table(tableName); + + final DataFile dataFile = writeDataFile(table, ImmutableList.of( + row("2024-01-01T00:00:00.000Z", 1, "Widget", 100L), + row("2024-01-01T01:00:00.000Z", 2, "Gadget", 200L), + row("2024-01-01T02:00:00.000Z", 3, "Doohickey", 300L) + )); + table.newAppend().appendFile(dataFile).commit(); + + // Write positional delete: delete row at position 1 (order_id=2) + final DeleteFile posDelete = writePositionalDeleteFile( + table, + dataFile.location(), + 1L + ); + table.newRowDelta().addDeletes(posDelete).commit(); + + ingestAndVerify( + tableName, + "SELECT __time, \"order_id\", \"product\", \"amount\" FROM %s ORDER BY 
\"order_id\"", + "2024-01-01T00:00:00.000Z,1,Widget,100\n" + + "2024-01-01T02:00:00.000Z,3,Doohickey,300" + ); + } + + @Test + public void testV1TableIngestionUnchanged() throws IOException + { + final String tableName = "no_delete_test"; + final Table table = createV2Table(tableName); + + // Append 3 rows, no deletes + final DataFile dataFile = writeDataFile(table, ImmutableList.of( + row("2024-01-01T00:00:00.000Z", 1, "Widget", 100L), + row("2024-01-01T01:00:00.000Z", 2, "Gadget", 200L), + row("2024-01-01T02:00:00.000Z", 3, "Doohickey", 300L) + )); + table.newAppend().appendFile(dataFile).commit(); + + ingestAndVerify( + tableName, + "SELECT __time, \"order_id\", \"product\", \"amount\" FROM %s ORDER BY \"order_id\"", + "2024-01-01T00:00:00.000Z,1,Widget,100\n" + + "2024-01-01T01:00:00.000Z,2,Gadget,200\n" + + "2024-01-01T02:00:00.000Z,3,Doohickey,300" + ); + } + + @Test + public void testV2MixedDeleteIngestion() throws IOException + { + final String tableName = "mixed_delete_test"; + final Table table = createV2Table(tableName); + + final DataFile dataFile = writeDataFile(table, ImmutableList.of( + row("2024-01-01T00:00:00.000Z", 1, "Widget", 100L), + row("2024-01-01T01:00:00.000Z", 2, "Gadget", 200L), + row("2024-01-01T02:00:00.000Z", 3, "Doohickey", 300L), + row("2024-01-01T03:00:00.000Z", 4, "Thingamajig", 400L), + row("2024-01-01T04:00:00.000Z", 5, "Whatchamacallit", 500L) + )); + table.newAppend().appendFile(dataFile).commit(); + + // Positional delete: remove position 0 (order_id=1) + final DeleteFile posDelete = writePositionalDeleteFile(table, dataFile.location(), 0L); + table.newRowDelta().addDeletes(posDelete).commit(); + + // Equality delete: remove order_id=4 + final Schema deleteSchema = new Schema( + Types.NestedField.required(2, "order_id", Types.IntegerType.get()) + ); + final DeleteFile eqDelete = writeEqualityDeleteFile(table, deleteSchema, 2, ImmutableList.of( + eqDeleteRow(deleteSchema, 4) + )); + 
table.newRowDelta().addDeletes(eqDelete).commit(); + + // Only order_id=2, 3, 5 should remain + ingestAndVerify( + tableName, + "SELECT __time, \"order_id\", \"product\", \"amount\" FROM %s ORDER BY \"order_id\"", + "2024-01-01T01:00:00.000Z,2,Gadget,200\n" + + "2024-01-01T02:00:00.000Z,3,Doohickey,300\n" + + "2024-01-01T04:00:00.000Z,5,Whatchamacallit,500" + ); + } + + // --- Helper methods --- + + private Table createV2Table(final String tableName) + { + return icebergCatalog.createTable( + ICEBERG_NAMESPACE, + tableName, + TABLE_SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of("format-version", "2") + ); + } + + private ImmutableMap row( + final String eventTime, + final int orderId, + final String product, + final long amount + ) + { + return ImmutableMap.of( + "event_time", eventTime, + "order_id", orderId, + "product", product, + "amount", amount + ); + } + + private DataFile writeDataFile( + final Table table, + final ImmutableList> rows + ) throws IOException + { + final String filepath = table.location() + "/data/" + UUID.randomUUID() + ".parquet"; + final OutputFile file = table.io().newOutputFile(filepath); + + try (DataWriter writer = Parquet.writeData(file) + .schema(TABLE_SCHEMA) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .withSpec(table.spec()) + .build()) { + for (final ImmutableMap row : rows) { + final GenericRecord record = GenericRecord.create(TABLE_SCHEMA); + record.set(0, row.get("event_time")); + record.set(1, row.get("order_id")); + record.set(2, row.get("product")); + record.set(3, row.get("amount")); + writer.write(record); + } + return writer.toDataFile(); + } + } + + private GenericRecord eqDeleteRow(final Schema deleteSchema, final int orderId) + { + final GenericRecord record = GenericRecord.create(deleteSchema); + record.setField("order_id", orderId); + return record; + } + + private DeleteFile writeEqualityDeleteFile( + final Table table, + final Schema deleteSchema, + final int equalityFieldId, 
+ final ImmutableList deleteRows + ) throws IOException + { + final String deletePath = table.location() + "/data/" + UUID.randomUUID() + "-eq-delete.parquet"; + final OutputFile outputFile = table.io().newOutputFile(deletePath); + + try (EqualityDeleteWriter writer = Parquet.writeDeletes(outputFile) + .forTable(table) + .rowSchema(deleteSchema) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .equalityFieldIds(equalityFieldId) + .buildEqualityWriter()) { + for (final GenericRecord row : deleteRows) { + writer.write(row); + } + return writer.toDeleteFile(); + } + } + + private DeleteFile writePositionalDeleteFile( + final Table table, + final String dataFilePath, + final long position + ) throws IOException + { + final String deletePath = table.location() + "/data/" + UUID.randomUUID() + "-pos-delete.parquet"; + final OutputFile outputFile = table.io().newOutputFile(deletePath); + + try (PositionDeleteWriter writer = Parquet.writeDeletes(outputFile) + .forTable(table) + .createWriterFunc(GenericParquetWriter::buildWriter) + .rowSchema(TABLE_SCHEMA) + .overwrite() + .buildPositionWriter()) { + final PositionDelete posDelete = PositionDelete.create(); + posDelete.set(dataFilePath, position, null); + writer.write(posDelete); + return writer.toDeleteFile(); + } + } + + private void ingestAndVerify( + final String icebergTableName, + final String verifyQueryTemplate, + final String expectedCsv + ) + { + final String catalogUri = icebergCatalog.getCatalogUri(); + final String druidDataSource = dataSource + "_" + icebergTableName; + + final String sql = StringUtils.format( + "INSERT INTO \"%s\"\n" + + "SELECT\n" + + " TIME_PARSE(\"event_time\") AS __time,\n" + + " \"order_id\",\n" + + " \"product\",\n" + + " \"amount\"\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"iceberg\"," + + "\"tableName\":\"%s\"," + + "\"namespace\":\"%s\"," + + "\"icebergCatalog\":{\"type\":\"rest\",\"catalogUri\":\"%s\"," + + 
"\"catalogProperties\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\"}}," + + "\"warehouseSource\":{\"type\":\"local\"}}',\n" + + " '{\"type\":\"parquet\"}',\n" + + " '[{\"type\":\"string\",\"name\":\"event_time\"}," + + "{\"type\":\"long\",\"name\":\"order_id\"}," + + "{\"type\":\"string\",\"name\":\"product\"}," + + "{\"type\":\"long\",\"name\":\"amount\"}]'\n" + + " )\n" + + ")\n" + + "PARTITIONED BY ALL TIME", + druidDataSource, + icebergTableName, + ICEBERG_NAMESPACE, + catalogUri + ); + + final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql); + cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord); + cluster.callApi().waitForAllSegmentsToBeAvailable(druidDataSource, coordinator, broker); + + cluster.callApi().verifySqlQuery( + verifyQueryTemplate, + druidDataSource, + expectedCsv + ); + } + + private void dropTableSafely(final String tableName) + { + try { + icebergCatalog.dropTable(ICEBERG_NAMESPACE, tableName); + } + catch (Exception e) { + // best-effort cleanup + } + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index fc8c0958dc43..78220512a273 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -755,17 +755,21 @@ provided + + org.apache.iceberg + iceberg-data + ${iceberg.core.version} + runtime + org.apache.iceberg iceberg-parquet ${iceberg.core.version} - test org.apache.parquet parquet-column ${parquet.version} - test @@ -805,6 +809,7 @@ software.amazon.awssdk:sts software.amazon.awssdk:kms org.apache.iceberg:iceberg-gcp + org.apache.iceberg:iceberg-data com.google.cloud:google-cloud-storage diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java index d238cccc248c..6566a4ded551 100644 
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java @@ -27,6 +27,7 @@ import org.apache.druid.iceberg.guice.HiveConf; import org.apache.druid.iceberg.input.GlueIcebergCatalog; import org.apache.druid.iceberg.input.HiveIcebergCatalog; +import org.apache.druid.iceberg.input.IcebergFileTaskInputSource; import org.apache.druid.iceberg.input.IcebergInputSource; import org.apache.druid.iceberg.input.LocalCatalog; import org.apache.druid.iceberg.input.RestIcebergCatalog; @@ -49,6 +50,7 @@ public List getJacksonModules() new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY), new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY), new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY), + new NamedType(IcebergFileTaskInputSource.class, IcebergFileTaskInputSource.TYPE_KEY), new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY) ) ); diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/DeleteFileInfo.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/DeleteFileInfo.java new file mode 100644 index 000000000000..43a19bdd368a --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/DeleteFileInfo.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Serializable metadata about an Iceberg delete file associated with a data file. + * Carried inside {@link IcebergFileTaskInputSource} so that workers can apply + * deletes without catalog access. + */ +public class DeleteFileInfo +{ + public enum ContentType + { + POSITION("position"), + EQUALITY("equality"); + + private final String value; + + ContentType(final String value) + { + this.value = value; + } + + @JsonValue + public String getValue() + { + return value; + } + } + + private final String path; + private final ContentType contentType; + private final List equalityFieldIds; + + @JsonCreator + public DeleteFileInfo( + @JsonProperty("path") final String path, + @JsonProperty("contentType") final ContentType contentType, + @JsonProperty("equalityFieldIds") final List equalityFieldIds + ) + { + this.path = path; + this.contentType = contentType; + this.equalityFieldIds = equalityFieldIds != null ? 
equalityFieldIds : Collections.emptyList(); + } + + @JsonProperty + public String getPath() + { + return path; + } + + @JsonProperty + public ContentType getContentType() + { + return contentType; + } + + @JsonProperty + public List getEqualityFieldIds() + { + return equalityFieldIds; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DeleteFileInfo that = (DeleteFileInfo) o; + return Objects.equals(path, that.path) + && contentType == that.contentType + && Objects.equals(equalityFieldIds, that.equalityFieldIds); + } + + @Override + public int hashCode() + { + return Objects.hash(path, contentType, equalityFieldIds); + } + + @Override + public String toString() + { + return "DeleteFileInfo{" + + "path='" + path + '\'' + + ", contentType=" + contentType + + ", equalityFieldIds=" + equalityFieldIds + + '}'; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 2c8de41bb386..c95e0ec5a0da 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -37,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.joda.time.DateTime; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -153,4 +155,147 @@ public List 
extractSnapshotDataFiles( } return dataFilePaths; } + + /** + * Result container for a file scan that preserves the Table reference and + * individual FileScanTasks with their associated delete files. + */ + public static class FileScanResult + { + private final Table table; + private final List fileScanTasks; + private final boolean hasDeleteFiles; + + FileScanResult(final Table table, final List fileScanTasks, final boolean hasDeleteFiles) + { + this.table = table; + this.fileScanTasks = fileScanTasks; + this.hasDeleteFiles = hasDeleteFiles; + } + + public Table getTable() + { + return table; + } + + public List getFileScanTasks() + { + return fileScanTasks; + } + + public boolean hasDeleteFiles() + { + return hasDeleteFiles; + } + } + + /** + * Scan the iceberg table and return FileScanTasks with their delete file metadata intact. + * This is used when v2 delete handling is enabled, allowing the caller to apply deletes + * via Iceberg's native reader stack rather than just extracting raw file paths. + * + * @param tableNamespace The catalog namespace under which the table is defined + * @param tableName The iceberg table name + * @param icebergFilter The iceberg filter to apply for partition pruning + * @param snapshotTime Datetime for snapshot time-travel (null for latest) + * @param residualFilterMode Controls how residual filters are handled + * @return a FileScanResult containing the table, tasks, and delete file presence flag + */ + public FileScanResult extractFileScanTasks( + final String tableNamespace, + final String tableName, + final IcebergFilter icebergFilter, + final DateTime snapshotTime, + final ResidualFilterMode residualFilterMode + ) + { + final Catalog catalog = retrieveCatalog(); + final Namespace namespace = Namespace.of(tableNamespace); + final String tableIdentifier = tableNamespace + "." 
+ tableName; + + final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() + .filter(tableId -> tableId.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + " Couldn't retrieve table identifier for '%s'. " + + "Please verify that the table exists in the given catalog", + tableIdentifier + )); + + final long start = System.currentTimeMillis(); + final Table table = catalog.loadTable(icebergTableIdentifier); + TableScan tableScan = table.newScan(); + + if (icebergFilter != null) { + tableScan = icebergFilter.filter(tableScan); + } + if (snapshotTime != null) { + tableScan = tableScan.asOfTime(snapshotTime.getMillis()); + } + tableScan = tableScan.caseSensitive(isCaseSensitive()); + + final List fileScanTasks = new ArrayList<>(); + boolean hasDeleteFiles = false; + Expression detectedResidual = null; + + try (CloseableIterable tasks = tableScan.planFiles()) { + for (final FileScanTask task : tasks) { + fileScanTasks.add(task); + + if (!hasDeleteFiles && !task.deletes().isEmpty()) { + hasDeleteFiles = true; + } + + if (detectedResidual == null) { + final Expression residual = task.residual(); + if (residual != null && !residual.equals(Expressions.alwaysTrue())) { + detectedResidual = residual; + } + } + } + } + catch (IOException e) { + throw new RE(e, "Failed to plan files for iceberg table [%s]", tableIdentifier); + } + + if (detectedResidual != null) { + final String message = StringUtils.format( + "Iceberg filter produced residual expression that requires row-level filtering. " + + "This typically means the filter is on a non-partition column. " + + "Residual rows may be ingested unless filtered by transformSpec. 
" + + "Residual filter: [%s]", + detectedResidual + ); + + if (residualFilterMode == ResidualFilterMode.FAIL) { + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(message); + } + log.warn(message); + } + + final long duration = System.currentTimeMillis() - start; + log.info( + "File scan task extraction took [%d ms] for [%d] tasks, hasDeleteFiles=[%s]", + duration, + fileScanTasks.size(), + hasDeleteFiles + ); + + return new FileScanResult(table, fileScanTasks, hasDeleteFiles); + } + catch (DruidException e) { + throw e; + } + catch (Exception e) { + throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxClassloader); + } + } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergFileTaskInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergFileTaskInputSource.java new file mode 100644 index 000000000000..00f436752da2 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergFileTaskInputSource.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.InputSourceReader; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.List; + +/** + * A non-splittable {@link InputSource} representing a single Iceberg v2 data file + * with its associated delete files. Created internally by {@link IcebergInputSource} + * when v2 delete files are detected. + * + * This input source is JSON-serializable, carrying all metadata needed for a worker + * to read a data file and apply deletes without catalog access: + *
+ * <ul>
+ *   <li>Data file path</li>
+ *   <li>Delete file metadata (paths, types, equality field IDs)</li>
+ *   <li>Serialized Iceberg table schema (JSON)</li>
+ *   <li>warehouseSource for file I/O</li>
+ * </ul>
+ */ +public class IcebergFileTaskInputSource implements InputSource +{ + public static final String TYPE_KEY = "icebergFileTask"; + + private final String dataFilePath; + private final List deleteFiles; + private final String tableSchemaJson; + private final InputSourceFactory warehouseSource; + + @JsonCreator + public IcebergFileTaskInputSource( + @JsonProperty("dataFilePath") final String dataFilePath, + @JsonProperty("deleteFiles") final List deleteFiles, + @JsonProperty("tableSchemaJson") final String tableSchemaJson, + @JsonProperty("warehouseSource") final InputSourceFactory warehouseSource + ) + { + this.dataFilePath = dataFilePath; + this.deleteFiles = deleteFiles; + this.tableSchemaJson = tableSchemaJson; + this.warehouseSource = warehouseSource; + } + + @JsonProperty + public String getDataFilePath() + { + return dataFilePath; + } + + @JsonProperty + public List getDeleteFiles() + { + return deleteFiles; + } + + @JsonProperty + public String getTableSchemaJson() + { + return tableSchemaJson; + } + + @JsonProperty + public InputSourceFactory getWarehouseSource() + { + return warehouseSource; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public boolean needsFormat() + { + // Native Iceberg reader handles format internally + return false; + } + + @Override + public InputSourceReader reader( + final InputRowSchema inputRowSchema, + @Nullable final InputFormat inputFormat, + final File temporaryDirectory + ) + { + return new IcebergNativeRecordReader( + dataFilePath, + deleteFiles, + tableSchemaJson, + warehouseSource, + inputRowSchema + ); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index ccbb10af14dc..4d5b67886a08 100644 --- 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -36,24 +36,37 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.iceberg.filter.IcebergFilter; import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SchemaParser; import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.stream.Stream; /** - * Inputsource to ingest data managed by the Iceberg table format. - * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table. - * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined. + * Input source for ingesting data from Iceberg tables. + * + * Connects to a configured Iceberg catalog, executes partition filters, and retrieves + * data file paths. For Iceberg v1 tables (no delete files), file paths are delegated + * to {@code warehouseSource}. For v2 tables with active delete files, an + * {@link IcebergFileTaskInputSource} is created per task to apply deletes at read time. + * + * V2 detection is automatic -- no user configuration needed. 
*/ public class IcebergInputSource implements SplittableInputSource> { public static final String TYPE_KEY = "iceberg"; + private static final Logger log = new Logger(IcebergInputSource.class); @JsonProperty private final String tableName; @@ -80,6 +93,13 @@ public class IcebergInputSource implements SplittableInputSource> private SplittableInputSource delegateInputSource; + /** + * When v2 delete files are detected, this holds the per-task input sources + * for the native reader path. + */ + @Nullable + private List v2TaskInputSources; + @JsonCreator public IcebergInputSource( @JsonProperty("tableName") String tableName, @@ -103,7 +123,14 @@ public IcebergInputSource( @Override public boolean needsFormat() { - return true; + if (!isLoaded) { + retrieveIcebergDatafiles(); + } + // V2 path uses native Iceberg readers and does not need an InputFormat + if (v2TaskInputSources != null) { + return false; + } + return getDelegateInputSource().needsFormat(); } @Override @@ -116,6 +143,12 @@ public InputSourceReader reader( if (!isLoaded) { retrieveIcebergDatafiles(); } + + // V2 path: use native Iceberg reader with delete application + if (v2TaskInputSources != null && !v2TaskInputSources.isEmpty()) { + return new CompositeInputSourceReader(v2TaskInputSources, inputRowSchema, inputFormat, temporaryDirectory); + } + return getDelegateInputSource().reader(inputRowSchema, inputFormat, temporaryDirectory); } @@ -164,6 +197,12 @@ public String getNamespace() return namespace; } + @JsonProperty + public IcebergFilter getIcebergFilter() + { + return icebergFilter; + } + @JsonProperty public IcebergCatalog getIcebergCatalog() { @@ -171,12 +210,11 @@ public IcebergCatalog getIcebergCatalog() } @JsonProperty - public IcebergFilter getIcebergFilter() + public InputSourceFactory getWarehouseSource() { - return icebergFilter; + return warehouseSource; } - @Nullable @JsonProperty public DateTime getSnapshotTime() { @@ -189,34 +227,199 @@ public ResidualFilterMode 
getResidualFilterMode() return residualFilterMode; } - public SplittableInputSource getDelegateInputSource() + protected SplittableInputSource getDelegateInputSource() { + if (delegateInputSource == null) { + delegateInputSource = new EmptyInputSource(); + } return delegateInputSource; } + /** + * Scans the Iceberg table and routes to V1 or V2 path based on delete file presence. + * + * V1 path (no deletes): extracts file paths, delegates to warehouseSource. + * V2 path (deletes detected): creates IcebergFileTaskInputSource per FileScanTask + * carrying data file path, delete file metadata, and serialized table schema. + */ protected void retrieveIcebergDatafiles() { - List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( + final IcebergCatalog.FileScanResult scanResult = icebergCatalog.extractFileScanTasks( getNamespace(), getTableName(), getIcebergFilter(), getSnapshotTime(), getResidualFilterMode() ); - if (snapshotDataFiles.isEmpty()) { + + if (scanResult.getFileScanTasks().isEmpty()) { + delegateInputSource = new EmptyInputSource(); + isLoaded = true; + return; + } + + if (scanResult.hasDeleteFiles()) { + // V2 path: create per-task input sources with delete file metadata + final String schemaJson = SchemaParser.toJson(scanResult.getTable().schema()); + v2TaskInputSources = new ArrayList<>(); + + for (final FileScanTask task : scanResult.getFileScanTasks()) { + final String dataFilePath = task.file().location(); + final List deleteFileInfos = new ArrayList<>(); + + for (final DeleteFile deleteFile : task.deletes()) { + deleteFileInfos.add(new DeleteFileInfo( + deleteFile.location(), + deleteFile.content() == FileContent.EQUALITY_DELETES + ? DeleteFileInfo.ContentType.EQUALITY + : DeleteFileInfo.ContentType.POSITION, + deleteFile.content() == FileContent.EQUALITY_DELETES + ? 
deleteFile.equalityFieldIds() + : Collections.emptyList() + )); + } + + v2TaskInputSources.add(new IcebergFileTaskInputSource( + dataFilePath, + deleteFileInfos, + schemaJson, + warehouseSource + )); + } + + // Set a delegate for createSplits/estimateNumSplits compatibility delegateInputSource = new EmptyInputSource(); + + log.info( + "Iceberg v2 delete files detected for table [%s.%s]. Using native reader for [%d] tasks.", + getNamespace(), + getTableName(), + v2TaskInputSources.size() + ); } else { - delegateInputSource = warehouseSource.create(snapshotDataFiles); + // V1 path: extract file paths, delegate to warehouseSource + final List dataFilePaths = new ArrayList<>(); + for (final FileScanTask task : scanResult.getFileScanTasks()) { + dataFilePaths.add(task.file().location()); + } + delegateInputSource = warehouseSource.create(dataFilePaths); } + isLoaded = true; } /** - * This input source is used in place of a delegate input source if there are no input file paths. - * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource - * may use this input source as delegate in such cases. + * Composes readers from multiple {@link IcebergFileTaskInputSource} instances, + * iterating through tasks sequentially. Each task's reader is opened lazily + * and closed before opening the next. 
+ */ + private static class CompositeInputSourceReader implements InputSourceReader + { + private final List taskSources; + private final InputRowSchema inputRowSchema; + private final InputFormat inputFormat; + private final File temporaryDirectory; + + CompositeInputSourceReader( + final List taskSources, + final InputRowSchema inputRowSchema, + final InputFormat inputFormat, + final File temporaryDirectory + ) + { + this.taskSources = taskSources; + this.inputRowSchema = inputRowSchema; + this.inputFormat = inputFormat; + this.temporaryDirectory = temporaryDirectory; + } + + @Override + public CloseableIterator read(final InputStats inputStats) throws IOException + { + final Iterator taskIterator = taskSources.iterator(); + return new CloseableIterator() + { + private CloseableIterator current = null; + + @Override + public boolean hasNext() + { + while ((current == null || !current.hasNext()) && taskIterator.hasNext()) { + closeCurrent(); + try { + current = taskIterator.next() + .reader(inputRowSchema, inputFormat, temporaryDirectory) + .read(inputStats); + } + catch (IOException e) { + throw new RuntimeException("Failed to open Iceberg task reader", e); + } + } + return current != null && current.hasNext(); + } + + @Override + public InputRow next() + { + if (!hasNext()) { + throw new java.util.NoSuchElementException(); + } + return current.next(); + } + + @Override + public void close() + { + closeCurrent(); + } + + private void closeCurrent() + { + if (current != null) { + try { + current.close(); + } + catch (IOException e) { + throw new RuntimeException("Failed to close Iceberg task reader", e); + } + current = null; + } + } + }; + } + + @Override + public CloseableIterator sample() throws IOException + { + final CloseableIterator reader = read(null); + return new CloseableIterator() + { + @Override + public boolean hasNext() + { + return reader.hasNext(); + } + + @Override + public InputRowListPlusRawValues next() + { + final InputRow row = 
reader.next(); + return InputRowListPlusRawValues.of(row, Collections.emptyMap()); + } + + @Override + public void close() throws IOException + { + reader.close(); + } + }; + } + } + + /** + * Placeholder input source used when there are no input file paths. */ - private static class EmptyInputSource implements SplittableInputSource + static class EmptyInputSource implements SplittableInputSource { @Override public boolean needsFormat() diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java new file mode 100644 index 000000000000..7f4ceaf3b3f0 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * An {@link InputSourceReader} that reads an Iceberg data file and applies + * associated positional and equality delete files before converting records + * to Druid {@link InputRow} objects. + * + * Delete application follows the Iceberg v2 spec: + *
    + *
  1. Positional deletes: read (file_path, pos) pairs, filter by current data file, + * build a Set of deleted positions
  2. + *
  3. Equality deletes: read key tuples from equality delete files, build Sets + * of deleted key values per equality field set
  4. + *
  5. Stream data file: for each record, skip if position-deleted or equality-deleted
  6. + *
+ * + * All reads use Iceberg's Parquet reader with {@link GenericParquetReaders} for + * schema-aware reading. Files are accessed via Hadoop {@link Configuration}. + */ +public class IcebergNativeRecordReader implements InputSourceReader +{ + private static final Logger log = new Logger(IcebergNativeRecordReader.class); + + private final String dataFilePath; + private final List deleteFiles; + private final String tableSchemaJson; + private final InputSourceFactory warehouseSource; + private final InputRowSchema inputRowSchema; + private final Configuration hadoopConf; + + public IcebergNativeRecordReader( + final String dataFilePath, + final List deleteFiles, + final String tableSchemaJson, + final InputSourceFactory warehouseSource, + final InputRowSchema inputRowSchema + ) + { + this.dataFilePath = dataFilePath; + this.deleteFiles = deleteFiles; + this.tableSchemaJson = tableSchemaJson; + this.warehouseSource = warehouseSource; + this.inputRowSchema = inputRowSchema; + this.hadoopConf = new Configuration(); + } + + @Override + public CloseableIterator read(final InputStats inputStats) throws IOException + { + final Schema tableSchema = SchemaParser.fromJson(tableSchemaJson); + + // Step 1: Collect positional deletes + final Set deletedPositions = collectPositionalDeletes(); + + // Step 2: Collect equality deletes + final List equalityDeleteSets = collectEqualityDeletes(tableSchema); + + // Step 3: Stream data file with delete application + final InputFile dataInputFile = HadoopInputFile.fromLocation(dataFilePath, hadoopConf); + final CloseableIterable records = Parquet.read(dataInputFile) + .project(tableSchema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader( + tableSchema, + fileSchema + ) + ) + .build(); + + final IcebergRecordConverter converter = new IcebergRecordConverter(tableSchema); + + return new CloseableIterator() + { + private final Iterator delegate = records.iterator(); + private long position = 0; + private InputRow 
nextRow = null; + + @Override + public boolean hasNext() + { + while (nextRow == null && delegate.hasNext()) { + final Record record = delegate.next(); + final long currentPos = position++; + + // Step 4: Apply positional deletes + if (!deletedPositions.isEmpty() && deletedPositions.contains(currentPos)) { + continue; + } + + // Step 5: Apply equality deletes + if (isEqualityDeleted(record, equalityDeleteSets)) { + continue; + } + + // Step 6: Convert surviving record + final Map map = converter.convert(record); + nextRow = MapInputRowParser.parse(inputRowSchema, map); + } + return nextRow != null; + } + + @Override + public InputRow next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final InputRow row = nextRow; + nextRow = null; + return row; + } + + @Override + public void close() throws IOException + { + records.close(); + } + }; + } + + @Override + public CloseableIterator sample() throws IOException + { + final CloseableIterator reader = read(null); + return new CloseableIterator() + { + @Override + public boolean hasNext() + { + return reader.hasNext(); + } + + @Override + public InputRowListPlusRawValues next() + { + final InputRow row = reader.next(); + return InputRowListPlusRawValues.of(row, Collections.emptyMap()); + } + + @Override + public void close() throws IOException + { + reader.close(); + } + }; + } + + /** + * Reads all positional delete files and collects positions that apply to the + * current data file. 
+ */ + private Set collectPositionalDeletes() throws IOException + { + final Set deletedPositions = new HashSet<>(); + final Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema(); + + for (final DeleteFileInfo deleteFileInfo : deleteFiles) { + if (deleteFileInfo.getContentType() != DeleteFileInfo.ContentType.POSITION) { + continue; + } + + final InputFile deleteInputFile = HadoopInputFile.fromLocation(deleteFileInfo.getPath(), hadoopConf); + + try (CloseableIterable deleteRecords = Parquet.read(deleteInputFile) + .project(posDeleteSchema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader( + posDeleteSchema, + fileSchema + ) + ) + .build()) { + for (final Record deleteRecord : deleteRecords) { + final String filePath = deleteRecord.getField("file_path").toString(); + if (dataFilePath.equals(filePath)) { + final long pos = (Long) deleteRecord.getField("pos"); + deletedPositions.add(pos); + } + } + } + } + + if (!deletedPositions.isEmpty()) { + log.info("Collected [%d] positional deletes for data file [%s]", deletedPositions.size(), dataFilePath); + } + + return deletedPositions; + } + + /** + * Reads all equality delete files and builds sets of deleted key tuples. 
+ */ + private List collectEqualityDeletes(final Schema tableSchema) throws IOException + { + final List result = new ArrayList<>(); + + for (final DeleteFileInfo deleteFileInfo : deleteFiles) { + if (deleteFileInfo.getContentType() != DeleteFileInfo.ContentType.EQUALITY) { + continue; + } + + // Build projected schema from equality field IDs + final List equalityFields = new ArrayList<>(); + final List fieldNames = new ArrayList<>(); + for (final int fieldId : deleteFileInfo.getEqualityFieldIds()) { + final Types.NestedField field = tableSchema.findField(fieldId); + if (field != null) { + equalityFields.add(field); + fieldNames.add(field.name()); + } + } + + if (equalityFields.isEmpty()) { + continue; + } + + final Schema deleteSchema = new Schema(equalityFields); + final InputFile deleteInputFile = HadoopInputFile.fromLocation(deleteFileInfo.getPath(), hadoopConf); + + final Set> deletedKeys = new HashSet<>(); + try (CloseableIterable deleteRecords = Parquet.read(deleteInputFile) + .project(deleteSchema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader( + deleteSchema, + fileSchema + ) + ) + .build()) { + for (final Record deleteRecord : deleteRecords) { + final List key = new ArrayList<>(fieldNames.size()); + for (final String fieldName : fieldNames) { + final Object value = deleteRecord.getField(fieldName); + key.add(value); + } + deletedKeys.add(key); + } + } + + if (!deletedKeys.isEmpty()) { + result.add(new EqualityDeleteSet(fieldNames, deletedKeys)); + log.info( + "Collected [%d] equality deletes on fields %s for data file [%s]", + deletedKeys.size(), + fieldNames, + dataFilePath + ); + } + } + + return result; + } + + /** + * Checks whether a record matches any equality delete set. 
+ */ + private static boolean isEqualityDeleted(final Record record, final List equalityDeleteSets) + { + for (final EqualityDeleteSet deleteSet : equalityDeleteSets) { + final List key = new ArrayList<>(deleteSet.fieldNames.size()); + for (final String fieldName : deleteSet.fieldNames) { + key.add(record.getField(fieldName)); + } + if (deleteSet.deletedKeys.contains(key)) { + return true; + } + } + return false; + } + + /** + * Holds a set of deleted key tuples for a single equality delete file. + */ + private static class EqualityDeleteSet + { + final List fieldNames; + final Set> deletedKeys; + + EqualityDeleteSet(final List fieldNames, final Set> deletedKeys) + { + this.fieldNames = fieldNames; + this.deletedKeys = deletedKeys; + } + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergRecordConverter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergRecordConverter.java new file mode 100644 index 000000000000..c8e5a1cabfe4 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergRecordConverter.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.iceberg.input;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Converts an Iceberg {@link Record} into a {@code Map<String, Object>}
 * suitable for consumption by Druid's
 * {@link org.apache.druid.data.input.impl.MapInputRowParser}.
 *
 * <p>Handles Iceberg primitive types, nested structs, lists, and maps.
 * Timestamps become epoch millis; dates, times, and UUIDs become strings;
 * decimals are converted to double (NOTE(review): lossy beyond double
 * precision — confirm acceptable for ingested decimal columns).
 */
public class IcebergRecordConverter
{
  private final Schema schema;

  public IcebergRecordConverter(final Schema schema)
  {
    this.schema = schema;
  }

  /**
   * Converts an Iceberg Record to a flat Map using the converter's schema.
   * Only top-level columns from the schema are included in the output map.
   */
  public Map<String, Object> convert(final Record record)
  {
    final Map<String, Object> result = new LinkedHashMap<>();
    for (final Types.NestedField field : schema.columns()) {
      final Object value = record.getField(field.name());
      result.put(field.name(), convertValue(value, field.type()));
    }
    return result;
  }

  /**
   * Converts a single Iceberg value to a Druid-compatible Java type.
   * Returns null for null inputs.
   */
  static Object convertValue(final Object value, final Type type)
  {
    if (value == null) {
      return null;
    }

    switch (type.typeId()) {
      case BOOLEAN:
      case INTEGER:
      case LONG:
      case FLOAT:
      case DOUBLE:
        return value;

      case STRING:
        // Iceberg may return CharSequence (e.g., Utf8); normalize to String.
        return value.toString();

      case DATE:
        // Fallback for raw integer days since epoch; GenericParquetReaders
        // normally returns LocalDate, whose toString is ISO-8601.
        if (value instanceof Integer) {
          return LocalDate.ofEpochDay((Integer) value).toString();
        }
        return value.toString();

      case TIME:
        // Fallback for raw long microseconds since midnight; LocalTime and any
        // other representation stringify directly.
        if (value instanceof Long) {
          return LocalTime.ofNanoOfDay((Long) value * 1000L).toString();
        }
        return value.toString();

      case TIMESTAMP:
        return convertTimestamp(value);

      case TIMESTAMP_NANO:
        return convertTimestampNano(value);

      case DECIMAL:
        if (value instanceof BigDecimal) {
          return ((BigDecimal) value).doubleValue();
        }
        return value;

      case UUID:
        // UUID (and any other representation) stringifies identically, so the
        // previous instanceof UUID branch was redundant.
        return value.toString();

      case FIXED:
      case BINARY:
        if (value instanceof ByteBuffer) {
          final ByteBuffer buf = (ByteBuffer) value;
          final byte[] bytes = new byte[buf.remaining()];
          // duplicate() avoids disturbing the original buffer's position.
          buf.duplicate().get(bytes);
          return bytes;
        }
        // byte[] and any other representation pass through unchanged.
        return value;

      case STRUCT:
        return convertStruct(value, type.asStructType());

      case LIST:
        return convertList(value, type.asListType());

      case MAP:
        return convertMap(value, type.asMapType());

      default:
        // GEOMETRY, GEOGRAPHY, VARIANT, UNKNOWN: best-effort string conversion.
        return value.toString();
    }
  }

  /** Converts a TIMESTAMP value to epoch millis (UTC for zone-less values). */
  private static Object convertTimestamp(final Object value)
  {
    // GenericParquetReaders returns LocalDateTime (no tz) or OffsetDateTime (with tz).
    if (value instanceof OffsetDateTime) {
      return ((OffsetDateTime) value).toInstant().toEpochMilli();
    }
    if (value instanceof LocalDateTime) {
      return ((LocalDateTime) value).toInstant(ZoneOffset.UTC).toEpochMilli();
    }
    // Fallback: long microseconds since epoch.
    if (value instanceof Long) {
      return (Long) value / 1000L;
    }
    return value.toString();
  }

  /** Converts a TIMESTAMP_NANO value to epoch millis, truncating sub-millis. */
  private static Object convertTimestampNano(final Object value)
  {
    if (value instanceof OffsetDateTime) {
      return ((OffsetDateTime) value).toInstant().toEpochMilli();
    }
    if (value instanceof LocalDateTime) {
      return ((LocalDateTime) value).toInstant(ZoneOffset.UTC).toEpochMilli();
    }
    // Fallback: long nanoseconds since epoch.
    if (value instanceof Long) {
      return (Long) value / 1_000_000L;
    }
    return value.toString();
  }

  /** Recursively converts a struct value to a LinkedHashMap; null if not a Record. */
  private static Map<String, Object> convertStruct(final Object value, final Types.StructType structType)
  {
    if (!(value instanceof Record)) {
      return null;
    }
    final Record nested = (Record) value;
    final Map<String, Object> result = new LinkedHashMap<>();
    for (final Types.NestedField field : structType.fields()) {
      final Object fieldValue = nested.getField(field.name());
      result.put(field.name(), convertValue(fieldValue, field.type()));
    }
    return result;
  }

  /** Recursively converts list elements; null if the value is not a List. */
  @SuppressWarnings("unchecked")
  private static List<Object> convertList(final Object value, final Types.ListType listType)
  {
    if (!(value instanceof List)) {
      return null;
    }
    final List<Object> source = (List<Object>) value;
    final Type elementType = listType.elementType();
    final List<Object> result = new ArrayList<>(source.size());
    for (final Object element : source) {
      result.add(convertValue(element, elementType));
    }
    return result;
  }

  /** Recursively converts map entries; null if the value is not a Map. */
  @SuppressWarnings("unchecked")
  private static Map<String, Object> convertMap(final Object value, final Types.MapType mapType)
  {
    if (!(value instanceof Map)) {
      return null;
    }
    final Map<Object, Object> source = (Map<Object, Object>) value;
    final Type keyType = mapType.keyType();
    final Type valueType = mapType.valueType();
    final Map<String, Object> result = new LinkedHashMap<>();
    for (final Map.Entry<Object, Object> entry : source.entrySet()) {
      // Map keys are always converted to String for Druid compatibility.
      final String key = String.valueOf(convertValue(entry.getKey(), keyType));
      result.put(key, convertValue(entry.getValue(), valueType));
    }
    return result;
  }
}
testInputSourceReadFromLatestSnapshot() throws IOException testCatalog, new LocalInputSourceFactory(), DateTimes.nowUtc(), - null - ); + null); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); } @@ -208,8 +204,7 @@ public void testCaseInsensitiveFiltering() throws IOException caseInsensitiveCatalog, new LocalInputSourceFactory(), null, - null - ); + null); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); List localInputSourceList = splits.map(inputSource::withSplit) @@ -233,8 +228,7 @@ public void testResidualFilterModeIgnore() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.IGNORE - ); + ResidualFilterMode.IGNORE); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); } @@ -250,8 +244,7 @@ public void testResidualFilterModeFail() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL - ); + ResidualFilterMode.FAIL); DruidException exception = Assert.assertThrows( DruidException.class, () -> inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)) @@ -278,8 +271,7 @@ public void testResidualFilterModeFailWithPartitionedTable() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL - ); + ResidualFilterMode.FAIL); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); } @@ -301,8 +293,7 @@ public void testResidualFilterModeFailWithPartitionedTableNonPartitionColumn() t testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL - ); + ResidualFilterMode.FAIL); DruidException exception = Assert.assertThrows( DruidException.class, () -> inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)) diff --git 
// ============================================================================
// File: extensions-contrib/druid-iceberg-extensions/src/test/java/
//       org/apache/druid/iceberg/input/IcebergRecordConverterTest.java
// ============================================================================
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.iceberg.input;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Unit tests for {@link IcebergRecordConverter}, which maps Iceberg
 * {@link GenericRecord} values to plain Java maps suitable for building
 * Druid {@code InputRow}s. Covers primitive, temporal, decimal, binary,
 * null, nested-struct, and list conversions.
 */
public class IcebergRecordConverterTest
{
  @Test
  public void testConvertPrimitiveTypes()
  {
    final Schema schema = new Schema(
        Types.NestedField.required(1, "boolCol", Types.BooleanType.get()),
        Types.NestedField.required(2, "intCol", Types.IntegerType.get()),
        Types.NestedField.required(3, "longCol", Types.LongType.get()),
        Types.NestedField.required(4, "floatCol", Types.FloatType.get()),
        Types.NestedField.required(5, "doubleCol", Types.DoubleType.get()),
        Types.NestedField.required(6, "stringCol", Types.StringType.get())
    );

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("boolCol", true);
    record.setField("intCol", 42);
    record.setField("longCol", 123456789L);
    record.setField("floatCol", 3.14f);
    record.setField("doubleCol", 2.718);
    record.setField("stringCol", "hello");

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    // Primitive values should pass through unchanged.
    Assert.assertEquals(true, result.get("boolCol"));
    Assert.assertEquals(42, result.get("intCol"));
    Assert.assertEquals(123456789L, result.get("longCol"));
    Assert.assertEquals(3.14f, result.get("floatCol"));
    Assert.assertEquals(2.718, result.get("doubleCol"));
    Assert.assertEquals("hello", result.get("stringCol"));
  }

  @Test
  public void testConvertDateAndTimeTypes()
  {
    final Schema schema = new Schema(
        Types.NestedField.required(1, "dateCol", Types.DateType.get()),
        Types.NestedField.required(2, "timeCol", Types.TimeType.get()),
        Types.NestedField.required(3, "tsCol", Types.TimestampType.withoutZone()),
        Types.NestedField.required(4, "tstzCol", Types.TimestampType.withZone())
    );

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("dateCol", LocalDate.of(2024, 6, 15));
    record.setField("timeCol", LocalTime.of(14, 30, 0));
    record.setField("tsCol", LocalDateTime.of(2024, 6, 15, 14, 30, 0));
    record.setField("tstzCol", OffsetDateTime.of(2024, 6, 15, 14, 30, 0, 0, ZoneOffset.UTC));

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    // Dates and times are rendered as ISO-8601 strings.
    Assert.assertEquals("2024-06-15", result.get("dateCol"));
    Assert.assertEquals("14:30", result.get("timeCol"));
    // Both timestamp types should return epoch millis
    Assert.assertTrue(result.get("tsCol") instanceof Long);
    Assert.assertTrue(result.get("tstzCol") instanceof Long);
    Assert.assertEquals(
        LocalDateTime.of(2024, 6, 15, 14, 30, 0).toInstant(ZoneOffset.UTC).toEpochMilli(),
        result.get("tsCol")
    );
    Assert.assertEquals(
        OffsetDateTime.of(2024, 6, 15, 14, 30, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli(),
        result.get("tstzCol")
    );
  }

  @Test
  public void testConvertDecimalType()
  {
    final Schema schema = new Schema(
        Types.NestedField.required(1, "decimalCol", Types.DecimalType.of(10, 2))
    );

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("decimalCol", new BigDecimal("123.45"));

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    // Decimals are converted to double; compare with a tolerance.
    Assert.assertEquals(123.45, (Double) result.get("decimalCol"), 0.001);
  }

  @Test
  public void testConvertBinaryType()
  {
    final Schema schema = new Schema(
        Types.NestedField.required(1, "binaryCol", Types.BinaryType.get())
    );

    final byte[] data = new byte[]{1, 2, 3, 4};
    final GenericRecord record = GenericRecord.create(schema);
    record.setField("binaryCol", ByteBuffer.wrap(data));

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    // ByteBuffer content should come back as a raw byte array.
    Assert.assertArrayEquals(data, (byte[]) result.get("binaryCol"));
  }

  @Test
  public void testConvertNullValues()
  {
    final Schema schema = new Schema(
        Types.NestedField.optional(1, "nullableStr", Types.StringType.get()),
        Types.NestedField.optional(2, "nullableInt", Types.IntegerType.get())
    );

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("nullableStr", null);
    record.setField("nullableInt", null);

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    // Nulls must survive conversion rather than being dropped or defaulted.
    Assert.assertNull(result.get("nullableStr"));
    Assert.assertNull(result.get("nullableInt"));
  }

  @Test
  public void testConvertNestedStruct()
  {
    final Types.StructType addressType = Types.StructType.of(
        Types.NestedField.required(3, "city", Types.StringType.get()),
        Types.NestedField.required(4, "zip", Types.IntegerType.get())
    );
    final Schema schema = new Schema(
        Types.NestedField.required(1, "name", Types.StringType.get()),
        Types.NestedField.required(2, "address", addressType)
    );

    final GenericRecord addressRecord = GenericRecord.create(addressType);
    addressRecord.setField("city", "SanFrancisco");
    addressRecord.setField("zip", 94105);

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("name", "Alice");
    record.setField("address", addressRecord);

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    Assert.assertEquals("Alice", result.get("name"));
    // Nested structs are converted recursively into maps.
    @SuppressWarnings("unchecked")
    final Map<String, Object> addressMap = (Map<String, Object>) result.get("address");
    Assert.assertNotNull(addressMap);
    Assert.assertEquals("SanFrancisco", addressMap.get("city"));
    Assert.assertEquals(94105, addressMap.get("zip"));
  }

  @Test
  public void testConvertListType()
  {
    final Schema schema = new Schema(
        Types.NestedField.required(1, "tags", Types.ListType.ofRequired(2, Types.StringType.get()))
    );

    final GenericRecord record = GenericRecord.create(schema);
    record.setField("tags", Arrays.asList("tag1", "tag2", "tag3"));

    final IcebergRecordConverter converter = new IcebergRecordConverter(schema);
    final Map<String, Object> result = converter.convert(record);

    @SuppressWarnings("unchecked")
    final List<String> tags = (List<String>) result.get("tags");
    Assert.assertNotNull(tags);
    Assert.assertEquals(3, tags.size());
    Assert.assertEquals("tag1", tags.get(0));
    Assert.assertEquals("tag2", tags.get(1));
    Assert.assertEquals("tag3", tags.get(2));
  }
}

// ============================================================================
// File: extensions-contrib/druid-iceberg-extensions/src/test/java/
//       org/apache/druid/iceberg/input/V2DeleteHandlingTest.java
// ============================================================================
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.iceberg.input;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

/**
 * Tests for automatic Iceberg v2 delete file detection and application.
 * Verifies that IcebergInputSource transparently handles positional and
 * equality deletes without any user configuration.
 */
public class V2DeleteHandlingTest
{
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private LocalCatalog testCatalog;
  private File warehouseDir;

  private static final String NAMESPACE = "default";
  private static final String TABLE_NAME = "v2TestTable";

  // Three-column schema shared by every table created in these tests.
  private final Schema tableSchema = new Schema(
      Types.NestedField.required(1, "order_id", Types.IntegerType.get()),
      Types.NestedField.required(2, "product", Types.StringType.get()),
      Types.NestedField.required(3, "amount", Types.DoubleType.get())
  );

  private final InputRowSchema inputRowSchema = new InputRowSchema(
      new TimestampSpec("order_id", "auto", null),
      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("order_id", "product", "amount"))),
      ColumnsFilter.all(),
      ImmutableSet.of()
  );

  @Before
  public void setup()
  {
    warehouseDir = FileUtils.createTempDir();
    testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
  }

  @After
  public void tearDown()
  {
    final TableIdentifier tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE_NAME);
    try {
      testCatalog.retrieveCatalog().dropTable(tableId);
    }
    catch (Exception ignored) {
      // Best-effort cleanup: the table may not exist if a test failed early.
    }
  }

  @Test
  public void testAutoDetectWithEqualityDelete() throws IOException
  {
    createTableWithEqualityDelete();

    final IcebergInputSource inputSource = new IcebergInputSource(
        TABLE_NAME,
        NAMESPACE,
        null,
        testCatalog,
        new LocalInputSourceFactory(),
        null,
        null
    );

    final InputSourceReader reader = inputSource.reader(inputRowSchema, null, temporaryFolder.newFolder());
    final List<InputRow> rows = readAll(reader);

    // Equality delete removed order_id=2, so only 2 rows remain
    Assert.assertEquals(2, rows.size());

    final List<String> orderIds = new ArrayList<>();
    for (final InputRow row : rows) {
      orderIds.add(row.getDimension("order_id").get(0));
    }
    Assert.assertTrue("Should contain order_id 1", orderIds.contains("1"));
    Assert.assertTrue("Should contain order_id 3", orderIds.contains("3"));
    Assert.assertFalse("Should NOT contain deleted order_id 2", orderIds.contains("2"));
  }

  @Test
  public void testAutoDetectWithPositionalDelete() throws IOException
  {
    createTableWithPositionalDelete();

    final IcebergInputSource inputSource = new IcebergInputSource(
        TABLE_NAME,
        NAMESPACE,
        null,
        testCatalog,
        new LocalInputSourceFactory(),
        null,
        null
    );

    final InputSourceReader reader = inputSource.reader(inputRowSchema, null, temporaryFolder.newFolder());
    final List<InputRow> rows = readAll(reader);

    // Positional delete removed row at position 1 (order_id=2), so 2 rows remain
    Assert.assertEquals(2, rows.size());
  }

  @Test
  public void testV1TableWithNoDeletesUsesFilePaths() throws IOException
  {
    createTableWithoutDeletes();

    // Verify scan detects no deletes
    final IcebergCatalog.FileScanResult result = testCatalog.extractFileScanTasks(
        NAMESPACE,
        TABLE_NAME,
        null,
        null,
        ResidualFilterMode.IGNORE
    );

    Assert.assertFalse("V1 table should have no delete files", result.hasDeleteFiles());
    Assert.assertEquals(1, result.getFileScanTasks().size());
  }

  @Test
  public void testV2TableDeleteDetection() throws IOException
  {
    createTableWithEqualityDelete();

    final IcebergCatalog.FileScanResult result = testCatalog.extractFileScanTasks(
        NAMESPACE,
        TABLE_NAME,
        null,
        null,
        ResidualFilterMode.IGNORE
    );

    Assert.assertTrue("V2 table with deletes should have hasDeleteFiles=true", result.hasDeleteFiles());
    Assert.assertEquals(1, result.getFileScanTasks().size());
    Assert.assertFalse("Should have delete files", result.getFileScanTasks().get(0).deletes().isEmpty());
  }

  @Test
  public void testDeleteFileInfoSerialization()
  {
    final DeleteFileInfo posDelete = new DeleteFileInfo(
        "s3://bucket/data/pos-del.parquet",
        DeleteFileInfo.ContentType.POSITION,
        null
    );
    Assert.assertEquals("s3://bucket/data/pos-del.parquet", posDelete.getPath());
    Assert.assertEquals(DeleteFileInfo.ContentType.POSITION, posDelete.getContentType());
    Assert.assertTrue(posDelete.getEqualityFieldIds().isEmpty());

    final DeleteFileInfo eqDelete = new DeleteFileInfo(
        "s3://bucket/data/eq-del.parquet",
        DeleteFileInfo.ContentType.EQUALITY,
        ImmutableList.of(1, 2)
    );
    Assert.assertEquals(DeleteFileInfo.ContentType.EQUALITY, eqDelete.getContentType());
    Assert.assertEquals(ImmutableList.of(1, 2), eqDelete.getEqualityFieldIds());
  }

  @Test
  public void testIcebergFileTaskInputSourceCreation() throws IOException
  {
    createTableWithEqualityDelete();

    final IcebergCatalog.FileScanResult result = testCatalog.extractFileScanTasks(
        NAMESPACE,
        TABLE_NAME,
        null,
        null,
        ResidualFilterMode.IGNORE
    );

    final String schemaJson = SchemaParser.toJson(result.getTable().schema());
    Assert.assertNotNull(schemaJson);
    Assert.assertFalse(schemaJson.isEmpty());

    // Verify schema round-trip
    final Schema roundTripped = SchemaParser.fromJson(schemaJson);
    Assert.assertEquals(tableSchema.columns().size(), roundTripped.columns().size());
  }

  @Test
  public void testEmptyTableReturnsNoRows() throws IOException
  {
    // Create table with no data
    final TableIdentifier tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE_NAME);
    testCatalog.retrieveCatalog().createTable(
        tableId,
        tableSchema,
        PartitionSpec.unpartitioned(),
        ImmutableMap.of("format-version", "2")
    );

    final IcebergInputSource inputSource = new IcebergInputSource(
        TABLE_NAME,
        NAMESPACE,
        null,
        testCatalog,
        new LocalInputSourceFactory(),
        null,
        null
    );

    final InputSourceReader reader = inputSource.reader(inputRowSchema, null, temporaryFolder.newFolder());
    final List<InputRow> rows = readAll(reader);
    Assert.assertEquals(0, rows.size());
  }

  // --- Helper methods ---

  /** Drains the reader into a list, closing the iterator when done. */
  private List<InputRow> readAll(final InputSourceReader reader) throws IOException
  {
    final List<InputRow> rows = new ArrayList<>();
    try (CloseableIterator<InputRow> iterator = reader.read()) {
      while (iterator.hasNext()) {
        rows.add(iterator.next());
      }
    }
    return rows;
  }

  /** Creates an empty, unpartitioned v2-format table in the test catalog. */
  private Table createBaseTable()
  {
    final TableIdentifier tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE_NAME);
    return testCatalog.retrieveCatalog().createTable(
        tableId,
        tableSchema,
        PartitionSpec.unpartitioned(),
        ImmutableMap.of("format-version", "2")
    );
  }

  /**
   * Writes a Parquet data file with three rows (order_id 1..3) and returns its
   * {@link DataFile} metadata so the caller can commit it.
   */
  private DataFile writeDataFile(final Table table) throws IOException
  {
    final GenericRecord r1 = GenericRecord.create(tableSchema);
    r1.setField("order_id", 1);
    r1.setField("product", "Widget");
    r1.setField("amount", 10.0);

    final GenericRecord r2 = GenericRecord.create(tableSchema);
    r2.setField("order_id", 2);
    r2.setField("product", "Gadget");
    r2.setField("amount", 20.0);

    final GenericRecord r3 = GenericRecord.create(tableSchema);
    r3.setField("order_id", 3);
    r3.setField("product", "Doohickey");
    r3.setField("amount", 30.0);

    final List<GenericRecord> records = ImmutableList.of(r1, r2, r3);

    final String filepath = table.location() + "/data/" + UUID.randomUUID() + ".parquet";
    final OutputFile outputFile = table.io().newOutputFile(filepath);

    final DataWriter<GenericRecord> writer = Parquet.writeData(outputFile)
                                                    .schema(tableSchema)
                                                    .createWriterFunc(GenericParquetWriter::create)
                                                    .overwrite()
                                                    .withSpec(PartitionSpec.unpartitioned())
                                                    .build();
    try {
      for (final GenericRecord rec : records) {
        writer.write(rec);
      }
    }
    finally {
      // Must close before toDataFile(): the file footer is written on close.
      writer.close();
    }
    return writer.toDataFile();
  }

  private void createTableWithoutDeletes() throws IOException
  {
    final Table table = createBaseTable();
    final DataFile dataFile = writeDataFile(table);
    table.newAppend().appendFile(dataFile).commit();
  }

  /**
   * Creates a table with one data file and an equality delete file that
   * deletes the row where order_id = 2.
   */
  private void createTableWithEqualityDelete() throws IOException
  {
    final Table table = createBaseTable();
    final DataFile dataFile = writeDataFile(table);
    table.newAppend().appendFile(dataFile).commit();

    // Delete schema carries only the equality column (field id 1: order_id).
    final Schema deleteSchema = new Schema(
        Types.NestedField.required(1, "order_id", Types.IntegerType.get())
    );
    final String deletePath = table.location() + "/data/" + UUID.randomUUID() + "-eq-delete.parquet";
    final OutputFile deleteOutputFile = table.io().newOutputFile(deletePath);

    final EqualityDeleteWriter<GenericRecord> eqDeleteWriter = Parquet.writeDeletes(deleteOutputFile)
                                                                      .forTable(table)
                                                                      .rowSchema(deleteSchema)
                                                                      .createWriterFunc(GenericParquetWriter::create)
                                                                      .overwrite()
                                                                      .equalityFieldIds(1)
                                                                      .buildEqualityWriter();
    try {
      final GenericRecord deleteRecord = GenericRecord.create(deleteSchema);
      deleteRecord.setField("order_id", 2);
      eqDeleteWriter.write(deleteRecord);
    }
    finally {
      eqDeleteWriter.close();
    }

    final DeleteFile eqDeleteFile = eqDeleteWriter.toDeleteFile();
    table.newRowDelta().addDeletes(eqDeleteFile).commit();
  }

  /**
   * Creates a table with one data file and a positional delete file that
   * deletes row position 1 (order_id = 2) of that data file.
   */
  private void createTableWithPositionalDelete() throws IOException
  {
    final Table table = createBaseTable();
    final DataFile dataFile = writeDataFile(table);
    table.newAppend().appendFile(dataFile).commit();

    final String deletePath = table.location() + "/data/" + UUID.randomUUID() + "-pos-delete.parquet";
    final OutputFile deleteOutputFile = table.io().newOutputFile(deletePath);

    final PositionDeleteWriter<GenericRecord> posDeleteWriter = Parquet.writeDeletes(deleteOutputFile)
                                                                       .forTable(table)
                                                                       .createWriterFunc(GenericParquetWriter::create)
                                                                       .rowSchema(tableSchema)
                                                                       .overwrite()
                                                                       .buildPositionWriter();
    try {
      final PositionDelete<GenericRecord> posDelete = PositionDelete.create();
      final GenericRecord deleteRow = GenericRecord.create(tableSchema);
      deleteRow.setField("order_id", 2);
      deleteRow.setField("product", "Gadget");
      deleteRow.setField("amount", 20.0);
      // Position 1 is the second row of the data file (order_id = 2).
      posDelete.set(dataFile.location(), 1L, deleteRow);
      posDeleteWriter.write(posDelete);
    }
    finally {
      posDeleteWriter.close();
    }

    final DeleteFile posDeleteFile = posDeleteWriter.toDeleteFile();
    table.newRowDelta().addDeletes(posDeleteFile).commit();
  }
}