From d7e8246804b7e6c5b726a419bc28f4ff01d68742 Mon Sep 17 00:00:00 2001
From: Anton Lin
Date: Mon, 23 Mar 2026 00:26:06 +0100
Subject: [PATCH 1/3] Spark: fix _partition child ID collision with MAP/LIST
 columns in allUsedFieldIds

BaseSparkScanBuilder.allUsedFieldIds() used TypeUtil.getProjectedIds()
which omits MAP and LIST field IDs (it is designed for column
projection, not collision avoidance). This caused _partition struct
child IDs to be reassigned to the same IDs as MAP/LIST columns,
triggering an NPE in PruneColumns.isStruct() during merge-on-read scans
when the _partition metadata column is included in the projection.

Fix: use TypeUtil.indexById() which indexes ALL field IDs recursively,
matching the behavior of the pre-1.11 Spark 3.5 code that this replaced.

Co-Authored-By: Claude Sonnet 4.6 (1M context)
---
 .../spark/source/BaseSparkScanBuilder.java         |  2 +-
 .../source/TestSparkMetadataColumns.java           | 51 +++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
index ec72a1505a66..be34eaeed5dc 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
@@ -230,7 +230,7 @@ private Optional findPartitionField(List f
   // collects used data field IDs across all known table schemas
   private Set<Integer> allUsedFieldIds() {
     return table.schemas().values().stream()
-        .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+        .flatMap(tableSchema -> TypeUtil.indexById(tableSchema.asStruct()).keySet().stream())
         .collect(Collectors.toSet());
   }
 
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 68f6f17aaed5..f14ae0ab3d58 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -343,6 +343,57 @@ public void testRowLineageColumnsResolvedInV3OrHigher() {
     }
   }
 
+  /**
+   * Regression test for an NPE in PruneColumns.isStruct() caused by a _partition child field ID
+   * being assigned the same ID as a MAP/LIST column.
+   */
+  @TestTemplate
+  public void testPartitionMetadataColumnWithMapColumnDoesNotNPE() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    // Schema: id(1) and ts(2) are the only primitives before the MAP.
+    // getProjectedIds returns {1, 2, 4, 5} — omitting id=3 (MAP container).
+    // bucket(1, id) partition spec field id=1000 is reassigned to id=3, colliding with tags.
+    Schema mapSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3,
+                "tags",
+                Types.MapType.ofOptional(4, 5, Types.StringType.get(), Types.StringType.get())));
+    PartitionSpec bucketSpec = PartitionSpec.builderFor(mapSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting data files.
+    // This routes through SupportsDelta which adds _partition to the scan projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String mapTableName = "test_map_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        mapTableName,
+        mapSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan
+    // projection,
+    // triggering BatchDataReader.open() → ReadConf → PruneColumns → NPE.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, map('env', 'prod')), (2, 9999999999999999, map('env', 'dev'))",
+        mapTableName);
+
+    // ensure NPE is not thrown
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName);
+    assertThat(sql("SELECT id FROM %s", mapTableName)).hasSize(1);
+  }
+
   private void createAndInitTable() throws IOException {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(FORMAT_VERSION, String.valueOf(formatVersion));

From afb97acc05b13afc68a92573126bf9dcda87b449 Mon Sep 17 00:00:00 2001
From: Anton Lin
Date: Tue, 24 Mar 2026 10:19:35 +0100
Subject: [PATCH 2/3] rename test + add list column test

---
 .../source/TestSparkMetadataColumns.java           | 53 +++++++++++++++----
 1 file changed, 42 insertions(+), 11 deletions(-)

diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index f14ae0ab3d58..b8cdbcd523a5 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -343,18 +343,11 @@ public void testRowLineageColumnsResolvedInV3OrHigher() {
     }
   }
 
-  /**
-   * Regression test for an NPE in PruneColumns.isStruct() caused by a _partition child field ID
-   * being assigned the same ID as a MAP/LIST column.
-   */
   @TestTemplate
