diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java index ec72a1505a66..be34eaeed5dc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java @@ -230,7 +230,7 @@ private Optional<PartitionField> findPartitionField(List<PartitionField> f // collects used data field IDs across all known table schemas private Set<Integer> allUsedFieldIds() { return table.schemas().values().stream() - .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream()) + .flatMap(tableSchema -> TypeUtil.indexById(tableSchema.asStruct()).keySet().stream()) .collect(Collectors.toSet()); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java index 68f6f17aaed5..7498f129b21b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java @@ -343,6 +343,88 @@ public void testRowLineageColumnsResolvedInV3OrHigher() { } } + @TestTemplate + public void testPartitionMetadataColumnWithMapColumn() throws IOException { + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); + + Schema mapSchema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "ts", Types.LongType.get()), + Types.NestedField.optional( + 3, + "tags", + Types.MapType.ofOptional(4, 5, Types.StringType.get(), Types.StringType.get()))); + PartitionSpec bucketSpec = PartitionSpec.builderFor(mapSchema).bucket("id", 1).build(); + + Map<String, String> properties = Maps.newHashMap(); + 
properties.put(FORMAT_VERSION, String.valueOf(formatVersion)); + properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()); + properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized)); + // merge-on-read: DELETE writes position delete files instead of rewriting data files. + // This routes through SupportsDelta which adds _partition to the scan projection. + properties.put("write.delete.mode", "merge-on-read"); + + String mapTableName = "test_map_partition_collision"; + TestTables.create( + Files.createTempDirectory(temp, "junit").toFile(), + mapTableName, + mapSchema, + bucketSpec, + properties); + + // Both rows in a single INSERT so they land in the same Parquet file. + // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan + // projection. + sql( + "INSERT INTO TABLE %s VALUES (1, 1000, map('env', 'prod')), (2, 9999999999999999, map('env', 'dev'))", + mapTableName); + + sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName); + assertThat(sql("SELECT id, _partition FROM %s", mapTableName)).hasSize(1); + } + + @TestTemplate + public void testPartitionMetadataColumnWithListColumn() throws IOException { + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); + + Schema listSchema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "ts", Types.LongType.get()), + Types.NestedField.optional( + 3, "tags", Types.ListType.ofOptional(4, Types.StringType.get()))); + PartitionSpec bucketSpec = PartitionSpec.builderFor(listSchema).bucket("id", 1).build(); + + Map<String, String> properties = Maps.newHashMap(); + properties.put(FORMAT_VERSION, String.valueOf(formatVersion)); + properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()); + properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized)); + // merge-on-read: DELETE writes position delete files instead of rewriting data files. 
+ // This routes through SupportsDelta which adds _partition to the scan projection. + properties.put("write.delete.mode", "merge-on-read"); + + String listTableName = "test_list_partition_collision"; + TestTables.create( + Files.createTempDirectory(temp, "junit").toFile(), + listTableName, + listSchema, + bucketSpec, + properties); + + // Both rows in a single INSERT so they land in the same Parquet file. + // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan + // projection. + sql( + "INSERT INTO TABLE %s VALUES (1, 1000, array('prod')), (2, 9999999999999999, array('dev'))", + listTableName); + + sql("DELETE FROM %s WHERE ts < 9999999999999999", listTableName); + assertThat(sql("SELECT id, _partition FROM %s", listTableName)).hasSize(1); + } + private void createAndInitTable() throws IOException { Map<String, String> properties = Maps.newHashMap(); properties.put(FORMAT_VERSION, String.valueOf(formatVersion));