diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index e295b5fbc1bb..923578975d18 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -35,8 +35,11 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.TableProperties; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; @@ -48,6 +51,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.formats.FileWriterBuilder; import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.inmemory.InMemoryFileIO; @@ -56,6 +60,9 @@ import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -73,6 +80,8 @@ public abstract class BaseFormatModelTests { protected abstract void assertEquals(Schema schema, List expected, List actual); + protected abstract Object convertConstantToEngine(Type type, Object value); + protected boolean supportsBatchReads() { return false; } @@ -91,14 +100,18 @@ 
protected boolean supportsBatchReads() { static final String FEATURE_FILTER = "filter"; static final String FEATURE_CASE_SENSITIVE = "caseSensitive"; static final String FEATURE_SPLIT = "split"; + static final String FEATURE_READER_DEFAULT = "readerDefault"; static final String FEATURE_REUSE_CONTAINERS = "reuseContainers"; + static final String FEATURE_META_ROW_LINEAGE = "metaRowLineage"; private static final Map MISSING_FEATURES = Map.of( FileFormat.AVRO, new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT}, FileFormat.ORC, - new String[] {FEATURE_REUSE_CONTAINERS}); + new String[] { + FEATURE_REUSE_CONTAINERS, FEATURE_META_ROW_LINEAGE, FEATURE_READER_DEFAULT + }); private InMemoryFileIO fileIO; private EncryptedOutputFile encryptedFile; @@ -609,6 +622,69 @@ void testReaderBuilderReuseContainers(FileFormat fileFormat) throws IOException reuseRecords.forEach(r -> assertThat(r).isSameAs(reuseRecords.get(0))); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReaderSchemaEvolutionNewColumnWithDefault(FileFormat fileFormat) throws IOException { + + assumeSupports(fileFormat, FEATURE_READER_DEFAULT); + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema writeSchema = dataGenerator.schema(); + + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, writeSchema, genericRecords); + + String defaultStringValue = "default_value"; + int defaultIntValue = 42; + + Schema evolvedSchema = + new Schema( + Types.NestedField.required(1, "col_a", Types.StringType.get()), + Types.NestedField.required(2, "col_b", Types.IntegerType.get()), + Types.NestedField.required(3, "col_c", Types.LongType.get()), + Types.NestedField.required(4, "col_d", Types.FloatType.get()), + Types.NestedField.required(5, "col_e", Types.DoubleType.get()), + Types.NestedField.required("col_f") + .withId(6) + .ofType(Types.StringType.get()) + .withInitialDefault(Literal.of(defaultStringValue)) + .build(), + 
Types.NestedField.optional("col_g") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(defaultIntValue)) + .build()); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(evolvedSchema) + .engineProjection(engineSchema(evolvedSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(genericRecords.size()); + + List expectedGenericRecords = + genericRecords.stream() + .map( + record -> { + Record expected = GenericRecord.create(evolvedSchema); + for (Types.NestedField col : writeSchema.columns()) { + expected.setField(col.name(), record.getField(col.name())); + } + + expected.setField("col_f", defaultStringValue); + expected.setField("col_g", defaultIntValue); + return expected; + }) + .toList(); + + List expectedEngineRecords = convertToEngineRecords(expectedGenericRecords, evolvedSchema); + assertEquals(evolvedSchema, expectedEngineRecords, readRecords); + } + @ParameterizedTest @FieldSource("FILE_FORMATS") void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws IOException { @@ -628,6 +704,450 @@ void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws .isInstanceOf(UnsupportedOperationException.class); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + String filePath = "test-data-file.parquet"; + int specId = 0; + Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.SPEC_ID); + + Map idToConstant = + ImmutableMap.of( + 
MetadataColumns.FILE_PATH.fieldId(), filePath, + MetadataColumns.SPEC_ID.fieldId(), specId); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.FILE_PATH.name(), filePath, + MetadataColumns.SPEC_ID.name(), specId)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.ROW_POSITION); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.ROW_POSITION.name(), (long) i)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, 
convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.IS_DELETED); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.IS_DELETED.name(), false)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = 
encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinageExistValue(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + Schema writeSchema = MetadataColumns.schemaWithRowLineage(dataSchema); + + List baseRecords = dataGenerator.generateRecords(); + List writeRecords = Lists.newArrayListWithExpectedSize(baseRecords.size()); + for (int i = 0; i < baseRecords.size(); i++) { + Record base = baseRecords.get(i); + Record rec = GenericRecord.create(writeSchema); + for (Types.NestedField col : dataSchema.columns()) { + rec.setField(col.name(), base.getField(col.name())); + } + + if (i % 2 == 0) { + rec.setField(MetadataColumns.ROW_ID.name(), 555L + i); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L); + } else { + rec.setField(MetadataColumns.ROW_ID.name(), null); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), null); + } + + writeRecords.add(rec); + } + + DataWriter writer = + 
FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(writeSchema) + .spec(PartitionSpec.unpartitioned()) + .build(); + + try (writer) { + writeRecords.forEach(writer::write); + } + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + // Expected results: + // - Even rows (explicit values): _row_id = 555+i, _last_updated_sequence_number = 7 + // - Odd rows (null values): _row_id = baseRowId+pos, _last_updated_sequence_number = + // fileSeqNumber + List expected = + IntStream.range(0, baseRecords.size()) + .mapToObj( + i -> { + if (i % 2 == 0) { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + 555L + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + 7L); + } else { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber); + } + }) + .toList(); + + assertThat(readRecords).hasSize(baseRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator 
= new DataGenerators.DefaultSchema(); + PartitionSpec spec = PartitionSpec.builderFor(dataGenerator.schema()).identity("col_a").build(); + + Types.StructType partitionType = spec.partitionType(); + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, "test_col_a"); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataGenerator.schema()) + .spec(PartitionSpec.unpartitioned()) + .build(); + + List records = dataGenerator.generateRecords(); + try (writer) { + records.forEach(writer::write); + } + + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + partitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + Record partitionRecord = structLikeToRecord(partitionData, partitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + + assertThat(readRecords).hasSize(records.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new 
DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + // Old spec: partition by col_a only (spec id = 0) + PartitionSpec oldSpec = PartitionSpec.builderFor(dataSchema).identity("col_a").build(); + + // New spec: partition by col_a + col_b (spec id = 1, simulates partition evolution) + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema) + .withSpecId(1) + .identity("col_a") + .identity("col_b") + .build(); + + // Partition data for the old file (only col_a is set, col_b is absent) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_data"); + + // Write the data unpartitioned; the old-spec partition values are injected via idToConstant + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(PartitionSpec.unpartitioned()) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + Types.StructType unifiedPartitionType = newSpec.partitionType(); + + // Build projection schema with PARTITION_COLUMN using the unified partition type + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + unifiedPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + Record partitionRecord = structLikeToRecord(oldPartitionData, unifiedPartitionType); +
List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + + assertThat(readRecords).hasSize(records.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) + throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + PartitionSpec oldSpec = + PartitionSpec.builderFor(dataSchema).identity("col_a").identity("col_b").build(); + + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema).withSpecId(1).identity("col_a").build(); + + // Partition data for the old file (both col_a and col_b are set) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_col_a"); + oldPartitionData.set(1, 1); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(PartitionSpec.unpartitioned()) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + // Use the new spec's partition type for projection (only col_a remains after evolution) + // This simulates reading an old file from the perspective of the new spec + Types.StructType newPartitionType = newSpec.partitionType(); + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + newPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = 
encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + Record partitionRecord = structLikeToRecord(oldPartitionData, newPartitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + + assertThat(readRecords).hasSize(records.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + private void readAndAssertGenericRecords( FileFormat fileFormat, Schema schema, List expected) throws IOException { InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); @@ -638,6 +1158,7 @@ private void readAndAssertGenericRecords( .build()) { readRecords = ImmutableList.copyOf(reader); } + DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords); } @@ -719,4 +1240,30 @@ private static String splitSizeProperty(FileFormat fileFormat) { "No split size property defined for format: " + fileFormat); }; } + + private Map convertConstantsToEngine( + Schema projectionSchema, Map idToConstant) { + return idToConstant.entrySet().stream() + .collect( + ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> + convertConstantToEngine( + projectionSchema.findType(entry.getKey()), entry.getValue()))); + } + + private static Record structLikeToRecord(StructLike structLike, Types.StructType structType) { + Record record = GenericRecord.create(structType); + int sourceSize = structLike.size(); + for (int i = 0; i < structType.fields().size(); i++) { + if (i < sourceSize) { + record.set(i, structLike.get(i, Object.class)); + } 
else { + Types.NestedField field = structType.fields().get(i); + record.set(i, field.initialDefault()); + } + } + + return record; + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..1f0fe70ac53b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,26 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, 
null); + } + } + + return rowData; + } + + return RowDataUtil.convertConstant(type, value); + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..1f0fe70ac53b 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,26 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, null); + } + } + + return rowData; + } + + 
return RowDataUtil.convertConstant(type, value); + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..1f0fe70ac53b 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,26 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, null); + } + } + + return rowData; + } + + return RowDataUtil.convertConstant(type, 
value); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..291bb2bca4f5 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -25,6 +25,8 @@ import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Type; import org.apache.spark.sql.catalyst.InternalRow; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +53,9 @@ protected void assertEquals(Schema schema, List expected, List { @@ -51,4 +53,9 @@ protected void assertEquals(Schema schema, List expected, List { @@ -51,4 +53,9 @@ protected void assertEquals(Schema schema, List expected, List { @@ -51,4 +53,9 @@ protected void assertEquals(Schema schema, List expected, List