-  public void testPartitionMetadataColumnWithMapColumnDoesNotNPE() throws IOException {
+  public void testPartitionMetadataColumnWithMapColumn() throws IOException {
     assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
     assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
 
-    // Schema: id(1) and ts(2) are the only primitives before the MAP.
-    // getProjectedIds returns {1, 2, 4, 5} — omitting id=3 (MAP container).
-    // bucket(1, id) partition spec field id=1000 is reassigned to id=3, colliding with tags.
     Schema mapSchema =
         new Schema(
             Types.NestedField.required(1, "id", Types.LongType.get()),
@@ -383,17 +376,55 @@ public void testPartitionMetadataColumnWithMapColumnDoesNotNPE() throws IOException {
 
     // Both rows in a single INSERT so they land in the same Parquet file.
     // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan
-    // projection,
-    // triggering BatchDataReader.open() → ReadConf → PruneColumns → NPE.
+    // projection.
     sql(
         "INSERT INTO TABLE %s VALUES (1, 1000, map('env', 'prod')), (2, 9999999999999999, map('env', 'dev'))",
         mapTableName);
 
-    // ensure NPE is not thrown
     sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName);
     assertThat(sql("SELECT id FROM %s", mapTableName)).hasSize(1);
   }
 
+  @TestTemplate
+  public void testPartitionMetadataColumnWithListColumn() throws IOException {
+    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    Schema listSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "ts", Types.LongType.get()),
+            Types.NestedField.optional(
+                3, "tags", Types.ListType.ofOptional(4, Types.StringType.get())));
+    PartitionSpec bucketSpec = PartitionSpec.builderFor(listSchema).bucket("id", 1).build();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FORMAT_VERSION, String.valueOf(formatVersion));
+    properties.put(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+    properties.put(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+    // merge-on-read: DELETE writes position delete files instead of rewriting data files.
+    // This routes through SupportsDelta which adds _partition to the scan projection.
+    properties.put("write.delete.mode", "merge-on-read");
+
+    String listTableName = "test_list_partition_collision";
+    TestTables.create(
+        Files.createTempDirectory(temp, "junit").toFile(),
+        listTableName,
+        listSchema,
+        bucketSpec,
+        properties);
+
+    // Both rows in a single INSERT so they land in the same Parquet file.
+    // With both rows sharing a file, Spark uses merge-on-read which adds _partition to the scan
+    // projection.
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1000, array('prod')), (2, 9999999999999999, array('dev'))",
+        listTableName);
+
+    sql("DELETE FROM %s WHERE ts < 9999999999999999", listTableName);
+    assertThat(sql("SELECT id FROM %s", listTableName)).hasSize(1);
+  }
+
   private void createAndInitTable() throws IOException {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(FORMAT_VERSION, String.valueOf(formatVersion));

From 455df5ecd55269beefe1b6d8d68fcb458f6cb77d Mon Sep 17 00:00:00 2001
From: Anton Lin
Date: Wed, 25 Mar 2026 00:16:33 +0100
Subject: [PATCH 3/3] add `_partition` to final assert select query in
 regression tests

---
 .../apache/iceberg/spark/source/TestSparkMetadataColumns.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index b8cdbcd523a5..7498f129b21b 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -382,7 +382,7 @@ public void testPartitionMetadataColumnWithMapColumn() throws IOException {
         mapTableName);
 
     sql("DELETE FROM %s WHERE ts < 9999999999999999", mapTableName);
-    assertThat(sql("SELECT id FROM %s", mapTableName)).hasSize(1);
+    assertThat(sql("SELECT id, _partition FROM %s", mapTableName)).hasSize(1);
   }
 
   @TestTemplate
@@ -422,7 +422,7 @@ public void testPartitionMetadataColumnWithListColumn() throws IOException {
         listTableName);
 
     sql("DELETE FROM %s WHERE ts < 9999999999999999", listTableName);
-    assertThat(sql("SELECT id FROM %s", listTableName)).hasSize(1);
+    assertThat(sql("SELECT id, _partition FROM %s", listTableName)).hasSize(1);
  }
 
   private void createAndInitTable() throws IOException {