From 486652f410421dbb16997debdcad23a355908789 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 18 Mar 2026 14:20:39 +0800 Subject: [PATCH 1/4] Data: Add TCK tests for Metadata Columns in BaseFormatModelTests --- .../iceberg/data/BaseFormatModelTests.java | 422 +++++++++++++++++- .../flink/data/TestFlinkFormatModel.java | 25 ++ .../flink/data/TestFlinkFormatModel.java | 25 ++ .../flink/data/TestFlinkFormatModel.java | 25 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ 8 files changed, 607 insertions(+), 2 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index e295b5fbc1bb..b758ac3a8b40 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -35,6 +35,8 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TableProperties; @@ -56,6 +58,7 @@ import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -73,6 +76,11 @@ public abstract class BaseFormatModelTests { protected abstract void assertEquals(Schema schema, List expected, List actual); + protected abstract Object convertConstantToEngine(Types.NestedField field, Object value); + + protected abstract List convertToPartitionIdentity( + List actual, int index, Class clazz); + protected boolean supportsBatchReads() { return false; } @@ -92,13 +100,14 @@ protected boolean supportsBatchReads() { static final String FEATURE_CASE_SENSITIVE = "caseSensitive"; static final String FEATURE_SPLIT = "split"; static final String FEATURE_REUSE_CONTAINERS = "reuseContainers"; + static final String FEATURE_META_ROW_LINEAGE = "metaRowLineage"; private static final Map MISSING_FEATURES = Map.of( FileFormat.AVRO, new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT}, FileFormat.ORC, - new String[] {FEATURE_REUSE_CONTAINERS}); + new String[] {FEATURE_REUSE_CONTAINERS, FEATURE_META_ROW_LINEAGE}); private InMemoryFileIO fileIO; private EncryptedOutputFile encryptedFile; @@ -204,7 +213,8 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d readRecords = ImmutableList.copyOf(reader); } - assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords); + List list = convertToEngineRecords(genericRecords, schema); + assertEquals(schema, list, readRecords); } /** Write with engine type T, read with Generic Record */ @@ -628,6 +638,390 @@ void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws .isInstanceOf(UnsupportedOperationException.class); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + String filePath = "test-data-file.parquet"; + int specId = 0; + Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.SPEC_ID); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.FILE_PATH.fieldId(), filePath, + MetadataColumns.SPEC_ID.fieldId(), specId); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.FILE_PATH.name(), filePath, + MetadataColumns.SPEC_ID.name(), specId)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.ROW_POSITION); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.ROW_POSITION.name(), (long) i)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.IS_DELETED); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.IS_DELETED.name(), false)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + PartitionSpec spec = PartitionSpec.builderFor(dataGenerator.schema()).identity("col_a").build(); + + Types.StructType partitionType = spec.partitionType(); + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, "test_col_a"); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataGenerator.schema()) + .spec(spec) + .partition(partitionData) + .build(); + + List records = dataGenerator.generateRecords(); + try (writer) { + records.forEach(writer::write); + } + + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + partitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_col_a")); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionBucketTransform(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + PartitionSpec spec = PartitionSpec.builderFor(dataSchema).bucket("col_a", 4).build(); + + Types.StructType partitionType = spec.partitionType(); + PartitionData partitionData = new PartitionData(partitionType); + // bucket(4, 1) = 1 + partitionData.set(0, 1); + + List records = dataGenerator.generateRecords(); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(spec) + .partition(partitionData) + .build(); + + try (writer) { + records.forEach(writer::write); + } + + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + partitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, Integer.class)).allMatch(s -> s == 1); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + // Old spec: partition by col_a only (spec id = 0) + PartitionSpec oldSpec = PartitionSpec.builderFor(dataSchema).identity("col_a").build(); + + // New spec: partition by col_a + col_b (spec id = 1, simulates partition evolution) + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema) + .withSpecId(1) + .identity("col_a") + .identity("col_b") + .build(); + + // Partition data for the old file (only col_a is set, col_b is absent) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_data"); + + // Write data using the old spec + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(oldSpec) + .partition(oldPartitionData) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + Types.StructType unifiedPartitionType = newSpec.partitionType(); + + // Build projection schema with PARTITION_COLUMN using the unified partition type + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + unifiedPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_data")); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) + throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + PartitionSpec oldSpec = + PartitionSpec.builderFor(dataSchema).identity("col_a").identity("col_b").build(); + + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema).withSpecId(1).identity("col_a").build(); + + // Partition data for the old file (both col_a and col_b are set) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_col_a"); + oldPartitionData.set(1, 1); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(oldSpec) + .partition(oldPartitionData) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + // Use the new spec's partition type for projection (only col_a remains after evolution) + // This simulates reading an old file from the perspective of the new spec + Types.StructType newPartitionType = newSpec.partitionType(); + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + newPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_col_a")); + } + private void readAndAssertGenericRecords( FileFormat fileFormat, Schema schema, List expected) throws IOException { InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); @@ -638,6 +1032,7 @@ private void readAndAssertGenericRecords( .build()) { readRecords = ImmutableList.copyOf(reader); } + DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords); } @@ -719,4 +1114,27 @@ private static String splitSizeProperty(FileFormat fileFormat) { "No split size property defined for format: " + fileFormat); }; } + + private Map convertConstantsToEngine( + Schema projectionSchema, Map idToConstant) { + return idToConstant.entrySet().stream() + .collect( + ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> + convertConstantToEngine( + projectionSchema.findField(entry.getKey()), entry.getValue()))); + } + + private Record partitionDataToRecord( + Types.StructType partitionType, PartitionData partitionData) { + Record record = GenericRecord.create(partitionType); + List fields = partitionType.fields(); + for (int i = 0; i < fields.size(); i++) { + Types.NestedField field = fields.get(i); + record.setField(field.name(), partitionData.get(i, field.type().typeId().javaClass())); + } + + return record; + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } From 455a30d3ab9a8246e0d8b2d68886a9c7dc985964 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Tue, 24 Mar 2026 19:33:43 +0800 Subject: [PATCH 2/4] Address Comment --- .../iceberg/data/BaseFormatModelTests.java | 107 +++++++++++++++--- 1 file changed, 93 insertions(+), 14 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index b758ac3a8b40..ed1e14bca1f0 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +import org.apache.hadoop.util.Lists; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -794,6 +795,97 @@ void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinageExistValue(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + Schema writeSchema = MetadataColumns.schemaWithRowLineage(dataSchema); + + List baseRecords = dataGenerator.generateRecords(); + List writeRecords = Lists.newArrayListWithExpectedSize(baseRecords.size()); + for (int i = 0; i < baseRecords.size(); i++) { + Record base = baseRecords.get(i); + Record rec = GenericRecord.create(writeSchema); + for (Types.NestedField col : dataSchema.columns()) { + rec.setField(col.name(), base.getField(col.name())); + } + + if (i % 2 == 0) { + rec.setField(MetadataColumns.ROW_ID.name(), 555L + i); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L); + } else { + rec.setField(MetadataColumns.ROW_ID.name(), null); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), null); + } + + writeRecords.add(rec); + } + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(writeSchema) + .spec(PartitionSpec.unpartitioned()) + .build(); + + try (writer) { + writeRecords.forEach(writer::write); + } + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + // Expected results: + // - Even rows (explicit values): _row_id = 555+i, _last_updated_sequence_number = 7 + // - Odd rows (null values): _row_id = baseRowId+pos, _last_updated_sequence_number = + // fileSeqNumber + List expected = + IntStream.range(0, baseRecords.size()) + .mapToObj( + i -> { + if (i % 2 == 0) { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + 555L + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + 7L); + } else { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber); + } + }) + .toList(); + + assertThat(readRecords).hasSize(baseRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + @ParameterizedTest @FieldSource("FILE_FORMATS") void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOException { @@ -808,8 +900,7 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc DataWriter writer = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) .schema(dataGenerator.schema()) - .spec(spec) - .partition(partitionData) + .spec(PartitionSpec.unpartitioned()) .build(); List records = dataGenerator.generateRecords(); @@ -1125,16 +1216,4 @@ private Map convertConstantsToEngine( convertConstantToEngine( projectionSchema.findField(entry.getKey()), entry.getValue()))); } - - private Record partitionDataToRecord( - Types.StructType partitionType, PartitionData partitionData) { - Record record = GenericRecord.create(partitionType); - List fields = partitionType.fields(); - for (int i = 0; i < fields.size(); i++) { - Types.NestedField field = fields.get(i); - record.setField(field.name(), partitionData.get(i, field.type().typeId().javaClass())); - } - - return record; - } } From 2fa854a539d036ddbabd493d945575e61dbd859a Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Tue, 24 Mar 2026 19:59:12 +0800 Subject: [PATCH 3/4] fix ci --- .../test/java/org/apache/iceberg/data/BaseFormatModelTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index ed1e14bca1f0..aa073866a7e5 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; -import org.apache.hadoop.util.Lists; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -60,6 +59,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From 53c0dd21de3d219ab000860304cf9e60cb06c737 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Tue, 24 Mar 2026 22:42:25 +0800 Subject: [PATCH 4/4] remove convertToPartitionIdentity and convertConstantToEngine --- .../iceberg/data/BaseFormatModelTests.java | 100 ++++++++++++++---- .../flink/data/TestFlinkFormatModel.java | 23 +--- .../flink/data/TestFlinkFormatModel.java | 23 +--- .../flink/data/TestFlinkFormatModel.java | 23 +--- .../spark/data/TestSparkFormatModel.java | 26 +---- .../spark/data/TestSparkFormatModel.java | 26 +---- .../spark/data/TestSparkFormatModel.java | 26 +---- .../spark/data/TestSparkFormatModel.java | 26 +---- 8 files changed, 96 insertions(+), 177 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index aa073866a7e5..8af87ec90e0c 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.stream.IntStream; import org.apache.iceberg.DataFile; @@ -39,6 +40,7 @@ import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.TableProperties; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; @@ -60,6 +62,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -77,10 +80,7 @@ public abstract class BaseFormatModelTests { protected abstract void assertEquals(Schema schema, List expected, List actual); - protected abstract Object convertConstantToEngine(Types.NestedField field, Object value); - - protected abstract List convertToPartitionIdentity( - List actual, int index, Class clazz); + protected abstract Object getFieldFromEngineRow(T engineRow, int index); protected boolean supportsBatchReads() { return false; @@ -930,9 +930,17 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(partitionData, partitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_col_a")); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } @ParameterizedTest @@ -983,8 +991,17 @@ void testReadMetadataColumnPartitionBucketTransform(FileFormat fileFormat) throw readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(partitionData, partitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, Integer.class)).allMatch(s -> s == 1); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } @ParameterizedTest @@ -1047,9 +1064,17 @@ void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) th readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(oldPartitionData, unifiedPartitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_data")); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } @ParameterizedTest @@ -1108,9 +1133,17 @@ void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(oldPartitionData, newPartitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_col_a")); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } private void readAndAssertGenericRecords( @@ -1208,12 +1241,43 @@ private static String splitSizeProperty(FileFormat fileFormat) { private Map convertConstantsToEngine( Schema projectionSchema, Map idToConstant) { - return idToConstant.entrySet().stream() - .collect( - ImmutableMap.toImmutableMap( - Map.Entry::getKey, - entry -> - convertConstantToEngine( - projectionSchema.findField(entry.getKey()), entry.getValue()))); + List fields = + idToConstant.keySet().stream() + .map(projectionSchema::findField) + .filter(Objects::nonNull) + .toList(); + + Schema constantSchema = new Schema(fields); + Record record = GenericRecord.create(constantSchema); + for (Types.NestedField field : fields) { + Object value = idToConstant.get(field.fieldId()); + if (value instanceof StructLike structLike && field.type().isStructType()) { + record.setField(field.name(), structLikeToRecord(structLike, field.type().asStructType())); + } else { + record.setField(field.name(), value); + } + } + + T engineRow = convertToEngine(record, constantSchema); + Map result = Maps.newHashMap(); + for (int i = 0; i < fields.size(); i++) { + result.put(fields.get(i).fieldId(), getFieldFromEngineRow(engineRow, i)); + } + + return result; + } + + private static Record structLikeToRecord(StructLike structLike, Types.StructType structType) { + Record record = GenericRecord.create(structType); + int sourceSize = structLike.size(); + for (int i = 0; i < structType.fields().size(); i++) { + if (i < sourceSize) { + record.set(i, structLike.get(i, Object.class)); + } else { + record.set(i, null); + } + } + + return record; } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..8853ac6e3f16 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -21,15 +21,12 @@ import java.util.List; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +51,7 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(RowData engineRow, int index) { + return ((GenericRowData) engineRow).getField(index); } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..8853ac6e3f16 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -21,15 +21,12 @@ import java.util.List; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +51,7 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(RowData engineRow, int index) { + return ((GenericRowData) engineRow).getField(index); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..8853ac6e3f16 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -21,15 +21,12 @@ import java.util.List; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +51,7 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(RowData engineRow, int index) { + return ((GenericRowData) engineRow).getField(index); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..f8c5acededd3 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,9 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +54,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(InternalRow engineRow, int index) { + return ((GenericInternalRow) engineRow).genericGet(index); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..f8c5acededd3 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,9 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +54,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(InternalRow engineRow, int index) { + return ((GenericInternalRow) engineRow).genericGet(index); } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..f8c5acededd3 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,9 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +54,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(InternalRow engineRow, int index) { + return ((GenericInternalRow) engineRow).genericGet(index); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..f8c5acededd3 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,9 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +54,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object getFieldFromEngineRow(InternalRow engineRow, int index) { + return ((GenericInternalRow) engineRow).genericGet(index); } }