
Spark: fix NPE thrown for MAP/LIST columns on DELETE, UPDATE, and MERGE operations#15726

Merged
singhpk234 merged 3 commits into apache:main from antonlin1:fix/partition-id-collision-map-list-columns
Mar 25, 2026

Conversation

@antonlin1
Contributor

@antonlin1 antonlin1 commented Mar 22, 2026

Partitioned tables with MAP or LIST columns throw an NPE on DELETE, UPDATE, and MERGE operations when running Spark 4.1 + Iceberg 1.11 (reproduced with iceberg-spark-runtime-4.1_2.13:1.11.0-20260306.003105-70):

Cannot invoke "org.apache.iceberg.types.Types$NestedField.type()" because "expected" is null
	at org.apache.iceberg.parquet.PruneColumns.isStruct(PruneColumns.java:173)
	at org.apache.iceberg.parquet.PruneColumns.message(PruneColumns.java:61)
	at org.apache.iceberg.parquet.PruneColumns.message(PruneColumns.java:38)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:48)
	at org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumns(ParquetSchemaUtil.java:134)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:82)
	at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
	at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
	at org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:120)
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:42)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:140)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
	at scala.Option.exists(Option.scala:406)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

allUsedFieldIds() (introduced in #15297) uses TypeUtil.getProjectedIds() to build the set of field IDs already in use. This omits MAP and LIST container IDs, creating gaps in the ID space. A _partition child field gets reassigned into one of these gaps, landing on the same ID as a MAP or LIST column.

During the Parquet scan, PruneColumns finds this ID in selectedIds (via _partition) and looks it up in the expected Iceberg schema. But the ID belongs to a nested _partition child, not a top-level data column, so expected.field(id) returns null, and isStruct() throws an NPE on that null.

This regression did not exist in 1.10.x, where the equivalent code used TypeUtil.indexById(), which correctly indexes all field IDs, including MAP/LIST containers.
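The collision mechanics can be illustrated with a toy model (the IDs, names, and the lowest-unused-ID assignment below are illustrative, not the actual Iceberg implementation):

```java
import java.util.Set;

// Toy model of the field-ID collision described above.
public class IdCollisionSketch {
  // Example schema: id=1 (data column), id=2 (MAP container "tags"),
  //                 id=3 (map key), id=4 (map value).
  static final Set<Integer> ALL_IDS = Set.of(1, 2, 3, 4);

  // Modeling TypeUtil.getProjectedIds(): only leaf field IDs are
  // returned, so the MAP container ID (2) is missing -- this is the gap.
  static final Set<Integer> PROJECTED_IDS = Set.of(1, 3, 4);

  // Assign the next ID not present in the used set (simplified stand-in
  // for the real _partition child ID reassignment).
  static int nextUnusedId(Set<Integer> used) {
    int id = 1;
    while (used.contains(id)) {
      id++;
    }
    return id;
  }

  public static void main(String[] args) {
    // Buggy: the gap at ID 2 is reused for the _partition child,
    // colliding with the MAP container column.
    System.out.println("buggy _partition child ID = " + nextUnusedId(PROJECTED_IDS)); // 2

    // Fixed: indexing all IDs leaves no gap, so a fresh ID past the
    // schema's IDs is assigned.
    System.out.println("fixed _partition child ID = " + nextUnusedId(ALL_IDS)); // 5
  }
}
```

With the gap present, the reassigned child ID (2) is exactly the MAP container's ID, which is why PruneColumns later resolves it against the wrong schema entry.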

Fix

Replace TypeUtil.getProjectedIds() with TypeUtil.indexById().keySet() in allUsedFieldIds() so that MAP and LIST container IDs are also counted as used.
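A sketch of the change (the variable name and exact surrounding code are assumed, since the diff itself is not shown in this conversation):

```diff
- Set<Integer> usedFieldIds = TypeUtil.getProjectedIds(schema);
+ Set<Integer> usedFieldIds = TypeUtil.indexById(schema.asStruct()).keySet();
```

indexById() walks the schema recursively and returns a map keyed by every field ID, so taking its key set includes the MAP/LIST container IDs that getProjectedIds() skips.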

@github-actions github-actions bot added the spark label Mar 22, 2026
@antonlin1 antonlin1 force-pushed the fix/partition-id-collision-map-list-columns branch 8 times, most recently from 09f6640 to 831b8d5 on March 23, 2026 14:51
@antonlin1 antonlin1 marked this pull request as ready for review March 23, 2026 14:57
@antonlin1 antonlin1 changed the title Spark: fix _partition child ID collision with MAP/LIST columns in allUsedFieldIds Spark: fix NPE thrown for MAP/LIST columns on DELETE, UPDATE, and MERGE operations Mar 23, 2026
@antonlin1 antonlin1 force-pushed the fix/partition-id-collision-map-list-columns branch from 831b8d5 to c605e64 on March 23, 2026 15:57
…UsedFieldIds

BaseSparkScanBuilder.allUsedFieldIds() used TypeUtil.getProjectedIds(), which
omits MAP and LIST field IDs (it is designed for column projection, not collision
avoidance). This caused _partition struct child IDs to be reassigned to the same
IDs as MAP/LIST columns, triggering an NPE in PruneColumns.isStruct() during
merge-on-read scans when the _partition metadata column is included in the
projection.

Fix: use TypeUtil.indexById() which indexes ALL field IDs recursively, matching
the behavior of the pre-1.11 Spark 3.5 code that this replaced.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@antonlin1 antonlin1 force-pushed the fix/partition-id-collision-map-list-columns branch from c605e64 to d7e8246 on March 23, 2026 16:07
Contributor

@huaxingao huaxingao left a comment


LGTM

Member

@szehon-ho szehon-ho left a comment


@szehon-ho
Member

Oh, also can you add a test for List as well?

Contributor

@singhpk234 singhpk234 left a comment


LGTM too

+1 to @szehon-ho's suggestion

@antonlin1
Contributor Author

@szehon-ho added a list column unit test here: afb97ac

@singhpk234 singhpk234 merged commit 63ecc7c into apache:main Mar 25, 2026
24 checks passed
@singhpk234
Contributor

Thanks for the change @antonlin1! Thank you for the review @huaxingao @szehon-ho!
